Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/27 01:14:35 UTC
[hudi] branch master updated: [HUDI-3716] OOM occurred when use bulk_insert cow table with flink BUCKET index (#5135)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4d940bb [HUDI-3716] OOM occurred when use bulk_insert cow table with flink BUCKET index (#5135)
4d940bb is described below
commit 4d940bbf8a934087e606e2a9a9ba698a0911caa3
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Mar 27 09:13:58 2022 +0800
[HUDI-3716] OOM occurred when use bulk_insert cow table with flink BUCKET index (#5135)
---
.../apache/hudi/configuration/FlinkOptions.java | 12 ++---
.../sink/bucket/BucketBulkInsertWriterHelper.java | 56 ++++++++++++++++++----
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 8 ++--
.../java/org/apache/hudi/sink/utils/Pipelines.java | 19 ++++++--
.../apache/hudi/table/ITTestHoodieDataSource.java | 2 +-
5 files changed, 73 insertions(+), 24 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index cd5c2a7..b1e0087 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -459,17 +459,17 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use");
- public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
- .key("write.bulk_insert.shuffle_by_partition")
+ public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_INPUT = ConfigOptions
+ .key("write.bulk_insert.shuffle_input")
.booleanType()
.defaultValue(true)
- .withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
+ .withDescription("Whether to shuffle the inputs by specific fields for bulk insert tasks, default true");
- public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
- .key("write.bulk_insert.sort_by_partition")
+ public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT = ConfigOptions
+ .key("write.bulk_insert.sort_input")
.booleanType()
.defaultValue(true)
- .withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
+ .withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");
public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
.key("write.sort.memory")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index c52b6f0..ae646d9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -19,14 +19,19 @@
package org.apache.hudi.sink.bucket;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,36 +43,67 @@ import java.io.IOException;
*/
public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
+ public static final String FILE_GROUP_META_FIELD = "_fg";
- private final int bucketNum;
- private final String indexKeyFields;
+ private final int recordArity;
+
+ private String lastFileId; // for efficient code path
public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
- this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
- this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+ this.recordArity = rowType.getFieldCount();
}
- public void write(RowData record) throws IOException {
+ public void write(RowData tuple) throws IOException {
try {
+ RowData record = tuple.getRow(1, this.recordArity);
String recordKey = keyGen.getRecordKey(record);
String partitionPath = keyGen.getPartitionPath(record);
- final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
- String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
- getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
+ String fileId = tuple.getString(0).toString();
+ if ((lastFileId == null) || !lastFileId.equals(fileId)) {
+ LOG.info("Creating new file for partition path " + partitionPath);
+ handle = getRowCreateHandle(partitionPath, fileId);
+ lastFileId = fileId;
+ }
+ handle.write(recordKey, partitionPath, record);
} catch (Throwable throwable) {
LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
throw throwable;
}
}
- private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
+ private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) throws IOException {
if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
+ if (this.isInputSorted) {
+ // if records are sorted, we can close all existing handles
+ close();
+ }
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
instantTime, taskPartitionId, taskId, taskEpochId, rowType);
handles.put(fileId, rowCreateHandle);
}
return handles.get(fileId);
}
+
+ public static SortOperatorGen getFileIdSorterGen(RowType rowType) {
+ return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
+ }
+
+ private static String getFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+ String recordKey = keyGen.getRecordKey(record);
+ final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
+ return BucketIdentifier.newBucketFileIdPrefix(bucketId);
+ }
+
+ public static RowData rowWithFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+ final String fileId = getFileId(keyGen, record, indexKeyFields, bucketNum);
+ return GenericRowData.of(StringData.fromString(fileId), record);
+ }
+
+ public static RowType rowTypeWithFileId(RowType rowType) {
+ LogicalType[] types = new LogicalType[] {DataTypes.STRING().getLogicalType(), rowType};
+ String[] names = new String[] {FILE_GROUP_META_FIELD, "record"};
+ return RowType.of(types, names);
+ }
}
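
Note: instead of re-computing the bucket id inside the writer, the pipeline now prepends the target file id to every record as an extra string field named "_fg", and the writer unwraps that tuple. A sketch of the wrap/unwrap is shown below; it assumes `record` is the incoming RowData and `rowType` is the write schema, and the literal file id is made up (real ids come from BucketIdentifier.newBucketFileIdPrefix).

    // Sketch only: what the map() stage adds and what write(...) reads back.
    static RowData wrap(RowData record) {
      // Prepend the target file id as field 0 ("_fg"), keep the payload as field 1 ("record").
      return GenericRowData.of(StringData.fromString("00000003" /* hypothetical id */), record);
    }

    static RowData unwrap(RowData tuple, RowType rowType) {
      String fileId = tuple.getString(0).toString();      // routing key for the handle
      return tuple.getRow(1, rowType.getFieldCount());    // original record, same arity as the write schema
    }
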
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 013595a..4e1d189 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -57,9 +57,9 @@ public class BulkInsertWriterHelper {
protected final HoodieTable hoodieTable;
protected final HoodieWriteConfig writeConfig;
protected final RowType rowType;
- private final Boolean arePartitionRecordsSorted;
+ protected final Boolean isInputSorted;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
- private HoodieRowDataCreateHandle handle;
+ protected HoodieRowDataCreateHandle handle;
private String lastKnownPartitionPath = null;
private final String fileIdPrefix;
private int numFilesWritten = 0;
@@ -75,7 +75,7 @@ public class BulkInsertWriterHelper {
this.taskId = taskId;
this.taskEpochId = taskEpochId;
this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
- this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
+ this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
this.keyGen = RowDataKeyGen.instance(conf, rowType);
}
@@ -112,7 +112,7 @@ public class BulkInsertWriterHelper {
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
// if records are sorted, we can close all existing handles
- if (arePartitionRecordsSorted) {
+ if (isInputSorted) {
close();
}
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
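
Note: renaming arePartitionRecordsSorted to isInputSorted (and widening it and `handle` to protected) lets the bucket subclass reuse the same memory-saving trick: when the input is globally sorted by the write key, a key change means the previous file is finished, so all open handles can be flushed before a new one is created. In sketch form, assuming the `handles` map and `close()` of the helpers above (`createHandle` is a made-up stand-in for the HoodieRowDataCreateHandle constructor call):

    // With sorted input at most one HoodieRowDataCreateHandle (and its Parquet
    // writer buffers) is alive at a time, which is what prevents the OOM for
    // tables with many buckets or partitions.
    if (!handles.containsKey(key)) {
      if (isInputSorted) {
        close();                          // flush and release every previously opened handle
      }
      handles.put(key, createHandle(key));
    }
    return handles.get(key);
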
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 4930dbe..1992edd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -26,6 +26,7 @@ import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
@@ -53,6 +54,7 @@ import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
/**
@@ -92,7 +94,18 @@ public class Pipelines {
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
- return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+ RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+ InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
+ dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+ .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
+ typeInfo);
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
+ SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+ dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator());
+ ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+ }
+ return dataStream
.transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
@@ -103,7 +116,7 @@ public class Pipelines {
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) {
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
- if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {
// shuffle by partition keys
// use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
@@ -112,7 +125,7 @@ public class Pipelines {
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
}
- if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
+ if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
// sort by partition keys
dataStream = dataStream
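
Note: for the BUCKET index the bulk_insert pipeline therefore becomes: custom-partition by bucket id, map each record to a ("_fg", record) tuple, optionally sort by "_fg" with managed memory taken from write.sort.memory, and finally write with the single-handle bucket writer. Condensed, the operator chain built by the hunk above looks roughly like this (setup such as `partitioner`, `rowDataKeyGen`, `typeInfo` and `operatorFactory` is assumed from the surrounding method):

    dataStream = dataStream
        .partitionCustom(partitioner, rowDataKeyGen::getRecordKey)   // route records to their bucket
        .map(r -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, r, indexKeyFields, bucketNum),
            typeInfo);                                                // attach the "_fg" file id field
    if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
      // Sort by file id so the writer sees each file group contiguously.
      dataStream = dataStream.transform("file_sorter", typeInfo,
          BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId).createSortOperator());
      ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
          conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
    }
    return dataStream.transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory);
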
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 1ef7157..8802cac 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -916,7 +916,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
String hoodieTableDDL = sql("hoodie_sink")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "bulk_insert")
- .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+ .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
.option(FlinkOptions.INDEX_TYPE, indexType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();