Posted to commits@flink.apache.org by ja...@apache.org on 2021/05/06 02:59:43 UTC
[flink] branch master updated: [hotfix][hive] Add an ITCase that checks partition-time commit pretty well
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new da421e6 [hotfix][hive] Add an ITCase that checks partition-time commit pretty well
da421e6 is described below
commit da421e6a3d6ebf80feca029c8d3fb4f1021dbb09
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Thu Apr 29 18:11:14 2021 +0800
[hotfix][hive] Add an ITCase that checks partition-time commit pretty well
This closes #15754
---
.../flink/connectors/hive/HiveTableSinkITCase.java | 188 ++++++++++++++-------
1 file changed, 128 insertions(+), 60 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 410ebe9..493a5df 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -23,15 +23,19 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.AfterClass;
@@ -46,7 +50,10 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.function.Consumer;
import static org.apache.flink.table.api.Expressions.$;
@@ -232,82 +239,133 @@ public class HiveTableSinkITCase {
env.enableCheckpointing(100);
StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+ tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
try {
tEnv.executeSql("create database db1");
tEnv.useDatabase("db1");
+
+ // source table DDL: a streaming Hive source that discovers new partitions every
+ // second and consumes them in partition-time order
+ tEnv.executeSql(
+ "create external table source_table ("
+ + " a int,"
+ + " b string,"
+ + " c string,"
+ + " epoch_ts bigint)"
+ + " partitioned by ("
+ + " pt_day string, pt_hour string) TBLPROPERTIES("
+ + "'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',"
+ + "'streaming-source.enable'='true',"
+ + "'streaming-source.monitor-interval'='1s',"
+ + "'streaming-source.consume-order'='partition-time'"
+ + ")");
+
tEnv.executeSql(
"create external table sink_table ("
+ " a int,"
+ " b string,"
+ " c string)"
- + " partitioned by (d string,e string)"
- + " stored as parquet TBLPROPERTIES ("
- + " 'partition.time-extractor.timestamp-pattern'='$d $e:00:00',"
+ + " partitioned by ("
+ + " d string, e string) TBLPROPERTIES("
+ + " 'partition.time-extractor.timestamp-pattern' = '$d $e:00:00',"
+ + " 'auto-compaction'='true',"
+ + " 'compaction.file-size' = '128MB',"
+ " 'sink.partition-commit.trigger'='partition-time',"
- + " 'sink.partition-commit.delay'='1h',"
+ + " 'sink.partition-commit.delay'='30min',"
+ " 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',"
+ " 'sink.partition-commit.policy.kind'='metastore,success-file',"
- + " 'sink.partition-commit.success-file.name'='_MY_SUCCESS')");
+ + " 'sink.partition-commit.success-file.name'='_MY_SUCCESS',"
+ + " 'streaming-source.enable'='true',"
+ + " 'streaming-source.monitor-interval'='1s',"
+ + " 'streaming-source.consume-order'='partition-time'"
+ + ")");
- // hive dialect only works with hive tables at the moment, switch to default dialect
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
- tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
- // prepare source
- // epoch mills 1588460400000L <=> local timestamp 2020-05-03 07:00:00 in Shanghai
- // epoch mills 1588464000000L <=> local timestamp 2020-05-03 08:00:00 in Shanghai
- // epoch mills 1588467600000L <=> local timestamp 2020-05-03 09:00:00 in Shanghai
- // epoch mills 1588471200000L <=> local timestamp 2020-05-03 10:00:00 in Shanghai
- // epoch mills 1588474800000L <=> local timestamp 2020-05-03 11:00:00 in Shanghai
- List<Row> data =
- Arrays.asList(
- Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
- Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
- Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
- Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
- Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
- Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
- Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
- Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
- Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L),
- Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L));
-
- String dataId = TestValuesTableFactory.registerData(data);
- String sourceTableDDL =
- String.format(
- "create table my_table("
- + " a INT,"
- + " b STRING,"
- + " c STRING,"
- + " d STRING,"
- + " e STRING,"
- + " ts BIGINT,"
- + " ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),"
- + " WATERMARK FOR ts_ltz as ts_ltz"
- + ") with ("
- + " 'connector' = 'values',"
- + " 'data-id' = '%s',"
- + " 'failing-source' = 'true')",
- dataId);
- tEnv.executeSql(sourceTableDDL);
- tEnv.executeSql("insert into sink_table select a, b, c, d, e from my_table").await();
-
- assertBatch(
- "db1.sink_table",
- Arrays.asList(
- "+I[1, a, b, 2020-05-03, 7]",
- "+I[1, a, b, 2020-05-03, 7]",
- "+I[2, p, q, 2020-05-03, 8]",
- "+I[2, p, q, 2020-05-03, 8]",
- "+I[3, x, y, 2020-05-03, 9]",
- "+I[3, x, y, 2020-05-03, 9]",
- "+I[4, x, y, 2020-05-03, 10]",
- "+I[4, x, y, 2020-05-03, 10]",
- "+I[5, x, y, 2020-05-03, 11]",
- "+I[5, x, y, 2020-05-03, 11]"));
+ // Build a partitioned table source with a watermark based on the streaming Hive table
+ DataStream<Row> dataStream =
+ tEnv.toDataStream(
+ tEnv.sqlQuery(
+ "select a, b, c, epoch_ts, pt_day, pt_hour from source_table"));
+ Table table =
+ tEnv.fromDataStream(
+ dataStream,
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.STRING())
+ .column("c", DataTypes.STRING())
+ .column("epoch_ts", DataTypes.BIGINT())
+ .column("pt_day", DataTypes.STRING())
+ .column("pt_hour", DataTypes.STRING())
+ .columnByExpression(
+ "ts_ltz",
+ Expressions.callSql("TO_TIMESTAMP_LTZ(epoch_ts, 3)"))
+ .watermark("ts_ltz", "ts_ltz - INTERVAL '1' SECOND")
+ .build());
+ tEnv.createTemporaryView("my_table", table);
+ /*
+ * Prepare the test data. The epoch millis drive the watermark: each partition's
+ * rows carry the timestamp of the following hour, so once a partition is read
+ * the watermark advances to partition timestamp + 1 hour - 1 second (the one
+ * second lag comes from the watermark definition above).
+ *
+ * <pre>
+ * epoch millis 1588464000000L <=> local timestamp 2020-05-03 08:00:00 in Shanghai
+ * epoch millis 1588467600000L <=> local timestamp 2020-05-03 09:00:00 in Shanghai
+ * epoch millis 1588471200000L <=> local timestamp 2020-05-03 10:00:00 in Shanghai
+ * epoch millis 1588474800000L <=> local timestamp 2020-05-03 11:00:00 in Shanghai
+ * epoch millis 1588478400000L <=> local timestamp 2020-05-03 12:00:00 in Shanghai
+ * </pre>
+ */
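+ // e.g. the rows for partition (pt_day='2020-05-03', pt_hour='7') carry epoch
+ // 1588464000000L (08:00 local time), so the watermark reaches 07:59:59, which
+ // already passes that sink partition's commit time of 07:00 + 30min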
+ Map<Integer, Object[]> testData = new HashMap<>();
+ testData.put(1, new Object[] {1, "a", "b", 1588464000000L});
+ testData.put(2, new Object[] {2, "p", "q", 1588467600000L});
+ testData.put(3, new Object[] {3, "x", "y", 1588471200000L});
+ testData.put(4, new Object[] {4, "x", "y", 1588474800000L});
+ testData.put(5, new Object[] {5, "x", "y", 1588478400000L});
+
+ Map<Integer, String> testPartition = new HashMap<>();
+ testPartition.put(1, "pt_day='2020-05-03',pt_hour='7'");
+ testPartition.put(2, "pt_day='2020-05-03',pt_hour='8'");
+ testPartition.put(3, "pt_day='2020-05-03',pt_hour='9'");
+ testPartition.put(4, "pt_day='2020-05-03',pt_hour='10'");
+ testPartition.put(5, "pt_day='2020-05-03',pt_hour='11'");
+
+ Map<Integer, Object[]> expectedData = new HashMap<>();
+ expectedData.put(1, new Object[] {1, "a", "b", "2020-05-03", "7"});
+ expectedData.put(2, new Object[] {2, "p", "q", "2020-05-03", "8"});
+ expectedData.put(3, new Object[] {3, "x", "y", "2020-05-03", "9"});
+ expectedData.put(4, new Object[] {4, "x", "y", "2020-05-03", "10"});
+ expectedData.put(5, new Object[] {5, "x", "y", "2020-05-03", "11"});
+
+ tEnv.executeSql("insert into sink_table select a, b, c, pt_day, pt_hour from my_table");
+ CloseableIterator<Row> iter = tEnv.executeSql("select * from sink_table").collect();
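+
+ // rows only become visible to this streaming read after their partition has
+ // been committed to the metastore, which is what the loop below asserts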
+
+ HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
+ .addRow(testData.get(1))
+ .addRow(testData.get(1))
+ .commit(testPartition.get(1));
+
+ for (int i = 2; i < 7; i++) {
+ try {
+ Thread.sleep(1_000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ Assert.assertEquals(
+ Arrays.asList(
+ Row.of(expectedData.get(i - 1)).toString(),
+ Row.of(expectedData.get(i - 1)).toString()),
+ fetchRows(iter, 2));
+
+ if (i < 6) {
+ HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
+ .addRow(testData.get(i))
+ .addRow(testData.get(i))
+ .commit(testPartition.get(i));
+ }
+ }
this.checkSuccessFiles(
URI.create(
hiveCatalog
@@ -320,6 +378,16 @@ public class HiveTableSinkITCase {
}
}
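+ // drains exactly 'size' rows from the streaming result and sorts them so the
+ // comparison is deterministic regardless of arrival order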
+ private static List<String> fetchRows(Iterator<Row> iter, int size) {
+ List<String> strings = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Assert.assertTrue(iter.hasNext());
+ strings.add(iter.next().toString());
+ }
+ strings.sort(String::compareTo);
+ return strings;
+ }
+
private void checkSuccessFiles(String path) {
File basePath = new File(path, "d=2020-05-03");
Assert.assertEquals(5, basePath.list().length);
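
For reference, the epoch-to-local-time mapping cited in the test comment can be double-checked with java.time. A minimal standalone sketch (the class name EpochCheck is illustrative, not part of the patch):

    import java.time.Instant;
    import java.time.ZoneId;

    // Prints the epoch-milli <=> Asia/Shanghai mapping used by the test data,
    // one line per hourly partition from 08:00 to 12:00 local time.
    public class EpochCheck {
        public static void main(String[] args) {
            ZoneId shanghai = ZoneId.of("Asia/Shanghai");
            for (long ts = 1588464000000L; ts <= 1588478400000L; ts += 3_600_000L) {
                System.out.println(ts + " <=> " + Instant.ofEpochMilli(ts).atZone(shanghai));
            }
        }
    }

Each printed line should match the corresponding "<=>" entry in the comment block above.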