Posted to commits@flink.apache.org by ja...@apache.org on 2021/05/06 02:59:43 UTC

[flink] branch master updated: [hotfix][hive] Add an ITCase that checks partition-time commit pretty well

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new da421e6  [hotfix][hive] Add an ITCase that checks partition-time commit pretty well
da421e6 is described below

commit da421e6a3d6ebf80feca029c8d3fb4f1021dbb09
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Apr 29 18:11:14 2021 +0800

    [hotfix][hive] Add an ITCase that checks partition-time commit pretty well
    
    This closes #15754
---
 .../flink/connectors/hive/HiveTableSinkITCase.java | 188 ++++++++++++++-------
 1 file changed, 128 insertions(+), 60 deletions(-)
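
For context on the diff below: the new ITCase exercises partition-time commit, where a partition is
committed once the sink's watermark passes the partition time plus the configured commit delay.
The standalone sketch below is not part of this commit (the class name is illustrative and the
eligibility check is a simplified model of the trigger's behavior); it only walks through that
arithmetic for the first partition the test writes, using the 'Asia/Shanghai' watermark time zone,
the 30 minute commit delay, and the 1 second watermark lag configured in the patch:

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class PartitionCommitArithmeticSketch {
        public static void main(String[] args) {
            ZoneId shanghai = ZoneId.of("Asia/Shanghai");

            // Partition (pt_day='2020-05-03', pt_hour='7') as resolved by the pattern
            // 'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00'.
            LocalDateTime partitionTime = LocalDateTime.of(2020, 5, 3, 7, 0, 0);

            // The rows written into that partition carry epoch_ts = 1588464000000L,
            // i.e. 2020-05-03 08:00:00 in Shanghai.
            LocalDateTime rowTime =
                    Instant.ofEpochMilli(1_588_464_000_000L).atZone(shanghai).toLocalDateTime();

            // The watermark is declared as ts_ltz - INTERVAL '1' SECOND, so after these
            // rows it advances to 2020-05-03 07:59:59.
            LocalDateTime watermark = rowTime.minusSeconds(1);

            // With 'sink.partition-commit.delay'='30min', the partition becomes eligible
            // for commit once watermark >= partition time + delay (07:30:00 here).
            LocalDateTime commitThreshold = partitionTime.plusMinutes(30);
            System.out.println("commit pt_hour=7? " + !watermark.isBefore(commitThreshold)); // true
        }
    }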

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 410ebe9..493a5df 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -23,15 +23,19 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.AfterClass;
@@ -46,7 +50,10 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 
 import static org.apache.flink.table.api.Expressions.$;
@@ -232,82 +239,133 @@ public class HiveTableSinkITCase {
         env.enableCheckpointing(100);
         StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
 
+        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
         tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
         tEnv.useCatalog(hiveCatalog.getName());
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
         try {
             tEnv.executeSql("create database db1");
             tEnv.useDatabase("db1");
+
+            // source table DDL
+            tEnv.executeSql(
+                    "create external table source_table ("
+                            + " a int,"
+                            + " b string,"
+                            + " c string,"
+                            + " epoch_ts bigint)"
+                            + " partitioned by ("
+                            + " pt_day string, pt_hour string) TBLPROPERTIES("
+                            + "'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',"
+                            + "'streaming-source.enable'='true',"
+                            + "'streaming-source.monitor-interval'='1s',"
+                            + "'streaming-source.consume-order'='partition-time'"
+                            + ")");
+
             tEnv.executeSql(
                     "create external table sink_table ("
                             + " a int,"
                             + " b string,"
                             + " c string)"
-                            + " partitioned by (d string,e string)"
-                            + " stored as parquet TBLPROPERTIES ("
-                            + " 'partition.time-extractor.timestamp-pattern'='$d $e:00:00',"
+                            + " partitioned by ("
+                            + " d string, e string) TBLPROPERTIES("
+                            + " 'partition.time-extractor.timestamp-pattern' = '$d $e:00:00',"
+                            + " 'auto-compaction'='true',"
+                            + " 'compaction.file-size' = '128MB',"
                             + " 'sink.partition-commit.trigger'='partition-time',"
-                            + " 'sink.partition-commit.delay'='1h',"
+                            + " 'sink.partition-commit.delay'='30min',"
                             + " 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',"
                             + " 'sink.partition-commit.policy.kind'='metastore,success-file',"
-                            + " 'sink.partition-commit.success-file.name'='_MY_SUCCESS')");
+                            + " 'sink.partition-commit.success-file.name'='_MY_SUCCESS',"
+                            + " 'streaming-source.enable'='true',"
+                            + " 'streaming-source.monitor-interval'='1s',"
+                            + " 'streaming-source.consume-order'='partition-time'"
+                            + ")");
 
-            // hive dialect only works with hive tables at the moment, switch to default dialect
             tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
-            tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
-            // prepare source
-            // epoch mills 1588460400000L <=>  local timestamp 2020-05-03 07:00:00 in Shanghai
-            // epoch mills 1588464000000L <=>  local timestamp 2020-05-03 08:00:00 in Shanghai
-            // epoch mills 1588467600000L <=>  local timestamp 2020-05-03 09:00:00 in Shanghai
-            // epoch mills 1588471200000L <=>  local timestamp 2020-05-03 10:00:00 in Shanghai
-            // epoch mills 1588474800000L <=>  local timestamp 2020-05-03 11:00:00 in Shanghai
-            List<Row> data =
-                    Arrays.asList(
-                            Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
-                            Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
-                            Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
-                            Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
-                            Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
-                            Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
-                            Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
-                            Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
-                            Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L),
-                            Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L));
-
-            String dataId = TestValuesTableFactory.registerData(data);
-            String sourceTableDDL =
-                    String.format(
-                            "create table my_table("
-                                    + " a INT,"
-                                    + " b STRING,"
-                                    + " c STRING,"
-                                    + " d STRING,"
-                                    + " e STRING,"
-                                    + " ts BIGINT,"
-                                    + " ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),"
-                                    + " WATERMARK FOR ts_ltz as ts_ltz"
-                                    + ") with ("
-                                    + " 'connector' = 'values',"
-                                    + " 'data-id' = '%s',"
-                                    + " 'failing-source' = 'true')",
-                            dataId);
-            tEnv.executeSql(sourceTableDDL);
-            tEnv.executeSql("insert into sink_table select a, b, c, d, e from my_table").await();
-
-            assertBatch(
-                    "db1.sink_table",
-                    Arrays.asList(
-                            "+I[1, a, b, 2020-05-03, 7]",
-                            "+I[1, a, b, 2020-05-03, 7]",
-                            "+I[2, p, q, 2020-05-03, 8]",
-                            "+I[2, p, q, 2020-05-03, 8]",
-                            "+I[3, x, y, 2020-05-03, 9]",
-                            "+I[3, x, y, 2020-05-03, 9]",
-                            "+I[4, x, y, 2020-05-03, 10]",
-                            "+I[4, x, y, 2020-05-03, 10]",
-                            "+I[5, x, y, 2020-05-03, 11]",
-                            "+I[5, x, y, 2020-05-03, 11]"));
 
+            // Build a partitioned table source with a watermark based on the streaming hive table
+            DataStream<Row> dataStream =
+                    tEnv.toDataStream(
+                            tEnv.sqlQuery(
+                                    "select a, b, c, epoch_ts, pt_day, pt_hour from source_table"));
+            Table table =
+                    tEnv.fromDataStream(
+                            dataStream,
+                            Schema.newBuilder()
+                                    .column("a", DataTypes.INT())
+                                    .column("b", DataTypes.STRING())
+                                    .column("c", DataTypes.STRING())
+                                    .column("epoch_ts", DataTypes.BIGINT())
+                                    .column("pt_day", DataTypes.STRING())
+                                    .column("pt_hour", DataTypes.STRING())
+                                    .columnByExpression(
+                                            "ts_ltz",
+                                            Expressions.callSql("TO_TIMESTAMP_LTZ(epoch_ts, 3)"))
+                                    .watermark("ts_ltz", "ts_ltz - INTERVAL '1' SECOND")
+                                    .build());
+            tEnv.createTemporaryView("my_table", table);
+            /*
+             * Prepare test data. The epoch millis define the watermark: the watermark value is
+             * the maximum timestamp of all the data in a partition, i.e.
+             * partition timestamp + 1 hour - 1 second in this case.
+             *
+             * <pre>
+             * epoch millis 1588464000000L <=>  local timestamp 2020-05-03 08:00:00 in Shanghai
+             * epoch millis 1588467600000L <=>  local timestamp 2020-05-03 09:00:00 in Shanghai
+             * epoch millis 1588471200000L <=>  local timestamp 2020-05-03 10:00:00 in Shanghai
+             * epoch millis 1588474800000L <=>  local timestamp 2020-05-03 11:00:00 in Shanghai
+             * epoch millis 1588478400000L <=>  local timestamp 2020-05-03 12:00:00 in Shanghai
+             * </pre>
+             */
+            Map<Integer, Object[]> testData = new HashMap<>();
+            testData.put(1, new Object[] {1, "a", "b", 1588464000000L});
+            testData.put(2, new Object[] {2, "p", "q", 1588467600000L});
+            testData.put(3, new Object[] {3, "x", "y", 1588471200000L});
+            testData.put(4, new Object[] {4, "x", "y", 1588474800000L});
+            testData.put(5, new Object[] {5, "x", "y", 1588478400000L});
+
+            Map<Integer, String> testPartition = new HashMap<>();
+            testPartition.put(1, "pt_day='2020-05-03',pt_hour='7'");
+            testPartition.put(2, "pt_day='2020-05-03',pt_hour='8'");
+            testPartition.put(3, "pt_day='2020-05-03',pt_hour='9'");
+            testPartition.put(4, "pt_day='2020-05-03',pt_hour='10'");
+            testPartition.put(5, "pt_day='2020-05-03',pt_hour='11'");
+
+            Map<Integer, Object[]> expectedData = new HashMap<>();
+            expectedData.put(1, new Object[] {1, "a", "b", "2020-05-03", "7"});
+            expectedData.put(2, new Object[] {2, "p", "q", "2020-05-03", "8"});
+            expectedData.put(3, new Object[] {3, "x", "y", "2020-05-03", "9"});
+            expectedData.put(4, new Object[] {4, "x", "y", "2020-05-03", "10"});
+            expectedData.put(5, new Object[] {5, "x", "y", "2020-05-03", "11"});
+
+            tEnv.executeSql("insert into sink_table select a, b, c, pt_day, pt_hour from my_table");
+            CloseableIterator<Row> iter = tEnv.executeSql("select * from sink_table").collect();
+
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
+                    .addRow(testData.get(1))
+                    .addRow(testData.get(1))
+                    .commit(testPartition.get(1));
+
+            for (int i = 2; i < 7; i++) {
+                try {
+                    Thread.sleep(1_000);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                Assert.assertEquals(
+                        Arrays.asList(
+                                Row.of(expectedData.get(i - 1)).toString(),
+                                Row.of(expectedData.get(i - 1)).toString()),
+                        fetchRows(iter, 2));
+
+                if (i < 6) {
+                    HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
+                            .addRow(testData.get(i))
+                            .addRow(testData.get(i))
+                            .commit(testPartition.get(i));
+                }
+            }
             this.checkSuccessFiles(
                     URI.create(
                                     hiveCatalog
@@ -320,6 +378,16 @@ public class HiveTableSinkITCase {
         }
     }
 
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> strings = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            Assert.assertTrue(iter.hasNext());
+            strings.add(iter.next().toString());
+        }
+        strings.sort(String::compareTo);
+        return strings;
+    }
+
     private void checkSuccessFiles(String path) {
         File basePath = new File(path, "d=2020-05-03");
         Assert.assertEquals(5, basePath.list().length);