Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/27 02:14:35 UTC
[hudi] branch master updated: [HUDI-2229] Refact HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9850e90 [HUDI-2229] Refact HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)
9850e90 is described below
commit 9850e90e2e0fa602a5346911810bd65c5f3e0ea8
Author: mikewu <xi...@gmail.com>
AuthorDate: Fri Aug 27 10:14:04 2021 +0800
[HUDI-2229] Refact HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)
Co-authored-by: mikewu <xi...@alibaba-inc.com>
---
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 64 ++----------------
.../org/apache/hudi/table/HoodieTableSink.java | 7 +-
.../org/apache/hudi/sink/StreamWriteITCase.java | 79 ++++------------------
3 files changed, 19 insertions(+), 131 deletions(-)
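
For context, the heart of this change is that HoodieFlinkStreamer now builds its write topology through the same org.apache.hudi.sink.utils.Pipelines helpers that HoodieTableSink uses, instead of wiring the RowData-to-HoodieRecord map, index bootstrap, bucket assigner, stream write, compaction and clean operators by hand. The sketch below re-assembles the added lines from the diff into one place; it is illustrative only. The class and method names are not part of the commit, and conf, rowType and the RowData source stream are assumed to be built exactly as in HoodieFlinkStreamer#main (Kafka source plus JSON row deserialization).

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.sink.utils.Pipelines;
    import org.apache.hudi.util.StreamerUtil;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    public class PipelineWiringSketch {
      // Illustrative helper; conf, rowType and dataStream are assumed to be set up
      // as in HoodieFlinkStreamer#main.
      static void wirePipeline(StreamExecutionEnvironment env, Configuration conf,
                               RowType rowType, DataStream<RowData> dataStream) {
        int parallelism = env.getParallelism();
        // Maps RowData to HoodieRecord and performs the index bootstrap step,
        // replacing the hand-rolled RowDataToHoodieFunctions map and
        // BootstrapOperator wiring removed in the diff below. The trailing `false`
        // mirrors the streamer's unbounded Kafka input (HoodieTableSink passes
        // context.isBounded() instead).
        DataStream<HoodieRecord> hoodieRecordDataStream =
            Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
        // Bucket assignment keyed by record key, then the hoodie_stream_write
        // operator, previously wired inline with StreamWriteOperatorFactory.
        DataStream<Object> pipeline =
            Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
        if (StreamerUtil.needsAsyncCompaction(conf)) {
          // Async compaction: compact_plan_generate -> compact_task -> compact_commit.
          Pipelines.compact(conf, pipeline);
        } else {
          // No async compaction: just attach the clean_commits sink.
          Pipelines.clean(conf, pipeline);
        }
      }
    }
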
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 3d3f804..077633e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -21,30 +21,18 @@ package org.apache.hudi.streamer;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
-import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -88,9 +76,6 @@ public class HoodieFlinkStreamer {
int parallelism = env.getParallelism();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
- StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
- new StreamWriteOperatorFactory<>(conf);
-
DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
@@ -110,51 +95,12 @@ public class HoodieFlinkStreamer {
}
}
- DataStream<HoodieRecord> dataStream2 = dataStream
- .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
- if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- dataStream2 = dataStream2
- .transform(
- "index_bootstrap",
- TypeInformation.of(HoodieRecord.class),
- new BootstrapOperator<>(conf))
- .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
- .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
- }
-
- DataStream<Object> pipeline = dataStream2
- // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
- .keyBy(HoodieRecord::getRecordKey)
- .transform(
- "bucket_assigner",
- TypeInformation.of(HoodieRecord.class),
- new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
- .uid("uid_bucket_assigner" + conf.getString(FlinkOptions.TABLE_NAME))
- .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
- // shuffle by fileId(bucket id)
- .keyBy(record -> record.getCurrentLocation().getFileId())
- .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
if (StreamerUtil.needsAsyncCompaction(conf)) {
- pipeline.transform("compact_plan_generate",
- TypeInformation.of(CompactionPlanEvent.class),
- new CompactionPlanOperator(conf))
- .uid("uid_compact_plan_generate")
- .setParallelism(1) // plan generate must be singleton
- .rebalance()
- .transform("compact_task",
- TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
- .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
- .addSink(new CompactionCommitSink(conf))
- .name("compact_commit")
- .setParallelism(1); // compaction commit should be singleton
+ Pipelines.compact(conf, pipeline);
} else {
- pipeline.addSink(new CleanFunction<>(conf))
- .setParallelism(1)
- .name("clean_commits").uid("uid_clean_commits");
+ Pipelines.clean(conf, pipeline);
}
env.execute(cfg.targetTableName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index feab7b7..2ced22a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -77,13 +77,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
-
// bootstrap
- final DataStream<HoodieRecord> dataStream1 = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
-
+ final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
// write pipeline
- DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
-
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
// compaction
if (StreamerUtil.needsAsyncCompaction(conf)) {
return Pipelines.compact(conf, pipeline);
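
(Side note: the only functional difference between the two call sites is the last argument to Pipelines.bootstrap. HoodieTableSink forwards context.isBounded() from the planner so bounded/batch input can be detected, while HoodieFlinkStreamer above passes false because its Kafka source is always unbounded.)
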
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index a40a010..659e022 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -28,19 +28,14 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.ChainedTransformer;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
@@ -235,12 +230,13 @@ public class StreamWriteITCase extends TestLogger {
@Test
public void testMergeOnReadWriteWithCompaction() throws Exception {
+ int parallelism = 4;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
- execEnv.setParallelism(4);
+ execEnv.setParallelism(parallelism);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
@@ -267,49 +263,16 @@ public class StreamWriteITCase extends TestLogger {
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
- DataStream<HoodieRecord> hoodieDataStream = execEnv
+ DataStream<RowData> dataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
- .setParallelism(4)
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
- if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
- TypeInformation.of(HoodieRecord.class),
- new BootstrapOperator<>(conf));
- }
-
- DataStream<Object> pipeline = hoodieDataStream
- // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
- .keyBy(HoodieRecord::getRecordKey)
- .transform(
- "bucket_assigner",
- TypeInformation.of(HoodieRecord.class),
- new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
- .uid("uid_bucket_assigner")
- // shuffle by fileId(bucket id)
- .keyBy(record -> record.getCurrentLocation().getFileId())
- .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_hoodie_stream_write");
-
- pipeline.addSink(new CleanFunction<>(conf))
- .setParallelism(1)
- .name("clean_commits").uid("uid_clean_commits");
-
- pipeline.transform("compact_plan_generate",
- TypeInformation.of(CompactionPlanEvent.class),
- new CompactionPlanOperator(conf))
- .uid("uid_compact_plan_generate")
- .setParallelism(1) // plan generate must be singleton
- .rebalance()
- .transform("compact_task",
- TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
- .addSink(new CompactionCommitSink(conf))
- .name("compact_commit")
- .setParallelism(1);
+ .setParallelism(parallelism);
+ DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+ Pipelines.clean(conf, pipeline);
+ Pipelines.compact(conf, pipeline);
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
if (client.getJobStatus().get() != JobStatus.FAILED) {
try {
@@ -364,27 +327,9 @@ public class StreamWriteITCase extends TestLogger {
dataStream = transformer.apply(dataStream);
}
- DataStream<HoodieRecord> hoodieDataStream = dataStream
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
- if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
- TypeInformation.of(HoodieRecord.class),
- new BootstrapOperator<>(conf));
- }
-
- DataStream<Object> pipeline = hoodieDataStream
- // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
- .keyBy(HoodieRecord::getRecordKey)
- .transform(
- "bucket_assigner",
- TypeInformation.of(HoodieRecord.class),
- new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
- .uid("uid_bucket_assigner")
- // shuffle by fileId(bucket id)
- .keyBy(record -> record.getCurrentLocation().getFileId())
- .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_hoodie_stream_write");
+ int parallelism = execEnv.getParallelism();
+ DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
execEnv.addOperator(pipeline.getTransformation());
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));