Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/06 07:24:25 UTC

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #73: [FLINK-26856] Add ITCase for compound primary key and multi-partition

JingsongLi commented on code in PR #73:
URL: https://github.com/apache/flink-table-store/pull/73#discussion_r843511600


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestUtil.java:
##########
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Util for {@link ReadWriteTableITCase}. */
+public class ReadWriteTableTestUtil {
+
+    static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {
+        List<Row> expectedRecords = Collections.emptyList();
+        try {
+            // expect one record so the call blocks until the timeout expires
+            // wait only 5s to keep the test from running too long
+            expectedRecords = iterator.collect(1, 5L, TimeUnit.SECONDS);
+            iterator.close();
+        } catch (Exception ignored) {
+            // don't throw exception
+        }
+        assertThat(expectedRecords).isEmpty();
+    }
+
+    static String prepareManagedTableDdl(
+            String sourceTableName, String managedTableName, Map<String, String> tableOptions) {
+        StringBuilder ddl =
+                new StringBuilder("CREATE TABLE IF NOT EXISTS ")
+                        .append(String.format("`%s`", managedTableName));
+        ddl.append(prepareOptions(tableOptions))
+                .append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", sourceTableName));
+        return ddl.toString();
+    }
+
+    static String prepareOptions(Map<String, String> tableOptions) {

Review Comment:
   optionsToSQL?
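
A possible reading of the suggestion, for illustration only: the helper name `optionsToSql` and the exact WITH-clause formatting below are assumptions, not code from the PR.

    // Hypothetical rename of prepareOptions: render the table options as a SQL WITH clause.
    // Uses java.util.Map and java.util.stream.Collectors, both already imported in this util.
    static String optionsToSql(Map<String, String> tableOptions) {
        if (tableOptions.isEmpty()) {
            return "";
        }
        return tableOptions.entrySet().stream()
                .map(entry -> String.format("'%s' = '%s'", entry.getKey(), entry.getValue()))
                .collect(Collectors.joining(",\n", " WITH (\n", "\n)"));
    }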



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestUtil.java:
##########
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Util for {@link ReadWriteTableITCase}. */
+public class ReadWriteTableTestUtil {
+
+    static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {
+        List<Row> expectedRecords = Collections.emptyList();
+        try {
+            // expect one record so the call blocks until the timeout expires
+            // wait only 5s to keep the test from running too long
+            expectedRecords = iterator.collect(1, 5L, TimeUnit.SECONDS);
+            iterator.close();
+        } catch (Exception ignored) {

Review Comment:
   should only catch `TimeoutException`?
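
A minimal sketch of the narrower catch, assuming the expired wait surfaces as a java.util.concurrent.TimeoutException; any other failure would then fail the test instead of being swallowed.

    // Sketch only: treat the timeout as the expected outcome, rethrow everything else.
    // Requires an extra import: java.util.concurrent.TimeoutException.
    static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {
        List<Row> records = Collections.emptyList();
        try {
            // expecting one record forces collect() to block until the 5s timeout expires
            records = iterator.collect(1, 5L, TimeUnit.SECONDS);
            iterator.close();
        } catch (TimeoutException ignored) {
            // no record arrived within 5s, which is exactly what this assertion expects
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        assertThat(records).isEmpty();
    }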



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestUtil.java:
##########
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Util for {@link ReadWriteTableITCase}. */
+public class ReadWriteTableTestUtil {
+
+    static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {
+        List<Row> expectedRecords = Collections.emptyList();
+        try {
+            // expect one record so the call blocks until the timeout expires
+            // wait only 5s to keep the test from running too long
+            expectedRecords = iterator.collect(1, 5L, TimeUnit.SECONDS);
+            iterator.close();
+        } catch (Exception ignored) {
+            // don't throw exception
+        }
+        assertThat(expectedRecords).isEmpty();
+    }
+
+    static String prepareManagedTableDdl(

Review Comment:
   createTableLikeDDL?
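
Applied to the method above, the rename would be mechanical; the sketch below keeps the body of the code under review and only swaps the name (the exact casing `createTableLikeDdl` is an assumption).

    // Hypothetical rename of prepareManagedTableDdl; the body is unchanged from the PR.
    static String createTableLikeDdl(
            String sourceTableName, String managedTableName, Map<String, String> tableOptions) {
        StringBuilder ddl =
                new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                        .append(String.format("`%s`", managedTableName));
        ddl.append(prepareOptions(tableOptions))
                .append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", sourceTableName));
        return ddl.toString();
    }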



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestUtil.java:
##########
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Util for {@link ReadWriteTableITCase}. */
+public class ReadWriteTableTestUtil {
+
+    static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {

Review Comment:
   We don't need to restrict the visibility of these test util methods; just `public` is fine.
   Same for the others (though some should be private).



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java:
##########
@@ -388,220 +437,1131 @@ public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception {
     }
 
     @Test
-    public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
-        // input is dailyRatesChangelogWithoutUB()
-        // test hybrid read
-        Tuple2<String, BlockingIterator<Row, Row>> tuple =
-                collectAndCheckStreamingReadWriteWithoutClose(
-                        Collections.emptyMap(),
-                        "dt >= '2022-01-01' AND dt <= '2022-01-03' OR currency = 'HK Dollar'",
+    public void testBatchWriteWithMultiPartitionedRecordsWithMultiPk() throws Exception {
+        // input is hourlyExchangeRates()
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // to_currency is USD, dt = 2022-01-01, hh = 11
+                        changelogRow("+I", "Euro", "US Dollar", 1.11d, "2022-01-01", "11"),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-01", "11"),
+                        changelogRow(
+                                "+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-01", "11"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-01", "11"),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01", "11"),
+                        // to_currency is USD, dt = 2022-01-01, hh = 12
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"),
+                        changelogRow("+I", "Euro", "US Dollar", 1.12d, "2022-01-01", "12"),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12"),
+                        changelogRow(
+                                "+I", "Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.00812d, "2022-01-01", "12"),
+                        // to_currency is Euro, dt = 2022-01-02, hh = 23
+                        changelogRow("+I", "US Dollar", "Euro", 0.918d, "2022-01-02", "23"),
+                        changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02", "23"));
+        // test batch read
+        String managedTable =
+                collectAndCheckBatchReadWrite(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("from_currency", "to_currency", "dt", "hh"),
+                        null,
                         Collections.emptyList(),
-                        Arrays.asList(
-                                // part = 2022-01-01
-                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
-                                // part = 2022-01-02
-                                changelogRow("+I", "Euro", 119L, "2022-01-02")));
-        String managedTable = tuple.f0;
-        checkFileStorePath(tEnv, managedTable);
-        BlockingIterator<Row, Row> streamIter = tuple.f1;
-
-        // test log store in hybrid mode accepts all filters
-        tEnv.executeSql(
-                        String.format(
-                                "INSERT INTO `%s` PARTITION (dt = '2022-01-03')\n"
-                                        + "VALUES('HK Dollar', 100), ('Yen', 20)\n",
-                                managedTable))
-                .await();
+                        expectedRecords);
 
-        tEnv.executeSql(
-                        String.format(
-                                "INSERT INTO `%s` PARTITION (dt = '2022-01-04')\n"
-                                        + "VALUES('Yen', 20)\n",
-                                managedTable))
-                .await();
+        checkFileStorePath(tEnv, managedTable, hourlyExchangeRates().f1);
 
-        assertThat(streamIter.collect(2, 10, TimeUnit.SECONDS))
-                .containsExactlyInAnyOrderElementsOf(
-                        Arrays.asList(
-                                changelogRow("+I", "HK Dollar", 100L, "2022-01-03"),
-                                changelogRow("+I", "Yen", 20L, "2022-01-03")));
+        // test streaming read
+        final StreamTableEnvironment streamTableEnv =
+                StreamTableEnvironment.create(buildStreamEnv());
+        registerTable(streamTableEnv, managedTable);
+        BlockingIterator<Row, Row> streamIter =
+                collectAndCheck(
+                        streamTableEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        null,
+                        expectedRecords);
 
-        // overwrite partition 2022-01-02
+        // overwrite static partition dt = 2022-01-02 and hh = 23
+        Map<String, String> overwritePartition = new LinkedHashMap<>();
+        overwritePartition.put("dt", "'2022-01-02'");
+        overwritePartition.put("hh", "'23'");
         prepareEnvAndOverwrite(
                 managedTable,
-                Collections.singletonMap("dt", "'2022-01-02'"),
-                Arrays.asList(new String[] {"'Euro'", "100"}, new String[] {"'Yen'", "1"}));
-
-        // batch read to check data refresh
-        final StreamTableEnvironment batchTableEnv =
-                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
-        registerTable(batchTableEnv, managedTable);
-        collectAndCheck(
-                batchTableEnv,
-                managedTable,
-                Collections.emptyMap(),
-                Arrays.asList(
-                        // part = 2022-01-01
-                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
-                        // part = 2022-01-02
-                        changelogRow("+I", "Euro", 100L, "2022-01-02"),
-                        changelogRow("+I", "Yen", 1L, "2022-01-02"),
-                        // part = 2022-01-03
-                        changelogRow("+I", "HK Dollar", 100L, "2022-01-03"),
-                        changelogRow("+I", "Yen", 20L, "2022-01-03"),
-                        // part = 2022-01-04
-                        changelogRow("+I", "Yen", 20L, "2022-01-04")));
+                overwritePartition,
+                Collections.singletonList(new String[] {"'US Dollar'", "'Thai Baht'", "33.51"}));
 
-        // check no changelog generated for streaming read
+        // streaming iter will not receive any changelog
         assertNoMoreRecords(streamIter);
 
-        // filter on partition
-        collectAndCheckStreamingReadWriteWithClose(
-                true,
-                true,
-                true,
-                Collections.emptyMap(),
+        // batch read to check partition refresh
+        collectAndCheck(
+                        tEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        "dt = '2022-01-02' AND hh = '23'",
+                        Collections.singletonList(
+                                changelogRow(
+                                        "+I",
+                                        "US Dollar",
+                                        "Thai Baht",
+                                        33.51d,
+                                        "2022-01-02",
+                                        "23")))
+                .close();
+
+        // test partition filter
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
                 "dt = '2022-01-01'",
                 Collections.emptyList(),
-                Collections.singletonList(changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+                Arrays.asList(
+                        // to_currency is USD, dt = 2022-01-01, hh = 11
+                        changelogRow("+I", "Euro", "US Dollar", 1.11d, "2022-01-01", "11"),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-01", "11"),
+                        changelogRow(
+                                "+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-01", "11"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-01", "11"),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01", "11"),
+                        // to_currency is USD, dt = 2022-01-01, hh = 12
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"),
+                        changelogRow("+I", "Euro", "US Dollar", 1.12d, "2022-01-01", "12"),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12"),
+                        changelogRow(
+                                "+I", "Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.00812d, "2022-01-01", "12")));
+
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
+                "dt = '2022-01-01' AND hh = '12'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"),
+                        changelogRow("+I", "Euro", "US Dollar", 1.12d, "2022-01-01", "12"),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12"),
+                        changelogRow(
+                                "+I", "Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.00812d, "2022-01-01", "12")));
 
         // test field filter
-        collectAndCheckStreamingReadWriteWithClose(
-                true,
-                true,
-                true,
-                Collections.emptyMap(),
-                "currency = 'US Dollar'",
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
+                "to_currency = 'Euro'",
                 Collections.emptyList(),
-                Collections.singletonList(changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", "Euro", 0.918d, "2022-01-02", "23"),
+                        changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02", "23")));
+
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
+                "from_currency = 'HK Dollar'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-01", "11"),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12")));
+
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
+                "rate_by_to_currency > 0.5",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "Euro", "US Dollar", 1.11d, "2022-01-01", "11"),
+                        changelogRow(
+                                "+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-01", "11"),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01", "11"),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"),
+                        changelogRow("+I", "Euro", "US Dollar", 1.12d, "2022-01-01", "12"),
+                        changelogRow(
+                                "+I", "Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"),
+                        changelogRow("+I", "US Dollar", "Euro", 0.918d, "2022-01-02", "23"),
+                        changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02", "23")));
 
         // test partition and field filter
-        collectAndCheckStreamingReadWriteWithClose(
-                true,
-                true,
-                true,
-                Collections.emptyMap(),
-                "dt = '2022-01-01' AND rate = 1",
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
+                "rate_by_to_currency > 0.9 AND hh = '23'",
                 Collections.emptyList(),
-                Collections.emptyList());
+                Collections.singletonList(
+                        changelogRow("+I", "US Dollar", "Euro", 0.918d, "2022-01-02", "23")));
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
+                null,
+                Arrays.asList("from_currency", "dt", "hh"),
+                Arrays.asList(
+                        // to_currency is USD, dt = 2022-01-01, hh = 11
+                        changelogRow("+I", "Euro", "2022-01-01", "11"),
+                        changelogRow("+I", "HK Dollar", "2022-01-01", "11"),
+                        changelogRow("+I", "Singapore Dollar", "2022-01-01", "11"),
+                        changelogRow("+I", "Yen", "2022-01-01", "11"),
+                        changelogRow("+I", "US Dollar", "2022-01-01", "11"),
+                        // to_currency is USD, dt = 2022-01-01, hh = 12
+                        changelogRow("+I", "US Dollar", "2022-01-01", "12"),
+                        changelogRow("+I", "Euro", "2022-01-01", "12"),
+                        changelogRow("+I", "HK Dollar", "2022-01-01", "12"),
+                        changelogRow("+I", "Singapore Dollar", "2022-01-01", "12"),
+                        changelogRow("+I", "Yen", "2022-01-01", "12"),
+                        // to_currency is Euro, dt = 2022-01-02, hh = 23
+                        changelogRow("+I", "US Dollar", "2022-01-02", "23"),
+                        changelogRow("+I", "Singapore Dollar", "2022-01-02", "23")));
 
         // test projection and filter
-        collectAndCheckStreamingReadWriteWithClose(
-                true,
-                true,
-                true,
-                Collections.emptyMap(),
-                "dt = '2022-01-02' AND currency = 'Euro'",
-                Arrays.asList("rate", "dt", "currency"),
-                Collections.singletonList(changelogRow("+I", 119L, "2022-01-02", "Euro")));
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt", "hh"), // pk
+                "dt = '2022-01-01' AND hh >= '12' OR rate_by_to_currency > 2",
+                Arrays.asList("from_currency", "to_currency"),
+                Arrays.asList(
+                        // to_currency is USD, dt = 2022-01-01, hh = 12
+                        changelogRow("+I", "US Dollar", "US Dollar"),
+                        changelogRow("+I", "Euro", "US Dollar"),
+                        changelogRow("+I", "HK Dollar", "US Dollar"),
+                        changelogRow("+I", "Singapore Dollar", "US Dollar"),
+                        changelogRow("+I", "Yen", "US Dollar")));
     }
 
     @Test
-    public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
-        // input is dailyRatesChangelogWithoutUB()
-        // file store continuous read
-        // will not merge, at least collect two records
-        checkFileStorePath(
-                tEnv,
-                collectAndCheckStreamingReadWriteWithClose(
-                        false,
-                        true,
-                        true,
-                        Collections.emptyMap(),
+    public void testBatchWriteWithSinglePartitionedRecordsWithMultiPk() throws Exception {
+        // input is dailyExchangeRates()
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01"),
+                        changelogRow("+I", "Euro", "US Dollar", 1.11d, "2022-01-01"),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-01"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.0082d, "2022-01-01"),
+                        changelogRow("+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-01"),
+                        changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-01"),
+                        changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-01"),
+                        changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-01"),
+                        changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-01"),
+                        changelogRow("+I", "Singapore Dollar", "Yen", 122.46d, "2022-01-01"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-02"),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-02"));
+        // test batch read
+        String managedTable =
+                collectAndCheckBatchReadWrite(
+                        Collections.singletonList("dt"),
+                        Arrays.asList("from_currency", "to_currency", "dt"),
                         null,
                         Collections.emptyList(),
-                        Arrays.asList(
-                                // part = 2022-01-01
-                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
-                                // part = 2022-01-02
-                                changelogRow("+I", "Euro", 119L, "2022-01-02"))));
+                        expectedRecords);
 
-        // test partition filter
-        collectAndCheckStreamingReadWriteWithClose(
-                false,
-                true,
-                true,
-                Collections.emptyMap(),
-                "dt < '2022-01-02'",
-                Collections.emptyList(),
-                Collections.singletonList(
-                        // part = 2022-01-01
-                        changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+        checkFileStorePath(tEnv, managedTable, dailyExchangeRates().f1);
 
-        // test field filter
+        // test streaming read
+        final StreamTableEnvironment streamTableEnv =
+                StreamTableEnvironment.create(buildStreamEnv());
+        registerTable(streamTableEnv, managedTable);
+        BlockingIterator<Row, Row> streamIter =
+                collectAndCheck(
+                        streamTableEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        null,
+                        expectedRecords);
+
+        // overwrite dynamic partition
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.emptyMap(),
+                Collections.singletonList(
+                        new String[] {"'US Dollar'", "'Thai Baht'", "33.51", "'2022-01-01'"}));
+
+        // streaming iter will not receive any changelog
+        assertNoMoreRecords(streamIter);
+
+        // batch read to check partition refresh
+        collectAndCheck(
+                        tEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        "dt = '2022-01-01'",
+                        Collections.singletonList(
+                                changelogRow("+I", "US Dollar", "Thai Baht", 33.51d, "2022-01-01")))
+                .close();
+
+        // test partition filter
+        collectAndCheckBatchReadWrite(
+                Collections.singletonList("dt"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt"), // pk
+                "dt = '2022-01-02'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-02"),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-02")));
+
+        // test field filter
+        collectAndCheckBatchReadWrite(
+                Collections.singletonList("dt"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt"), // pk
+                "rate_by_to_currency < 0.1",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "Yen", "US Dollar", 0.0082d, "2022-01-01"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-02")));
+
+        // test partition and field filter
+        collectAndCheckBatchReadWrite(
+                Collections.singletonList("dt"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt"), // pk
+                "rate_by_to_currency > 0.9 OR dt = '2022-01-02'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01"),
+                        changelogRow("+I", "Euro", "US Dollar", 1.11d, "2022-01-01"),
+                        changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-01"),
+                        changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-01"),
+                        changelogRow("+I", "Singapore Dollar", "Yen", 122.46d, "2022-01-01"),
+                        changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-02"),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-02")));
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                Collections.singletonList("dt"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt"), // pk
+                null,
+                Arrays.asList("rate_by_to_currency", "dt"),
+                Arrays.asList(
+                        changelogRow("+I", 1.0d, "2022-01-01"), // US Dollar,US Dollar
+                        changelogRow("+I", 1.11d, "2022-01-01"), // Euro,US Dollar
+                        changelogRow("+I", 0.13d, "2022-01-01"), // HK Dollar,US Dollar
+                        changelogRow("+I", 0.0082d, "2022-01-01"), // Yen,US Dollar
+                        changelogRow("+I", 0.74d, "2022-01-01"), // Singapore Dollar,US Dollar
+                        changelogRow("+I", 0.9d, "2022-01-01"), // US Dollar,Euro
+                        changelogRow("+I", 0.67d, "2022-01-01"), // Singapore Dollar,Euro
+                        changelogRow("+I", 1.0d, "2022-01-01"), // Yen,Yen
+                        changelogRow("+I", 19.25d, "2022-01-01"), // Chinese Yuan,Yen
+                        changelogRow("+I", 122.46d, "2022-01-01"), // Singapore Dollar,Yen
+                        changelogRow("+I", 0.0081d, "2022-01-02"), // Yen,US Dollar
+                        changelogRow("+I", 1.0d, "2022-01-02") // US Dollar,US Dollar
+                        ));
+
+        // test projection and filter
+        collectAndCheckBatchReadWrite(
+                Collections.singletonList("dt"), // partition
+                Arrays.asList("from_currency", "to_currency", "dt"), // pk
+                "dt = '2022-01-02' OR rate_by_to_currency > 100",
+                Arrays.asList("from_currency", "to_currency"),
+                Arrays.asList(
+                        changelogRow("+I", "Yen", "US Dollar"),
+                        changelogRow("+I", "US Dollar", "US Dollar"),
+                        changelogRow("+I", "Singapore Dollar", "Yen")));
+    }
+
+    @Test
+    public void testBatchWriteWithNonPartitionedRecordsWithMultiPk() throws Exception {
+        // input is exchangeRates()
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // to_currency is USD
+                        changelogRow("+I", "Euro", "US Dollar", 1.11d),
+                        changelogRow("+I", "HK Dollar", "US Dollar", 0.13d),
+                        changelogRow("+I", "Singapore Dollar", "US Dollar", 0.74d),
+                        changelogRow("+I", "Yen", "US Dollar", 0.0081d),
+                        changelogRow("+I", "US Dollar", "US Dollar", 1.0d),
+                        // to_currency is Euro
+                        changelogRow("+I", "US Dollar", "Euro", 0.9d),
+                        changelogRow("+I", "Singapore Dollar", "Euro", 0.67d),
+                        // to_currency is Yen
+                        changelogRow("+I", "Yen", "Yen", 1.0d),
+                        changelogRow("+I", "Chinese Yuan", "Yen", 19.25d),
+                        changelogRow("+I", "Singapore Dollar", "Yen", 122.46d));
+        // test batch read
+        String managedTable =
+                collectAndCheckBatchReadWrite(
+                        Collections.emptyList(), // partition
+                        Arrays.asList("from_currency", "to_currency"), // pk
+                        null,
+                        Collections.emptyList(),
+                        expectedRecords);
+
+        checkFileStorePath(tEnv, managedTable, null);
+
+        // test streaming read
+        final StreamTableEnvironment streamTableEnv =
+                StreamTableEnvironment.create(buildStreamEnv());
+        registerTable(streamTableEnv, managedTable);
+        BlockingIterator<Row, Row> streamIter =
+                collectAndCheck(
+                        streamTableEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        null,
+                        expectedRecords);
+
+        // overwrite the whole table
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.emptyMap(),
+                Collections.singletonList(new String[] {"'US Dollar'", "'Thai Baht'", "33.51"}));
+
+        // streaming iter will not receive any changelog
+        assertNoMoreRecords(streamIter);
+
+        // batch read to check data refresh
+        collectAndCheck(
+                        tEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        null,
+                        Collections.singletonList(
+                                changelogRow("+I", "US Dollar", "Thai Baht", 33.51d)))
+                .close();
+
+        // test field filter
+        collectAndCheckBatchReadWrite(
+                Collections.emptyList(), // partition
+                Arrays.asList("from_currency", "to_currency"), // pk
+                "rate_by_to_currency < 0.1",
+                Collections.emptyList(),
+                Collections.singletonList(changelogRow("+I", "Yen", "US Dollar", 0.0081d)));
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                Collections.emptyList(), // partition
+                Arrays.asList("from_currency", "to_currency"), // pk
+                null,
+                Collections.singletonList("rate_by_to_currency"),
+                Arrays.asList(
+                        changelogRow("+I", 1.11d), // Euro, US Dollar
+                        changelogRow("+I", 0.13d), // HK Dollar, US Dollar
+                        changelogRow("+I", 0.74d), // Singapore Dollar, US Dollar
+                        changelogRow("+I", 0.0081d), // Yen, US Dollar
+                        changelogRow("+I", 1.0d), // US Dollar, US Dollar
+                        changelogRow("+I", 0.9d), // US Dollar, Euro
+                        changelogRow("+I", 0.67d), // Singapore Dollar, Euro
+                        changelogRow("+I", 1.0d), // Yen, Yen
+                        changelogRow("+I", 19.25d), // Chinese Yuan, Yen
+                        changelogRow("+I", 122.46d) // Singapore Dollar, Yen
+                        ));
+
+        // test projection and filter
+        collectAndCheckBatchReadWrite(
+                Collections.emptyList(), // partition
+                Arrays.asList("from_currency", "to_currency"), // pk
+                "rate_by_to_currency > 100",
+                Arrays.asList("from_currency", "to_currency"),
+                Collections.singletonList(changelogRow("+I", "Singapore Dollar", "Yen")));
+    }
+
+    @Test
+    public void testBatchWriteMultiPartitionedRecordsWithExclusiveOnePk() throws Exception {
+        // input is hourlyRates()
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // dt = 2022-01-01, hh = 00
+                        changelogRow("+I", "Yen", 1L, "2022-01-01", "00"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01", "00"),
+                        changelogRow("+I", "US Dollar", 114L, "2022-01-01", "00"),
+                        // dt = 2022-01-01, hh = 20
+                        changelogRow("+I", "Yen", 1L, "2022-01-01", "20"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01", "20"),
+                        changelogRow("+I", "US Dollar", 114L, "2022-01-01", "20"),
+                        // dt = 2022-01-02, hh = 12
+                        changelogRow("+I", "Euro", 119L, "2022-01-02", "12"));
+        // test batch read
+        String managedTable =
+                collectAndCheckBatchReadWrite(
+                        Arrays.asList("dt", "hh"), // partition
+                        Arrays.asList("currency", "dt", "hh"), // pk
+                        null,
+                        Collections.emptyList(),
+                        expectedRecords);
+
+        checkFileStorePath(tEnv, managedTable, hourlyRates().f1);
+
+        // test streaming read
+        final StreamTableEnvironment streamTableEnv =
+                StreamTableEnvironment.create(buildStreamEnv());
+        registerTable(streamTableEnv, managedTable);
+        BlockingIterator<Row, Row> streamIter =
+                collectAndCheck(
+                        streamTableEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        null,
+                        expectedRecords);
+
+        // dynamic overwrite
+        // INSERT OVERWRITE `managed_table-${uuid}` VALUES(...)
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.emptyMap(),
+                Collections.singletonList(
+                        new String[] {"'HK Dollar'", "80", "'2022-01-01'", "'00'"}));
+
+        // INSERT OVERWRITE `managed_table-${uuid}` PARTITION (dt = '2022-01-02') VALUES(...)
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.singletonMap("dt", "'2022-01-02'"),
+                Collections.singletonList(new String[] {"'Euro'", "120", "'12'"}));
+
+        // batch read to check data refresh
+        collectAndCheck(
+                tEnv,
+                managedTable,
+                Collections.emptyMap(),
+                "hh = '00' OR hh = '12'",
+                Arrays.asList(
+                        changelogRow("+I", "HK Dollar", 80L, "2022-01-01", "00"),
+                        changelogRow("+I", "Euro", 120L, "2022-01-02", "12")));
+
+        // streaming iter will not receive any changelog
+        assertNoMoreRecords(streamIter);
+
+        // test partition filter
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("currency", "dt", "hh"), // pk
+                "hh >= '10'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // dt = 2022-01-01, hh = 20
+                        changelogRow("+I", "Yen", 1L, "2022-01-01", "20"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01", "20"),
+                        changelogRow("+I", "US Dollar", 114L, "2022-01-01", "20"),
+                        // dt = 2022-01-02, hh = 12
+                        changelogRow("+I", "Euro", 119L, "2022-01-02", "12")));
+
+        // test field filter
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("currency", "dt", "hh"), // pk
+                "rate >= 119",
+                Collections.emptyList(),
+                Collections.singletonList(
+                        // dt = 2022-01-02, hh = 12
+                        changelogRow("+I", "Euro", 119L, "2022-01-02", "12")));
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("currency", "dt", "hh"), // pk
+                null,
+                Collections.singletonList("hh"),
+                Arrays.asList(
+                        changelogRow("+I", "00"), // Yen, 1L, 2022-01-01
+                        changelogRow("+I", "00"), // Euro, 114L, 2022-01-01
+                        changelogRow("+I", "00"), // US Dollar, 114L, 2022-01-01
+                        changelogRow("+I", "20"), // Yen, 1L, 2022-01-01
+                        changelogRow("+I", "20"), // Euro, 114L, 2022-01-01
+                        changelogRow("+I", "20"), // US Dollar, 114L, 2022-01-01
+                        changelogRow("+I", "12") // Euro, 119L, "2022-01-02
+                        ));
+
+        // test projection and filter
+        collectAndCheckBatchReadWrite(
+                Arrays.asList("dt", "hh"), // partition
+                Arrays.asList("currency", "dt", "hh"), // pk
+                "rate > 100 AND hh >= '20'",
+                Collections.singletonList("rate"),
+                Collections.singletonList(changelogRow("+I", 114L)));
+    }
+
+    @Test
+    public void testBatchWriteMultiPartitionedRecordsWithoutPk() throws Exception {

Review Comment:
   Can we consider putting the new tests in another class and shortening those method names? I think this class is too big and the method names are too long.
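
One possible shape of that split, purely as an illustration: a dedicated ITCase whose class name carries the partition/primary-key context so the individual test methods can stay short. Every identifier below is an assumption, not part of the PR.

    import org.junit.Test; // or org.junit.jupiter.api.Test, depending on the project's JUnit version

    /** Hypothetical home for the new multi-partition / compound-pk cases. */
    public class MultiPartitionReadWriteITCase {

        @Test
        public void testBatchWriteMultiPk() throws Exception {
            // body of testBatchWriteWithMultiPartitionedRecordsWithMultiPk would move here
        }

        @Test
        public void testBatchWriteWithoutPk() throws Exception {
            // body of testBatchWriteMultiPartitionedRecordsWithoutPk would move here
        }
    }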



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
