Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/23 08:41:51 UTC

[GitHub] [flink-table-store] LadyForest opened a new pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

LadyForest opened a new pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58


   * Add logPrimaryKey to preserve original pk index for log store


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-table-store] JingsongLi merged pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58


   





[GitHub] [flink-table-store] JingsongLi edited a comment on pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi edited a comment on pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#issuecomment-1076342347


   Some comments about the test framework:
   - How do I debug a test in the IDE? It reports `No tests found matching testSequentialWriteRead with parameter`...
   - The parameter name is far too long; it is difficult to get useful information from it.
   - The expected-result computation looks very complicated. How do we know the expected results are correct?





[GitHub] [flink-table-store] LadyForest commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r834245011



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new LinkedHashMap<>();

Review comment:
       > This is a bad habit, even now without concurrent testing. It makes it difficult to understand the interactions between classes and methods. Here you can create a new Map, and pass the Map as a parameter to the `update` method.
   
   After splitting the cases into different tests, I think we don't need this map anymore :)
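
The reviewer's suggestion of replacing the static `PROCESSED_RECORDS` with a map that is created per call and passed to `update` could look roughly like the sketch below. It is a simplified illustration, not the PR's code: `ChangelogCombiner`, the string keys, and the integer counts are hypothetical stand-ins for the `Row` keys and `Pair<RowKind, Row>` values used in the test.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the test's combiner; keys are plain strings instead of Row. */
final class ChangelogCombiner {

    /** Applies one +1/-1 change to the caller-owned map; no static state is touched. */
    static void update(Map<String, Integer> state, String key, int delta) {
        state.merge(key, delta, Integer::sum);
        if (state.get(key) == 0) {
            state.remove(key); // the record has been fully retracted
        }
    }

    /** Each call owns a fresh map, so separate tests cannot leak records into each other. */
    static List<String> combine(List<String> inserts, List<String> retracts) {
        Map<String, Integer> state = new LinkedHashMap<>();
        inserts.forEach(key -> update(state, key, 1));
        retracts.forEach(key -> update(state, key, -1));

        // Expand the remaining counts back into individual rows.
        List<String> result = new ArrayList<>();
        state.forEach(
                (key, count) -> {
                    for (int i = 0; i < count; i++) {
                        result.add(key);
                    }
                });
        return result;
    }
}
```

Because the map's lifetime is bounded by one `combine` call, there is nothing to clear afterwards and nothing shared between threads.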







[GitHub] [flink-table-store] LadyForest commented on pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#issuecomment-1076279637


   Currently, two failing cases need to be debugged.
   
   The first case:
   ```sql
   --executionMode-STREAMING, tableName-table_6fe1e20e-b871-49e6-98c6-cdc91157cbe2, enableLogStore-false
   -- helper source ddl
   CREATE TABLE `source_table_a1575cf0-feb9-427c-8c4b-0138eed2318e` (
     currency STRING,
     rate BIGINT,
     dt STRING,
     PRIMARY KEY (currency, dt) NOT ENFORCED
   ) PARTITIONED BY (dt)
     WITH (
     'connector' = 'values',
     'bounded' = 'false',
     'data-id' = '10',
     'changelog-mode' = 'I,UA,D',
     'disable-lookup' = 'true',
     'partition-list' = 'dt:2022-01-01;dt:2022-01-02'
   );
   
   -- managed table ddl
   CREATE TABLE IF NOT EXISTS `catalog`.`database`.`table_6fe1e20e-b871-49e6-98c6-cdc91157cbe2`
    LIKE `source_table_a1575cf0-feb9-427c-8c4b-0138eed2318e` (EXCLUDING OPTIONS)
   
   -- insert query
   INSERT INTO `table_6fe1e20e-b871-49e6-98c6-cdc91157cbe2`
    SELECT * FROM `source_table_a1575cf0-feb9-427c-8c4b-0138eed2318e`
   
   -- select query
   SELECT * FROM `table_6fe1e20e-b871-49e6-98c6-cdc91157cbe2`
   ```
   
   The read task blocks while collecting the streaming results, at the call to `iterator.hasNext`, and eventually times out.
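
One way to keep such a hang from stalling the whole build is to drain the iterator on a worker thread and bound the wait with `Future.get`. The sketch below uses only the JDK; `TimedCollector` and `collect` are hypothetical names, and a plain `Iterator` stands in for Flink's `CloseableIterator<Row>`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, not part of the PR. */
final class TimedCollector {

    /** Reads up to {@code expected} elements; throws TimeoutException if reading takes too long. */
    static <T> List<T> collect(Iterator<T> iterator, int expected, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<List<T>> future =
                    executor.submit(
                            () -> {
                                List<T> rows = new ArrayList<>();
                                // hasNext() may block on an unbounded source.
                                while (rows.size() < expected && iterator.hasNext()) {
                                    rows.add(iterator.next());
                                }
                                return rows;
                            });
            return future.get(timeout, unit);
        } finally {
            // Interrupts the worker if it is still blocked inside hasNext().
            executor.shutdownNow();
        }
    }
}
```

If the source never yields, `future.get` throws `TimeoutException` after the given wait. Whether the worker thread then actually stops depends on the blocked call responding to interruption.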
   





[GitHub] [flink-table-store] LadyForest commented on pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#issuecomment-1076507370


   
   > * How to debug test in IDE?  `No tests found matching testSequentialWriteRead with parameter`....
   
   Did you click a single generated case and rerun it? There seems to be no way to rerun a single case by clicking it. I'm not sure whether this is an IDEA-related issue; I have seen a lot of YouTrack tickets reporting it.
   
   > * parameter name is so so so long ... It is difficult to get effective information from it
   
   Indeed. JUnit 4 doesn't support multiple data providers for different tests, so `logTestSpec` is used to log the input parameters when the debug level is enabled. An alternative is to split the test case into multiple tests and avoid the parameterized test runner.
   
   > * expected result computation looks so complicated, how do we know the expected results are correct?
   
   Indeed, but there seems to be no better workaround. Do you think it's a good idea for us to provide hardcoded expected results? It would be tedious and long.
   
   Maybe we can split the cases into different test methods.
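
To make the trade-off concrete, here is a minimal JDK-only sketch of what "one scenario per method with hardcoded expectations" might look like. Everything in it is illustrative: `SplitCasesSketch`, `writeThenRead`, and the currency values are made up, and in the real suite each scenario method would be a JUnit `@Test` running SQL against the table store rather than an in-memory map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** JDK-only illustration; not the PR's test code. */
final class SplitCasesSketch {

    /** Hypothetical stand-in for writing to and reading from a keyed table: last write per key wins. */
    static List<String> writeThenRead(List<String[]> keyedRecords) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] keyAndValue : keyedRecords) {
            latest.put(keyAndValue[0], keyAndValue[1]);
        }
        List<String> rows = new ArrayList<>();
        latest.forEach((key, value) -> rows.add(key + "=" + value));
        return rows;
    }

    /** One scenario, one short name, and literal expectations a reviewer can check by eye. */
    static void batchWriteWithPkKeepsLatestRate() {
        List<String> actual =
                writeThenRead(
                        Arrays.asList(
                                new String[] {"Euro", "114"},
                                new String[] {"US Dollar", "102"},
                                new String[] {"Euro", "119"}));
        List<String> expected = Arrays.asList("Euro=119", "US Dollar=102");
        if (!actual.equals(expected)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }
}
```

The expected list is longer to type than a computed one, but it cannot share a bug with the code that derives it, and the method name replaces the long generated parameter string.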





[GitHub] [flink-table-store] LadyForest commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r834917705



##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
##########
@@ -65,6 +65,15 @@ public BlockingIterator(Iterator<IN> iterator, Function<IN, OUT> converter) {
         }
     }
 
+    public List<OUT> collectAndCloseQuietly(int limit) {

Review comment:
       > Why do you want to be quiet? Just throw the exception?
   
   Because `collect()` directly returns an empty list when the expected number is zero. What I want to test is to let time pass until the timeout, to verify that not even one record can be collected.
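
The intent described here, waiting out the timeout to confirm that not a single record arrives, can be sketched with JDK classes alone. This is not the PR's `collectAndCloseQuietly`: `QuietCollector` and its queue-backed source are hypothetical stand-ins for `BlockingIterator`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the PR. */
final class QuietCollector {

    /**
     * Collects up to {@code limit} elements, waiting at most {@code timeoutMillis} in total.
     * When time runs out it returns whatever has arrived instead of throwing.
     */
    static <T> List<T> collectQuietly(BlockingQueue<T> source, int limit, long timeoutMillis)
            throws InterruptedException {
        List<T> collected = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (collected.size() < limit) {
            long remainingNanos = deadline - System.nanoTime();
            // poll() returns null when nothing arrives within the remaining time.
            T next = source.poll(Math.max(0L, remainingNanos), TimeUnit.NANOSECONDS);
            if (next == null) {
                break;
            }
            collected.add(next);
        }
        return collected;
    }
}
```

Asking for one element from a source that should stay silent and asserting that the result is empty then checks "nothing arrived before the timeout", which an immediate return for an expected count of zero cannot check.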
   







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r833463128



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new LinkedHashMap<>();

Review comment:
       > Why use a static collection? It is very dangerous for thread safety and memory leaks.
   
   Because the concurrent read/write case specs are not added yet. The current cases are restricted to sequential read/write. 







[GitHub] [flink-table-store] LadyForest removed a comment on pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest removed a comment on pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#issuecomment-1076279637







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r833892291



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new LinkedHashMap<>();
+
+    private static final TriFunction<Row, Boolean, Boolean, Pair<Row, Pair<RowKind, Row>>>
+            KEY_VALUE_ASSIGNER =
+                    (record, hasPk, partitioned) -> {
+                        boolean retract =
+                                record.getKind() == RowKind.DELETE
+                                        || record.getKind() == RowKind.UPDATE_BEFORE;
+                        Row key;
+                        Row value;
+                        RowKind rowKind = record.getKind();
+                        if (hasPk) {
+                            key =
+                                    partitioned
+                                            ? Row.of(record.getField(0), record.getField(2))
+                                            : Row.of(record.getField(0));
+                            value = record;
+                        } else {
+                            key = record;
+                            value = Row.of(retract ? -1 : 1);
+                        }
+                        key.setKind(RowKind.INSERT);
+                        value.setKind(RowKind.INSERT);
+                        return Pair.of(key, Pair.of(rowKind, value));
+                    };
+
+    private static final TriFunction<List<Row>, Boolean, List<Boolean>, List<Row>> COMBINER =
+            (records, insertOnly, schema) -> {
+                boolean hasPk = schema.get(0);
+                boolean partitioned = schema.get(1);
+                records.forEach(
+                        record -> {
+                            Pair<Row, Pair<RowKind, Row>> kvPair =
+                                    KEY_VALUE_ASSIGNER.apply(record, hasPk, partitioned);
+                            Row key = kvPair.getLeft();
+                            Pair<RowKind, Row> valuePair = kvPair.getRight();
+                            if (insertOnly || !PROCESSED_RECORDS.containsKey(key)) {
+                                update(hasPk, key, valuePair);
+                            } else {
+                                Pair<RowKind, Row> existingValuePair = PROCESSED_RECORDS.get(key);
+                                RowKind existingKind = existingValuePair.getLeft();
+                                Row existingValue = existingValuePair.getRight();
+                                RowKind newKind = valuePair.getLeft();
+                                Row newValue = valuePair.getRight();
+
+                                if (hasPk) {
+                                    if (existingKind == newKind && existingKind == RowKind.INSERT) {
+                                        throw new IllegalStateException(
+                                                "primary key "
+                                                        + key
+                                                        + " already exists for record: "
+                                                        + record);
+                                    } else if (existingKind == RowKind.INSERT
+                                            && newKind == RowKind.UPDATE_AFTER) {
+                                        PROCESSED_RECORDS.replace(key, valuePair);
+                                    } else if (newKind == RowKind.DELETE
+                                            || newKind == RowKind.UPDATE_BEFORE) {
+                                        if (existingValue.equals(newValue)) {
+                                            PROCESSED_RECORDS.remove(key);
+                                        } else {
+                                            throw new IllegalStateException(
+                                                    "Try to retract an non-existing record: "
+                                                            + record);
+                                        }
+                                    }
+                                } else {
+                                    update(false, key, valuePair);
+                                }
+                            }
+                        });
+                List<Row> results =
+                        PROCESSED_RECORDS.entrySet().stream()
+                                .flatMap(
+                                        entry -> {
+                                            if (hasPk) {
+                                                Row row = entry.getValue().getRight();
+                                                row.setKind(RowKind.INSERT);
+                                                return Stream.of(row);
+                                            }
+                                            Row row = entry.getKey();
+                                            row.setKind(RowKind.INSERT);
+                                            int count =
+                                                    (int) entry.getValue().getRight().getField(0);
+                                            List<Row> rows = new ArrayList<>();
+                                            while (count > 0) {
+                                                rows.add(row);
+                                                count--;
+                                            }
+                                            return rows.stream();
+                                        })
+                                .collect(Collectors.toList());
+                PROCESSED_RECORDS.clear();
+                return results;
+            };
+
+    private final String helperTableDdl;
+    private final String managedTableDdl;
+    private final String insertQuery;
+    private final String selectQuery;
+
+    private static int testId = 0;
+
+    private static void update(boolean hasPk, Row key, Pair<RowKind, Row> value) {
+        if (hasPk) {
+            PROCESSED_RECORDS.put(key, value);
+        } else {
+            PROCESSED_RECORDS.compute(
+                    key,
+                    (k, v) -> {
+                        if (v == null) {
+                            return value;
+                        } else {
+                            return Pair.of(
+                                    v.getLeft(),
+                                    Row.of(
+                                            (int) v.getRight().getField(0)
+                                                    + (int) value.getRight().getField(0)));
+                        }
+                    });
+            if ((int) PROCESSED_RECORDS.get(key).getRight().getField(0) == 0) {
+                PROCESSED_RECORDS.remove(key);
+            }
+        }
+    }
 
     public ReadWriteTableITCase(
             RuntimeExecutionMode executionMode,
             String tableName,
             boolean enableLogStore,
-            boolean hasPk,
-            @Nullable Boolean duplicate,
+            String helperTableDdl,
+            String managedTableDdl,
+            String insertQuery,
+            String selectQuery,
             ExpectedResult expectedResult) {
         super(executionMode, tableName, enableLogStore, expectedResult);
-        this.hasPk = hasPk;
-        this.duplicate = duplicate;
+        this.helperTableDdl = helperTableDdl;
+        this.managedTableDdl = managedTableDdl;
+        this.insertQuery = insertQuery;
+        this.selectQuery = selectQuery;
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "executionMode-{0}, tableName-{1}, "
+                            + "enableLogStore-{2}, helperTableDdl-{3}, managedTableDdl-{4}, "
+                            + "insertQuery-{5}, selectQuery-{6}, expectedResult-{7}")
+    public static List<Object[]> data() {
+        List<Object[]> specs = prepareReadWriteTestSpecs();
+        specs.addAll(prepareBatchWriteStreamingReadTestSpecs());
+        specs.addAll(prepareStreamingWriteBatchReadTestSpecs());
+        specs.addAll(prepareOverwriteTestSpecs());
+        return specs;
     }
 
     @Override
-    public void after() {
-        tEnv.executeSql("DROP TABLE `source_table`");
-        super.after();
+    protected void prepareEnv() {
+        tEnv.executeSql(helperTableDdl);
+        tEnv.executeSql(managedTableDdl);
     }
 
     @Test
-    public void testReadWriteNonPartitioned() throws Exception {
-        String statement =
-                String.format("INSERT INTO %s \nSELECT * FROM `source_table`", tableIdentifier);
+    public void testSequentialWriteRead() throws Exception {
+        logTestSpec(
+                executionMode,
+                tableIdentifier,
+                enableLogStore,
+                helperTableDdl,
+                managedTableDdl,
+                insertQuery,
+                selectQuery,
+                expectedResult);
         if (expectedResult.success) {
-            tEnv.executeSql(statement).await();
-            TableResult result =
-                    tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier));
+            tEnv.executeSql(insertQuery).await();
+            TableResult result = tEnv.executeSql(selectQuery);
             List<Row> actual = new ArrayList<>();
+
             try (CloseableIterator<Row> iterator = result.collect()) {
-                while (iterator.hasNext()) {
-                    actual.add(iterator.next());
+                if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING) {
+                    ExecutorService executorService = Executors.newSingleThreadExecutor();

Review comment:
       Good idea, we do not want to hang forever until the GitHub test run times out.
   I created a PR to introduce the timeout helper: https://github.com/apache/flink-table-store/pull/60







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r832996416



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
##########
@@ -80,9 +80,12 @@
     /** partition keys, default no partition. */
     private int[] partitions = new int[0];
 
-    /** primary keys, default no key. */
+    /** file store primary keys which excludes partition fields if partitioned, default no key. */

Review comment:
       Found a typo: `excludes` => `exclude`







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r833143977



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {
+        BinaryRowData logPrimaryKey = logPrimaryKey(row);
+        return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, row);
+    }
+
     public BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    public BinaryRowData logPrimaryKey(RowData row) {

Review comment:
       Private? It seems we don't need this method.

##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new LinkedHashMap<>();

Review comment:
       Why use a static collection?
   It is very dangerous for thread safety and can cause memory leaks.

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {
+        BinaryRowData logPrimaryKey = logPrimaryKey(row);
+        return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, row);
+    }
+
     public BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    public BinaryRowData logPrimaryKey(RowData row) {

Review comment:
       private?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {

Review comment:
       Rename this to `toLogSinkRecord(SinkRecord record)`.
   The row can be obtained from the record, so the `row` parameter is unnecessary.

##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new LinkedHashMap<>();
+
+    private static final TriFunction<Row, Boolean, Boolean, Pair<Row, Pair<RowKind, Row>>>
+            KEY_VALUE_ASSIGNER =
+                    (record, hasPk, partitioned) -> {
+                        boolean retract =
+                                record.getKind() == RowKind.DELETE
+                                        || record.getKind() == RowKind.UPDATE_BEFORE;
+                        Row key;
+                        Row value;
+                        RowKind rowKind = record.getKind();
+                        if (hasPk) {
+                            key =
+                                    partitioned
+                                            ? Row.of(record.getField(0), record.getField(2))
+                                            : Row.of(record.getField(0));
+                            value = record;
+                        } else {
+                            key = record;
+                            value = Row.of(retract ? -1 : 1);
+                        }
+                        key.setKind(RowKind.INSERT);
+                        value.setKind(RowKind.INSERT);
+                        return Pair.of(key, Pair.of(rowKind, value));
+                    };
+
+    private static final TriFunction<List<Row>, Boolean, List<Boolean>, List<Row>> COMBINER =
+            (records, insertOnly, schema) -> {
+                boolean hasPk = schema.get(0);
+                boolean partitioned = schema.get(1);
+                records.forEach(
+                        record -> {
+                            Pair<Row, Pair<RowKind, Row>> kvPair =
+                                    KEY_VALUE_ASSIGNER.apply(record, hasPk, partitioned);
+                            Row key = kvPair.getLeft();
+                            Pair<RowKind, Row> valuePair = kvPair.getRight();
+                            if (insertOnly || !PROCESSED_RECORDS.containsKey(key)) {
+                                update(hasPk, key, valuePair);
+                            } else {
+                                Pair<RowKind, Row> existingValuePair = PROCESSED_RECORDS.get(key);
+                                RowKind existingKind = existingValuePair.getLeft();
+                                Row existingValue = existingValuePair.getRight();
+                                RowKind newKind = valuePair.getLeft();
+                                Row newValue = valuePair.getRight();
+
+                                if (hasPk) {
+                                    if (existingKind == newKind && existingKind == RowKind.INSERT) {
+                                        throw new IllegalStateException(
+                                                "primary key "
+                                                        + key
+                                                        + " already exists for record: "
+                                                        + record);
+                                    } else if (existingKind == RowKind.INSERT
+                                            && newKind == RowKind.UPDATE_AFTER) {
+                                        PROCESSED_RECORDS.replace(key, valuePair);
+                                    } else if (newKind == RowKind.DELETE
+                                            || newKind == RowKind.UPDATE_BEFORE) {
+                                        if (existingValue.equals(newValue)) {
+                                            PROCESSED_RECORDS.remove(key);
+                                        } else {
+                                            throw new IllegalStateException(
+                                                    "Try to retract an non-existing record: "
+                                                            + record);
+                                        }
+                                    }
+                                } else {
+                                    update(false, key, valuePair);
+                                }
+                            }
+                        });
+                List<Row> results =
+                        PROCESSED_RECORDS.entrySet().stream()
+                                .flatMap(
+                                        entry -> {
+                                            if (hasPk) {
+                                                Row row = entry.getValue().getRight();
+                                                row.setKind(RowKind.INSERT);
+                                                return Stream.of(row);
+                                            }
+                                            Row row = entry.getKey();
+                                            row.setKind(RowKind.INSERT);
+                                            int count =
+                                                    (int) entry.getValue().getRight().getField(0);
+                                            List<Row> rows = new ArrayList<>();
+                                            while (count > 0) {
+                                                rows.add(row);
+                                                count--;
+                                            }
+                                            return rows.stream();
+                                        })
+                                .collect(Collectors.toList());
+                PROCESSED_RECORDS.clear();
+                return results;
+            };
+
+    private final String helperTableDdl;
+    private final String managedTableDdl;
+    private final String insertQuery;
+    private final String selectQuery;
+
+    private static int testId = 0;
+
+    private static void update(boolean hasPk, Row key, Pair<RowKind, Row> value) {
+        if (hasPk) {
+            PROCESSED_RECORDS.put(key, value);
+        } else {
+            PROCESSED_RECORDS.compute(
+                    key,
+                    (k, v) -> {
+                        if (v == null) {
+                            return value;
+                        } else {
+                            return Pair.of(
+                                    v.getLeft(),
+                                    Row.of(
+                                            (int) v.getRight().getField(0)
+                                                    + (int) value.getRight().getField(0)));
+                        }
+                    });
+            if ((int) PROCESSED_RECORDS.get(key).getRight().getField(0) == 0) {
+                PROCESSED_RECORDS.remove(key);
+            }
+        }
+    }
 
     public ReadWriteTableITCase(
             RuntimeExecutionMode executionMode,
             String tableName,
             boolean enableLogStore,
-            boolean hasPk,
-            @Nullable Boolean duplicate,
+            String helperTableDdl,
+            String managedTableDdl,
+            String insertQuery,
+            String selectQuery,
             ExpectedResult expectedResult) {
         super(executionMode, tableName, enableLogStore, expectedResult);
-        this.hasPk = hasPk;
-        this.duplicate = duplicate;
+        this.helperTableDdl = helperTableDdl;
+        this.managedTableDdl = managedTableDdl;
+        this.insertQuery = insertQuery;
+        this.selectQuery = selectQuery;
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "executionMode-{0}, tableName-{1}, "
+                            + "enableLogStore-{2}, helperTableDdl-{3}, managedTableDdl-{4}, "
+                            + "insertQuery-{5}, selectQuery-{6}, expectedResult-{7}")
+    public static List<Object[]> data() {
+        List<Object[]> specs = prepareReadWriteTestSpecs();
+        specs.addAll(prepareBatchWriteStreamingReadTestSpecs());
+        specs.addAll(prepareStreamingWriteBatchReadTestSpecs());
+        specs.addAll(prepareOverwriteTestSpecs());
+        return specs;
     }
 
     @Override
-    public void after() {
-        tEnv.executeSql("DROP TABLE `source_table`");
-        super.after();
+    protected void prepareEnv() {
+        tEnv.executeSql(helperTableDdl);
+        tEnv.executeSql(managedTableDdl);
     }
 
     @Test
-    public void testReadWriteNonPartitioned() throws Exception {
-        String statement =
-                String.format("INSERT INTO %s \nSELECT * FROM `source_table`", tableIdentifier);
+    public void testSequentialWriteRead() throws Exception {
+        logTestSpec(
+                executionMode,
+                tableIdentifier,
+                enableLogStore,
+                helperTableDdl,
+                managedTableDdl,
+                insertQuery,
+                selectQuery,
+                expectedResult);
         if (expectedResult.success) {
-            tEnv.executeSql(statement).await();
-            TableResult result =
-                    tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier));
+            tEnv.executeSql(insertQuery).await();
+            TableResult result = tEnv.executeSql(selectQuery);
             List<Row> actual = new ArrayList<>();
+
             try (CloseableIterator<Row> iterator = result.collect()) {
-                while (iterator.hasNext()) {
-                    actual.add(iterator.next());
+                if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING) {
+                    ExecutorService executorService = Executors.newSingleThreadExecutor();

Review comment:
       Why use an `ExecutorService`?

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {
+        BinaryRowData logPrimaryKey = logPrimaryKey(row);

Review comment:
       if (logPkProjection == null) {
       return record;
   }

##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -38,14 +39,24 @@
 
     private final Projection<RowData, BinaryRowData> pkProjection;
 
+    private final Projection<RowData, BinaryRowData> logPkProjection;
+
     public SinkRecordConverter(
-            int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
+            int numBucket,
+            RowType inputType,
+            int[] partitions,
+            int[] primaryKeys,
+            final int[] logPrimaryKeys) {
         this.numBucket = numBucket;
         this.allProjection =
                 CodeGenUtils.newProjection(
                         inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
         this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
         this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
+        this.logPkProjection =

Review comment:
       Can this just be null when logPk equals pk? If it is invoked by mistake, it will throw an NPE.







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r834005664



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new LinkedHashMap<>();

Review comment:
       This is a bad habit, even though the tests do not currently run concurrently. Shared static state makes it difficult to understand the interactions between classes and methods.
   Here you can create a new Map and pass it as a parameter to the `update` method.
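
A minimal sketch of that suggestion, under loud assumptions: `String` keys and `Integer` counts stand in for the `Row` keys and `Pair<RowKind, Row>` values used in the test, and the class name `ExpectedResultCombiner` and both method signatures are hypothetical, not the PR's actual code. It only illustrates passing a caller-owned map into `update` instead of mutating a static field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the expected-result computation. */
final class ExpectedResultCombiner {

    /** Applies one change (+1 for an insert, -1 for a retraction) to the caller-owned map. */
    public static void update(Map<String, Integer> processed, String key, int delta) {
        // Map.merge stores delta for an absent key; for an existing key it sums the counts
        // and removes the entry when the remapping function returns null (count reached 0).
        processed.merge(key, delta, (oldCount, d) -> oldCount + d == 0 ? null : oldCount + d);
    }

    /** Builds a fresh map on every call, so no state is shared between test cases. */
    public static List<String> combine(List<String> keys, List<Integer> deltas) {
        Map<String, Integer> processed = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            update(processed, keys.get(i), deltas.get(i));
        }
        // Expand each remaining key into as many rows as its positive count.
        List<String> results = new ArrayList<>();
        processed.forEach(
                (key, count) -> {
                    for (int i = 0; i < count; i++) {
                        results.add(key);
                    }
                });
        return results;
    }
}
```

Because `combine` creates its map locally, two invocations cannot observe each other's entries, and the final `clear()` call that the static field required is no longer needed.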







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r834909405



##########
File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +68,31 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogSinkRecord(SinkRecord record) {
+        if (logPkProjection == null) {
+            return record;
+        }
+        BinaryRowData logPrimaryKey = logPrimaryKey(record.row());
+        return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, record.row());
+    }
+
     public BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    private BinaryRowData logPrimaryKey(RowData row) {
+        return logPkProjection.apply(row);
+    }
+
     public int bucket(RowData row, BinaryRowData primaryKey) {
         int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
         return Math.abs(hash % numBucket);
     }
 
+    public boolean reuseRecord() {

Review comment:
       Remove this method? The null check in `convertToLogSinkRecord` is enough:
   ```
   if (logPkProjection == null) {
       return record;
   }
   ```







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r834976434



##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
##########
@@ -65,6 +65,15 @@ public BlockingIterator(Iterator<IN> iterator, Function<IN, OUT> converter) {
         }
     }
 
+    public List<OUT> collectAndCloseQuietly(int limit) {

Review comment:
       It would be more focused and straightforward to move this method to `ReadWriteTableITCase`.







[GitHub] [flink-table-store] JingsongLi commented on pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#issuecomment-1076342347


   - How do we debug the test in an IDE? It fails with `No tests found matching testSequentialWriteRead with parameter`....
   - The parameter name is far too long, which makes it difficult to get useful information from it.





[GitHub] [flink-table-store] LadyForest commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r833410546



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new LinkedHashMap<>();
+
+    private static final TriFunction<Row, Boolean, Boolean, Pair<Row, Pair<RowKind, Row>>>
+            KEY_VALUE_ASSIGNER =
+                    (record, hasPk, partitioned) -> {
+                        boolean retract =
+                                record.getKind() == RowKind.DELETE
+                                        || record.getKind() == RowKind.UPDATE_BEFORE;
+                        Row key;
+                        Row value;
+                        RowKind rowKind = record.getKind();
+                        if (hasPk) {
+                            key =
+                                    partitioned
+                                            ? Row.of(record.getField(0), record.getField(2))
+                                            : Row.of(record.getField(0));
+                            value = record;
+                        } else {
+                            key = record;
+                            value = Row.of(retract ? -1 : 1);
+                        }
+                        key.setKind(RowKind.INSERT);
+                        value.setKind(RowKind.INSERT);
+                        return Pair.of(key, Pair.of(rowKind, value));
+                    };
+
+    private static final TriFunction<List<Row>, Boolean, List<Boolean>, List<Row>> COMBINER =
+            (records, insertOnly, schema) -> {
+                boolean hasPk = schema.get(0);
+                boolean partitioned = schema.get(1);
+                records.forEach(
+                        record -> {
+                            Pair<Row, Pair<RowKind, Row>> kvPair =
+                                    KEY_VALUE_ASSIGNER.apply(record, hasPk, partitioned);
+                            Row key = kvPair.getLeft();
+                            Pair<RowKind, Row> valuePair = kvPair.getRight();
+                            if (insertOnly || !PROCESSED_RECORDS.containsKey(key)) {
+                                update(hasPk, key, valuePair);
+                            } else {
+                                Pair<RowKind, Row> existingValuePair = PROCESSED_RECORDS.get(key);
+                                RowKind existingKind = existingValuePair.getLeft();
+                                Row existingValue = existingValuePair.getRight();
+                                RowKind newKind = valuePair.getLeft();
+                                Row newValue = valuePair.getRight();
+
+                                if (hasPk) {
+                                    if (existingKind == newKind && existingKind == RowKind.INSERT) {
+                                        throw new IllegalStateException(
+                                                "primary key "
+                                                        + key
+                                                        + " already exists for record: "
+                                                        + record);
+                                    } else if (existingKind == RowKind.INSERT
+                                            && newKind == RowKind.UPDATE_AFTER) {
+                                        PROCESSED_RECORDS.replace(key, valuePair);
+                                    } else if (newKind == RowKind.DELETE
+                                            || newKind == RowKind.UPDATE_BEFORE) {
+                                        if (existingValue.equals(newValue)) {
+                                            PROCESSED_RECORDS.remove(key);
+                                        } else {
+                                            throw new IllegalStateException(
+                                                    "Try to retract an non-existing record: "
+                                                            + record);
+                                        }
+                                    }
+                                } else {
+                                    update(false, key, valuePair);
+                                }
+                            }
+                        });
+                List<Row> results =
+                        PROCESSED_RECORDS.entrySet().stream()
+                                .flatMap(
+                                        entry -> {
+                                            if (hasPk) {
+                                                Row row = entry.getValue().getRight();
+                                                row.setKind(RowKind.INSERT);
+                                                return Stream.of(row);
+                                            }
+                                            Row row = entry.getKey();
+                                            row.setKind(RowKind.INSERT);
+                                            int count =
+                                                    (int) entry.getValue().getRight().getField(0);
+                                            List<Row> rows = new ArrayList<>();
+                                            while (count > 0) {
+                                                rows.add(row);
+                                                count--;
+                                            }
+                                            return rows.stream();
+                                        })
+                                .collect(Collectors.toList());
+                PROCESSED_RECORDS.clear();
+                return results;
+            };
+
+    private final String helperTableDdl;
+    private final String managedTableDdl;
+    private final String insertQuery;
+    private final String selectQuery;
+
+    private static int testId = 0;
+
+    private static void update(boolean hasPk, Row key, Pair<RowKind, Row> value) {
+        if (hasPk) {
+            PROCESSED_RECORDS.put(key, value);
+        } else {
+            PROCESSED_RECORDS.compute(
+                    key,
+                    (k, v) -> {
+                        if (v == null) {
+                            return value;
+                        } else {
+                            return Pair.of(
+                                    v.getLeft(),
+                                    Row.of(
+                                            (int) v.getRight().getField(0)
+                                                    + (int) value.getRight().getField(0)));
+                        }
+                    });
+            if ((int) PROCESSED_RECORDS.get(key).getRight().getField(0) == 0) {
+                PROCESSED_RECORDS.remove(key);
+            }
+        }
+    }
 
     public ReadWriteTableITCase(
             RuntimeExecutionMode executionMode,
             String tableName,
             boolean enableLogStore,
-            boolean hasPk,
-            @Nullable Boolean duplicate,
+            String helperTableDdl,
+            String managedTableDdl,
+            String insertQuery,
+            String selectQuery,
             ExpectedResult expectedResult) {
         super(executionMode, tableName, enableLogStore, expectedResult);
-        this.hasPk = hasPk;
-        this.duplicate = duplicate;
+        this.helperTableDdl = helperTableDdl;
+        this.managedTableDdl = managedTableDdl;
+        this.insertQuery = insertQuery;
+        this.selectQuery = selectQuery;
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "executionMode-{0}, tableName-{1}, "
+                            + "enableLogStore-{2}, helperTableDdl-{3}, managedTableDdl-{4}, "
+                            + "insertQuery-{5}, selectQuery-{6}, expectedResult-{7}")
+    public static List<Object[]> data() {
+        List<Object[]> specs = prepareReadWriteTestSpecs();
+        specs.addAll(prepareBatchWriteStreamingReadTestSpecs());
+        specs.addAll(prepareStreamingWriteBatchReadTestSpecs());
+        specs.addAll(prepareOverwriteTestSpecs());
+        return specs;
     }
 
     @Override
-    public void after() {
-        tEnv.executeSql("DROP TABLE `source_table`");
-        super.after();
+    protected void prepareEnv() {
+        tEnv.executeSql(helperTableDdl);
+        tEnv.executeSql(managedTableDdl);
     }
 
     @Test
-    public void testReadWriteNonPartitioned() throws Exception {
-        String statement =
-                String.format("INSERT INTO %s \nSELECT * FROM `source_table`", tableIdentifier);
+    public void testSequentialWriteRead() throws Exception {
+        logTestSpec(
+                executionMode,
+                tableIdentifier,
+                enableLogStore,
+                helperTableDdl,
+                managedTableDdl,
+                insertQuery,
+                selectQuery,
+                expectedResult);
         if (expectedResult.success) {
-            tEnv.executeSql(statement).await();
-            TableResult result =
-                    tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier));
+            tEnv.executeSql(insertQuery).await();
+            TableResult result = tEnv.executeSql(selectQuery);
             List<Row> actual = new ArrayList<>();
+
             try (CloseableIterator<Row> iterator = result.collect()) {
-                while (iterator.hasNext()) {
-                    actual.add(iterator.next());
+                if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING) {
+                    ExecutorService executorService = Executors.newSingleThreadExecutor();

Review comment:
       > Why use an `ExecutorService`?
   
   Because in streaming mode the read may block if the number of records actually produced does not match the expected number.
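
   The reasoning above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class `BoundedCollect` and the method `collectWithTimeout` are hypothetical names, and a plain `java.util.Iterator` stands in for the `CloseableIterator<Row>` returned by `result.collect()`. The read loop runs on a single-thread executor so that the test thread can give up after a timeout rather than wait forever.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only (not part of flink-table-store).
final class BoundedCollect {

    /**
     * Reads at most {@code limit} elements on a worker thread. If the worker
     * does not finish in time, Future.get throws a TimeoutException, so the
     * caller fails fast even when hasNext() blocks.
     */
    static <T> List<T> collectWithTimeout(
            Iterator<T> iterator, int limit, long timeout, TimeUnit unit) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<List<T>> future =
                    executor.submit(
                            () -> {
                                List<T> rows = new ArrayList<>();
                                // Stop as soon as the expected number of rows is read.
                                while (rows.size() < limit && iterator.hasNext()) {
                                    rows.add(iterator.next());
                                }
                                return rows;
                            });
            return future.get(timeout, unit);
        } finally {
            // Interrupts the worker if it is still waiting on the iterator.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
        System.out.println(collectWithTimeout(source, 2, 5, TimeUnit.SECONDS));
    }
}
```

   Note that `shutdownNow()` only interrupts the worker; whether a blocked streaming iterator actually reacts to the interrupt depends on its implementation, which this sketch does not assume.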







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #58: [FLINK-26669] Refactor ReadWriteTableITCase

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r834909824



##########
File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
##########
@@ -65,6 +65,15 @@ public BlockingIterator(Iterator<IN> iterator, Function<IN, OUT> converter) {
         }
     }
 
+    public List<OUT> collectAndCloseQuietly(int limit) {

Review comment:
       Why do you want to be quiet? Just throw the exception?
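
       One way to read this suggestion, sketched under assumptions: the names `CollectAndClose`, `CloseableSource` and `failingOnClose` below are hypothetical and do not reflect the real `BlockingIterator` API. With try-with-resources, a failure in `close()` reaches the caller instead of being swallowed, so a broken test fails visibly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in types, for illustration only.
final class CollectAndClose {

    /** An iterator that also owns a resource that must be closed. */
    interface CloseableSource<T> extends Iterator<T>, AutoCloseable {}

    /**
     * Collects at most {@code limit} elements and then closes the source.
     * Any exception thrown by close() propagates to the caller.
     */
    static <T> List<T> collectAndClose(CloseableSource<T> source, int limit) throws Exception {
        try (CloseableSource<T> s = source) {
            List<T> result = new ArrayList<>();
            while (result.size() < limit && s.hasNext()) {
                result.add(s.next());
            }
            return result;
        }
    }

    /** Test double whose close() always fails. */
    static <T> CloseableSource<T> failingOnClose(List<T> data) {
        Iterator<T> it = data.iterator();
        return new CloseableSource<T>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public T next() {
                return it.next();
            }

            @Override
            public void close() {
                throw new IllegalStateException("close failed");
            }
        };
    }

    public static void main(String[] args) {
        try {
            collectAndClose(failingOnClose(Arrays.asList(1, 2, 3)), 2);
        } catch (Exception e) {
            // The failure from close() is visible to the caller.
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```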



