Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/19 09:15:40 UTC
[hudi] branch master updated: [HUDI-2316] Support Flink batch upsert (#3494)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1fed44a [HUDI-2316] Support Flink batch upsert (#3494)
1fed44a is described below
commit 1fed44af84b4726d40c57f1dad012c8e4a510f91
Author: swuferhong <33...@qq.com>
AuthorDate: Thu Aug 19 17:15:26 2021 +0800
[HUDI-2316] Support Flink batch upsert (#3494)
---
.../hudi/sink/bootstrap/BootstrapFunction.java | 10 +-
.../bootstrap/batch/BatchBootstrapFunction.java | 65 ++++++++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 165 +++++++++++++++++++++
.../org/apache/hudi/table/HoodieTableSink.java | 107 ++-----------
.../java/org/apache/hudi/util/StreamerUtil.java | 4 +
.../apache/hudi/table/HoodieDataSourceITCase.java | 99 +++++++++----
.../org/apache/hudi/utils/TestConfigurations.java | 18 ++-
.../test/java/org/apache/hudi/utils/TestSQL.java | 10 ++
8 files changed, 344 insertions(+), 134 deletions(-)
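
For context, the user-facing effect of this change, as exercised by the new ITCase tests further down: in Flink's batch execution mode, a second INSERT INTO that reuses existing primary keys now updates those rows instead of duplicating them. A minimal sketch, not part of the commit; the table path and data values are illustrative:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BatchUpsertSketch {
      public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.executeSql(
            "create table t1(\n"
                + "  uuid varchar(20),\n"
                + "  name varchar(10),\n"
                + "  age int,\n"
                + "  ts timestamp(3),\n"
                + "  `partition` varchar(20),\n"
                + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
                + ") PARTITIONED BY (`partition`)\n"
                + "with ('connector' = 'hudi', 'path' = '/tmp/hudi_t1')");
        // first batch write
        tableEnv.executeSql(
            "insert into t1 values ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')").await();
        // second batch write with the same key: the row is upserted, not duplicated
        tableEnv.executeSql(
            "insert into t1 values ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1')").await();
        // select * from t1 now returns a single row for id1 with age = 24
      }
    }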
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 4923362..fc3c5f4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -72,12 +72,12 @@ public class BootstrapFunction<I, O extends HoodieRecord>
private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
- private HoodieTable<?, ?, ?, ?> hoodieTable;
+ protected HoodieTable<?, ?, ?, ?> hoodieTable;
- private final Configuration conf;
+ protected final Configuration conf;
- private transient org.apache.hadoop.conf.Configuration hadoopConf;
- private transient HoodieWriteConfig writeConfig;
+ protected transient org.apache.hadoop.conf.Configuration hadoopConf;
+ protected transient HoodieWriteConfig writeConfig;
private GlobalAggregateManager aggregateManager;
@@ -153,7 +153,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
* @param partitionPath The partition path
*/
@SuppressWarnings("unchecked")
- private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
+ protected void loadRecords(String partitionPath, Collector<O> out) throws Exception {
long start = System.currentTimeMillis();
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
new file mode 100644
index 0000000..8b136b4
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.batch;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The function to load the index of the specified partitions from the existing hoodieTable.
+ */
+public class BatchBootstrapFunction<I, O extends HoodieRecord>
+ extends BootstrapFunction<I, O> {
+
+ private Set<String> partitionPathSet;
+ private boolean haveSuccessfulCommits;
+
+ public BatchBootstrapFunction(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.partitionPathSet = new HashSet<>();
+ this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
+ }
+
+ @Override
+ public void processElement(I value, Context context, Collector<O> out) throws Exception {
+ final HoodieRecord record = (HoodieRecord<?>) value;
+ final String partitionPath = record.getKey().getPartitionPath();
+
+ if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {
+ loadRecords(partitionPath, out);
+ partitionPathSet.add(partitionPath);
+ }
+
+ // send the trigger record
+ out.collect((O) value);
+ }
+
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
new file mode 100644
index 0000000..4808eb7
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.CleanFunction;
+import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapFunction;
+import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.BucketAssignOperator;
+import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
+import org.apache.hudi.table.format.FilePathUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Utilities to generate all kinds of sub-pipelines.
+ */
+public class Pipelines {
+
+ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
+ BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+
+ final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
+ if (partitionFields.length > 0) {
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
+
+ // shuffle by partition keys
+ dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
+ }
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
+ SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
+ // sort by partition keys
+ dataStream = dataStream
+ .transform("partition_key_sorter",
+ TypeInformation.of(RowData.class),
+ sortOperatorGen.createSortOperator())
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+ }
+ }
+ return dataStream
+ .transform("hoodie_bulk_insert_write",
+ TypeInformation.of(Object.class),
+ operatorFactory)
+ // follow the parallelism of upstream operators to avoid shuffle
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+ .addSink(new CleanFunction<>(conf))
+ .setParallelism(1)
+ .name("clean_commits");
+ }
+
+ public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
+ DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
+
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ dataStream1 = dataStream1.rebalance()
+ .transform(
+ "index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new BootstrapFunction<>(conf)))
+ .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+ .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+ }
+
+ return dataStream1;
+ }
+
+ public static DataStream<HoodieRecord> batchBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
+ // shuffle by partition keys so that each partition is bootstrapped by a single task
+ final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
+ if (partitionFields.length > 0) {
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ // shuffle by partition keys
+ dataStream = dataStream
+ .keyBy(rowDataKeyGen::getPartitionPath);
+ }
+
+ return rowDataToHoodieRecord(conf, rowType, dataStream)
+ .transform(
+ "batch_index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new BatchBootstrapFunction<>(conf)))
+ .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+ .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+ }
+
+ public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
+ return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
+ }
+
+ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
+ StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
+ return dataStream
+ // Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
+ .keyBy(HoodieRecord::getRecordKey)
+ .transform(
+ "bucket_assigner",
+ TypeInformation.of(HoodieRecord.class),
+ new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+ .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+ .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
+ // shuffle by fileId (bucket id)
+ .keyBy(record -> record.getCurrentLocation().getFileId())
+ .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
+ .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ }
+
+ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
+ return dataStream.transform("compact_plan_generate",
+ TypeInformation.of(CompactionPlanEvent.class),
+ new CompactionPlanOperator(conf))
+ .setParallelism(1) // plan generation must be a singleton
+ .rebalance()
+ .transform("compact_task",
+ TypeInformation.of(CompactionCommitEvent.class),
+ new ProcessOperator<>(new CompactFunction(conf)))
+ .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+ .addSink(new CompactionCommitSink(conf))
+ .name("compact_commit")
+ .setParallelism(1); // the compaction commit should be a singleton
+ }
+
+ public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
+ return dataStream.addSink(new CleanFunction<>(conf))
+ .setParallelism(1)
+ .name("clean_commits");
+ }
+}
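
Taken together, these helpers rebuild the topology that HoodieTableSink assembled inline before this change. A minimal sketch of composing them, not part of the commit; the input stream and configuration are assumed:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.sink.utils.Pipelines;
    import org.apache.hudi.util.StreamerUtil;

    public class WritePipelineSketch {

      /** Mirrors the refactored HoodieTableSink below: bounded input takes the
       *  batch bootstrap path, unbounded input keeps the streaming bootstrap. */
      public static void build(
          Configuration conf, RowType rowType, DataStream<RowData> input, boolean bounded) {
        int parallelism = input.getExecutionConfig().getParallelism();
        DataStream<HoodieRecord> records = bounded
            ? Pipelines.batchBootstrap(conf, rowType, parallelism, input)
            : Pipelines.bootstrap(conf, rowType, parallelism, input);
        DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, records);
        if (StreamerUtil.needsAsyncCompaction(conf)) {
          Pipelines.compact(conf, pipeline);   // MOR: plan, compact and commit
        } else {
          Pipelines.clean(conf, pipeline);     // otherwise just clean committed data
        }
      }
    }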
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index ca6d33a..d49fdf1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -21,37 +21,19 @@ package org.apache.hudi.table;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapFunction;
-import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
-import org.apache.hudi.sink.bulk.RowDataKeyGen;
-import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
-import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
-import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.logical.RowType;
import java.util.Map;
@@ -90,90 +72,23 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
- BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
-
- final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
- if (partitionFields.length > 0) {
- RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
- if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
-
- // shuffle by partition keys
- dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
- }
- if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
- SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
- // sort by partition keys
- dataStream = dataStream
- .transform("partition_key_sorter",
- TypeInformation.of(RowData.class),
- sortOperatorGen.createSortOperator())
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
- ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
- conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
- }
- }
- return dataStream
- .transform("hoodie_bulk_insert_write",
- TypeInformation.of(Object.class),
- operatorFactory)
- // follow the parallelism of upstream operators to avoid shuffle
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
- .addSink(new CleanFunction<>(conf))
- .setParallelism(1)
- .name("clean_commits");
+ return Pipelines.bulkInsert(conf, rowType, dataStream);
}
- // stream write
+ // default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
- StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
-
- DataStream<HoodieRecord> dataStream1 = dataStream
- .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
- // bootstrap index
- // TODO: This is a very time-consuming operation, will optimization
- if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- dataStream1 = dataStream1.rebalance()
- .transform(
- "index_bootstrap",
- TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BootstrapFunction<>(conf)))
- .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
- .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
- }
+ final DataStream<HoodieRecord> dataStream1 = context.isBounded()
+ ? Pipelines.batchBootstrap(conf, rowType, parallelism, dataStream)
+ : Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+
+ // write pipeline
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
- DataStream<Object> pipeline = dataStream1
- // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
- .keyBy(HoodieRecord::getRecordKey)
- .transform(
- "bucket_assigner",
- TypeInformation.of(HoodieRecord.class),
- new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
- .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
- .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
- // shuffle by fileId(bucket id)
- .keyBy(record -> record.getCurrentLocation().getFileId())
- .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
// compaction
if (StreamerUtil.needsAsyncCompaction(conf)) {
- return pipeline.transform("compact_plan_generate",
- TypeInformation.of(CompactionPlanEvent.class),
- new CompactionPlanOperator(conf))
- .setParallelism(1) // plan generate must be singleton
- .rebalance()
- .transform("compact_task",
- TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
- .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
- .addSink(new CompactionCommitSink(conf))
- .name("compact_commit")
- .setParallelism(1); // compaction commit should be singleton
+ return Pipelines.compact(conf, pipeline);
} else {
- return pipeline.addSink(new CleanFunction<>(conf))
- .setParallelism(1)
- .name("clean_commits");
+ return Pipelines.clean(conf, pipeline);
}
};
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 0fac2cc..f89d089 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -379,4 +379,8 @@ public class StreamerUtil {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
}
+
+ public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
+ return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
+ }
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 0764f55..99effba 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -292,7 +292,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
@ParameterizedTest
- @MethodSource("configParams")
+ @MethodSource("executionModeAndPartitioningParams")
void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>();
@@ -318,6 +318,56 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
@ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
+ TableEnvironment tableEnv = batchTableEnv;
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.TABLE_NAME.key(), tableType.name());
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+ // batchMode update
+ execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
+ }
+
+ @ParameterizedTest
+ @MethodSource("tableTypeAndPartitioningParams")
+ void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) {
+ TableEnvironment tableEnv = batchTableEnv;
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.TABLE_NAME.key(), tableType.name());
+ if (hiveStylePartitioning) {
+ options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
+ }
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+ // batchMode update
+ execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result2, TestData.DATA_SET_SOURCE_MERGED);
+ }
+
+ @ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
boolean streaming = execMode == ExecMode.STREAM;
@@ -436,18 +486,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@EnumSource(value = ExecMode.class)
void testWriteNonPartitionedTable(ExecMode execMode) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
- String hoodieTableDDL = "create table t1(\n"
- + " uuid varchar(20),\n"
- + " name varchar(10),\n"
- + " age int,\n"
- + " ts timestamp(3),\n"
- + " `partition` varchar(20),\n"
- + " PRIMARY KEY(uuid) NOT ENFORCED\n"
- + ")\n"
- + "with (\n"
- + " 'connector' = 'hudi',\n"
- + " 'path' = '" + tempFile.getAbsolutePath() + "'\n"
- + ")";
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
tableEnv.executeSql(hoodieTableDDL);
final String insertInto1 = "insert into t1 values\n"
@@ -627,19 +668,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@Test
void testBulkInsertNonPartitionedTable() {
TableEnvironment tableEnv = batchTableEnv;
- String hoodieTableDDL = "create table t1(\n"
- + " uuid varchar(20),\n"
- + " name varchar(10),\n"
- + " age int,\n"
- + " ts timestamp(3),\n"
- + " `partition` varchar(20),\n"
- + " PRIMARY KEY(uuid) NOT ENFORCED\n"
- + ")\n"
- + "with (\n"
- + " 'connector' = 'hudi',\n"
- + " 'path' = '" + tempFile.getAbsolutePath() + "',\n"
- + " 'write.operation' = 'bulk_insert'\n"
- + ")";
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false);
tableEnv.executeSql(hoodieTableDDL);
final String insertInto1 = "insert into t1 values\n"
@@ -675,7 +707,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
/**
* Return test params => (execution mode, hive style partitioning).
*/
- private static Stream<Arguments> configParams() {
+ private static Stream<Arguments> executionModeAndPartitioningParams() {
Object[][] data =
new Object[][] {
{ExecMode.BATCH, false},
@@ -685,6 +717,19 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
return Stream.of(data).map(Arguments::of);
}
+ /**
+ * Return test params => (HoodieTableType, hive style partitioning).
+ */
+ private static Stream<Arguments> tableTypeAndPartitioningParams() {
+ Object[][] data =
+ new Object[][] {
+ {HoodieTableType.COPY_ON_WRITE, false},
+ {HoodieTableType.COPY_ON_WRITE, true},
+ {HoodieTableType.MERGE_ON_READ, false},
+ {HoodieTableType.MERGE_ON_READ, true}};
+ return Stream.of(data).map(Arguments::of);
+ }
+
private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index a7e38c0..f9db04c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -56,18 +56,24 @@ public class TestConfigurations {
.build();
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
- String createTable = "create table " + tableName + "(\n"
+ return getCreateHoodieTableDDL(tableName, options, true);
+ }
+
+ public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options, boolean havePartition) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("create table " + tableName + "(\n"
+ " uuid varchar(20),\n"
+ " name varchar(10),\n"
+ " age int,\n"
+ " ts timestamp(3),\n"
+ " `partition` varchar(20),\n"
+ " PRIMARY KEY(uuid) NOT ENFORCED\n"
- + ")\n"
- + "PARTITIONED BY (`partition`)\n"
- + "with (\n"
- + " 'connector' = 'hudi'";
- StringBuilder builder = new StringBuilder(createTable);
+ + ")\n");
+ if (havePartition) {
+ builder.append("PARTITIONED BY (`partition`)\n");
+ }
+ builder.append("with (\n"
+ + " 'connector' = 'hudi'");
options.forEach((k, v) -> builder.append(",\n")
.append(" '").append(k).append("' = '").append(v).append("'"));
builder.append("\n)");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 8fb15ed..6b0b71c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -33,4 +33,14 @@ public class TestSQL {
+ "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+
+ public static final String UPDATE_INSERT_T1 = "insert into t1 values\n"
+ + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id3','Julian',54,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ + "('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+ + "('id9','Jane',19,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ + "('id10','Ella',38,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ + "('id11','Phoebe',52,TIMESTAMP '1970-01-01 00:00:08','par4')";
}