Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/07 03:35:41 UTC

[hudi] branch master updated: [HUDI-5651] Sort the inputs by record keys for bulk insert tasks (#7795)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c38af5e627 [HUDI-5651] Sort the inputs by record keys for bulk insert tasks (#7795)
2c38af5e627 is described below

commit 2c38af5e6276fda914025fef63e5cc7da76f784c
Author: vortual <10...@qq.com>
AuthorDate: Tue Feb 7 11:35:34 2023 +0800

    [HUDI-5651] Sort the inputs by record keys for bulk insert tasks (#7795)
---
 .../apache/hudi/configuration/FlinkOptions.java    |  6 +++++
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 18 ++++++++++++---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 26 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 04678947aee..e3ca60d1405 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -549,6 +549,12 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(true)
       .withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");
 
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY = ConfigOptions
+          .key("write.bulk_insert.sort_input.by_record_key")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Whether to sort the inputs by record keys for bulk insert tasks, default false");
+
   public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
       .key("write.sort.memory")
       .intType()
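
The new option defaults to false, so existing bulk_insert pipelines are unaffected; as the Pipelines change below shows, it is also only consulted when WRITE_BULK_INSERT_SORT_INPUT itself is enabled. A minimal sketch (not part of the commit) of setting the two switches together on a Flink configuration; the wrapper class is illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class BulkInsertSortConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Existing switch: sort bulk insert inputs by partition fields (default true).
        conf.set(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, true);
        // New switch from this commit (default false); per the Pipelines change
        // below, it only takes effect when the partition sort above is enabled.
        conf.set(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY, true);
      }
    }
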
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index d17213dcc04..2dbaeafbc59 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -62,6 +62,9 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -139,10 +142,19 @@ public class Pipelines {
         dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
       }
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
-        SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
-        // sort by partition keys
+        String[] sortFields = partitionFields;
+        String operatorName = "sorter:(partition_key)";
+        if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY)) {
+          String[] recordKeyFields = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+          ArrayList<String> sortList = new ArrayList<>(Arrays.asList(partitionFields));
+          Collections.addAll(sortList, recordKeyFields);
+          sortFields = sortList.toArray(new String[0]);
+          operatorName = "sorter:(partition_key, record_key)";
+        }
+        SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortFields);
+        // sort by partition keys or (partition keys and record keys)
         dataStream = dataStream
-            .transform("partition_key_sorter",
+            .transform(operatorName,
                 InternalTypeInfo.of(rowType),
                 sortOperatorGen.createSortOperator())
             .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
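
With both options enabled, the sort key becomes the partition fields followed by the record key fields (RECORD_KEY_FIELD may be a comma-separated list), and the operator name reflects that. A standalone sketch of just that field composition, mirroring the patch above; the field names are illustrative:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SortFieldsExample {
      public static void main(String[] args) {
        String[] partitionFields = {"par"};           // illustrative partition path field
        String[] recordKeyFields = "uuid".split(","); // RECORD_KEY_FIELD is comma-separated

        // Same composition as the patch: partition fields first, then record keys.
        List<String> sortList = new ArrayList<>(Arrays.asList(partitionFields));
        Collections.addAll(sortList, recordKeyFields);
        String[] sortFields = sortList.toArray(new String[0]);

        // Prints [par, uuid]: rows sort by partition path, ties broken by record key.
        System.out.println(Arrays.toString(sortFields));
      }
    }
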
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index d733965b2eb..514a06b1103 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1066,6 +1066,32 @@ public class ITTestHoodieDataSource {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @Test
+  void testBulkInsertWithSortByRecordKey() {
+    TableEnvironment tableEnv = batchTableEnv;
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, "bulk_insert")
+        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
+        .option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, true)
+        .option(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY, true)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto = "insert into t1 values\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+        + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1')";
+
+    execInsertSql(tableEnv, insertInto);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, "["
+        + "+I[id1, Julian, 53, 1970-01-01T00:00:03, par1], "
+        + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]", 4);
+  }
+
   @Test
   void testBulkInsertNonPartitionedTable() {
     TableEnvironment tableEnv = batchTableEnv;
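
For reference, a hedged sketch (not part of the commit) of driving the same feature through Flink SQL, as the test above does via its sql("t1") builder. The table name, schema, and path are illustrative; the 'write.bulk_insert.sort_input' key for the pre-existing option is assumed from the Hudi docs, while the new key is taken verbatim from the FlinkOptions diff:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BulkInsertSortDdlExample {
      public static void main(String[] args) {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Illustrative table; Hudi's record key field defaults to 'uuid'.
        tableEnv.executeSql(
            "create table t1 (\n"
                + "  uuid varchar(20),\n"
                + "  name varchar(10),\n"
                + "  age int,\n"
                + "  ts timestamp(3),\n"
                + "  `partition` varchar(20)\n"
                + ") partitioned by (`partition`) with (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = '/tmp/hudi_t1',\n"
                + "  'write.operation' = 'bulk_insert',\n"
                + "  'write.bulk_insert.sort_input' = 'true',\n"
                + "  'write.bulk_insert.sort_input.by_record_key' = 'true'\n"
                + ")");
      }
    }
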