You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/07 03:35:41 UTC
[hudi] branch master updated: [HUDI-5651] Sort the inputs by record keys for bulk insert tasks (#7795)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2c38af5e627 [HUDI-5651] Sort the inputs by record keys for bulk insert tasks (#7795)
2c38af5e627 is described below
commit 2c38af5e6276fda914025fef63e5cc7da76f784c
Author: vortual <10...@qq.com>
AuthorDate: Tue Feb 7 11:35:34 2023 +0800
[HUDI-5651] Sort the inputs by record keys for bulk insert tasks (#7795)
---
.../apache/hudi/configuration/FlinkOptions.java | 6 +++++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 18 ++++++++++++---
.../apache/hudi/table/ITTestHoodieDataSource.java | 26 ++++++++++++++++++++++
3 files changed, 47 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 04678947aee..e3ca60d1405 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -549,6 +549,12 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(true)
.withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");
+ public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY = ConfigOptions
+ .key("write.bulk_insert.sort_input.by_record_key")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to sort the inputs by record keys for bulk insert tasks, default false");
+
public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
.key("write.sort.memory")
.intType()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index d17213dcc04..2dbaeafbc59 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -62,6 +62,9 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -139,10 +142,19 @@ public class Pipelines {
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
}
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
- SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
- // sort by partition keys
+ String[] sortFields = partitionFields;
+ String operatorName = "sorter:(partition_key)";
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY)) {
+ String[] recordKeyFields = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ ArrayList<String> sortList = new ArrayList<>(Arrays.asList(partitionFields));
+ Collections.addAll(sortList, recordKeyFields);
+ sortFields = sortList.toArray(new String[0]);
+ operatorName = "sorter:(partition_key, record_key)";
+ }
+ SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortFields);
+ // sort by partition keys or (partition keys and record keys)
dataStream = dataStream
- .transform("partition_key_sorter",
+ .transform(operatorName,
InternalTypeInfo.of(rowType),
sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index d733965b2eb..514a06b1103 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1066,6 +1066,32 @@ public class ITTestHoodieDataSource {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
+ @Test
+ void testBulkInsertWithSortByRecordKey() {
+ TableEnvironment tableEnv = batchTableEnv;
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.OPERATION, "bulk_insert")
+ .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
+ .option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, true)
+ .option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY, true)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ final String insertInto = "insert into t1 values\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1')";
+
+ execInsertSql(tableEnv, insertInto);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result, "["
+ + "+I[id1, Julian, 53, 1970-01-01T00:00:03, par1], "
+ + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]", 4);
+ }
+
@Test
void testBulkInsertNonPartitionedTable() {
TableEnvironment tableEnv = batchTableEnv;