Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/27 01:14:35 UTC

[hudi] branch master updated: [HUDI-3716] OOM occurred when using bulk_insert on a COW table with the Flink BUCKET index (#5135)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d940bb  [HUDI-3716] OOM occurred when using bulk_insert on a COW table with the Flink BUCKET index (#5135)
4d940bb is described below

commit 4d940bbf8a934087e606e2a9a9ba698a0911caa3
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Mar 27 09:13:58 2022 +0800

    [HUDI-3716] OOM occurred when using bulk_insert on a COW table with the Flink BUCKET index (#5135)
---
 .../apache/hudi/configuration/FlinkOptions.java    | 12 ++---
 .../sink/bucket/BucketBulkInsertWriterHelper.java  | 56 ++++++++++++++++++----
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  8 ++--
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 19 ++++++--
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  2 +-
 5 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index cd5c2a7..b1e0087 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -459,17 +459,17 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
           + "waits for the instant commit success, only for internal use");
 
-  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
-      .key("write.bulk_insert.shuffle_by_partition")
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_INPUT = ConfigOptions
+      .key("write.bulk_insert.shuffle_input")
       .booleanType()
       .defaultValue(true)
-      .withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
+      .withDescription("Whether to shuffle the inputs by specific fields for bulk insert tasks, default true");
 
-  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
-      .key("write.bulk_insert.sort_by_partition")
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT = ConfigOptions
+      .key("write.bulk_insert.sort_input")
       .booleanType()
       .defaultValue(true)
-      .withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
+      .withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");
 
   public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
       .key("write.sort.memory")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index c52b6f0..ae646d9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -19,14 +19,19 @@
 package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,36 +43,67 @@ import java.io.IOException;
  */
 public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
   private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
+  public static final String FILE_GROUP_META_FIELD = "_fg";
 
-  private final int bucketNum;
-  private final String indexKeyFields;
+  private final int recordArity;
+
+  private String lastFileId; // for efficient code path
 
   public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
                                       String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
     super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
-    this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
-    this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+    this.recordArity = rowType.getFieldCount();
   }
 
-  public void write(RowData record) throws IOException {
+  public void write(RowData tuple) throws IOException {
     try {
+      RowData record = tuple.getRow(1, this.recordArity);
       String recordKey = keyGen.getRecordKey(record);
       String partitionPath = keyGen.getPartitionPath(record);
-      final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
-      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
-      getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
+      String fileId = tuple.getString(0).toString();
+      if ((lastFileId == null) || !lastFileId.equals(fileId)) {
+        LOG.info("Creating new file for partition path " + partitionPath);
+        handle = getRowCreateHandle(partitionPath, fileId);
+        lastFileId = fileId;
+      }
+      handle.write(recordKey, partitionPath, record);
     } catch (Throwable throwable) {
       LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
       throw throwable;
     }
   }
 
-  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
+  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) throws IOException {
     if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
+      if (this.isInputSorted) {
+        // if records are sorted, we can close all existing handles
+        close();
+      }
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
           instantTime, taskPartitionId, taskId, taskEpochId, rowType);
       handles.put(fileId, rowCreateHandle);
     }
     return handles.get(fileId);
   }
+
+  public static SortOperatorGen getFileIdSorterGen(RowType rowType) {
+    return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
+  }
+
+  private static String getFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+    String recordKey = keyGen.getRecordKey(record);
+    final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
+    return BucketIdentifier.newBucketFileIdPrefix(bucketId);
+  }
+
+  public static RowData rowWithFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+    final String fileId = getFileId(keyGen, record, indexKeyFields, bucketNum);
+    return GenericRowData.of(StringData.fromString(fileId), record);
+  }
+
+  public static RowType rowTypeWithFileId(RowType rowType) {
+    LogicalType[] types = new LogicalType[] {DataTypes.STRING().getLogicalType(), rowType};
+    String[] names = new String[] {FILE_GROUP_META_FIELD, "record"};
+    return RowType.of(types, names);
+  }
 }
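
(For context; not part of the commit.) A minimal sketch of the new static helpers, under the assumptions noted in the comments: rowTypeWithFileId prepends the "_fg" string field to the schema, and rowWithFileId wraps each record as (file id, record), which write(RowData) later unwraps via getString(0) and getRow(1, recordArity), creating a new handle only when the file id changes.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
    import org.apache.hudi.sink.bulk.RowDataKeyGen;

    public class FileIdTaggingSketch {
      public static void main(String[] args) {
        // Assumptions: a one-column STRING schema whose "uuid" field is the record key
        // (the Flink options default), a default Configuration, and 4 buckets.
        Configuration conf = new Configuration();
        RowType rowType = RowType.of(
            new LogicalType[] {DataTypes.STRING().getLogicalType()},
            new String[] {"uuid"});
        RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);

        RowType taggedType = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
        System.out.println(taggedType); // the tagged schema: ROW<_fg STRING, record ROW<uuid STRING>>

        RowData record = GenericRowData.of(StringData.fromString("id1"));
        RowData tagged = BucketBulkInsertWriterHelper.rowWithFileId(keyGen, record, "uuid", 4);
        System.out.println(tagged.getString(0)); // bucket file id prefix derived from the record key hash
      }
    }
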
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 013595a..4e1d189 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -57,9 +57,9 @@ public class BulkInsertWriterHelper {
   protected final HoodieTable hoodieTable;
   protected final HoodieWriteConfig writeConfig;
   protected final RowType rowType;
-  private final Boolean arePartitionRecordsSorted;
+  protected final Boolean isInputSorted;
   private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
-  private HoodieRowDataCreateHandle handle;
+  protected HoodieRowDataCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private final String fileIdPrefix;
   private int numFilesWritten = 0;
@@ -75,7 +75,7 @@ public class BulkInsertWriterHelper {
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
     this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
-    this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
+    this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
     this.keyGen = RowDataKeyGen.instance(conf, rowType);
   }
@@ -112,7 +112,7 @@ public class BulkInsertWriterHelper {
   private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
     if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
       // if records are sorted, we can close all existing handles
-      if (arePartitionRecordsSorted) {
+      if (isInputSorted) {
         close();
       }
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 4930dbe..1992edd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -26,6 +26,7 @@ import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
 import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
@@ -53,6 +54,7 @@ import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
@@ -92,7 +94,18 @@ public class Pipelines {
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
-      return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+      RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+      InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
+      dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
+              typeInfo);
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
+        SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+        dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator());
+        ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+            conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+      }
+      return dataStream
           .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
@@ -103,7 +116,7 @@ public class Pipelines {
     final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
     if (partitionFields.length > 0) {
       RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
-      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {
 
         // shuffle by partition keys
         // use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
@@ -112,7 +125,7 @@ public class Pipelines {
             KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
         dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
       }
-      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
         // sort by partition keys
         dataStream = dataStream
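
(For context; not part of the commit.) A rough wiring sketch of the modified bucket bulk_insert path, assuming the hunk above sits inside the existing Pipelines#bulkInsert(Configuration, RowType, DataStream<RowData>) entry point (the enclosing method is not visible in the hunk) and that the caller supplies the source stream and row type.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.sink.utils.Pipelines;

    public class BucketBulkInsertWiringSketch {
      // "conf", "rowType" and "input" are assumed to come from the caller's table source.
      static void wire(Configuration conf, RowType rowType, DataStream<RowData> input) {
        conf.setString(FlinkOptions.OPERATION, "bulk_insert");
        conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
        conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);          // illustrative
        conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, true);   // enables the file_sorter stage
        // Shuffles by bucket id, tags each row with its "_fg" file id, sorts on that field
        // with the configured managed memory, then runs the bucket_bulk_insert writer.
        Pipelines.bulkInsert(conf, rowType, input);
      }
    }

With the input sorted by file id, the writer sees all rows of a file group contiguously, so it keeps at most one HoodieRowDataCreateHandle open at a time instead of one per file group, which is what removes the OOM.
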
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 1ef7157..8802cac 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -916,7 +916,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
     String hoodieTableDDL = sql("hoodie_sink")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "bulk_insert")
-        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
         .option(FlinkOptions.INDEX_TYPE, indexType)
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .end();