You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/27 02:14:35 UTC

[hudi] branch master updated: [HUDI-2229] Refactor HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9850e90  [HUDI-2229] Refactor HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)
9850e90 is described below

commit 9850e90e2e0fa602a5346911810bd65c5f3e0ea8
Author: mikewu <xi...@gmail.com>
AuthorDate: Fri Aug 27 10:14:04 2021 +0800

    [HUDI-2229] Refactor HoodieFlinkStreamer to reuse the pipeline of HoodieTableSink (#3495)
    
    Co-authored-by: mikewu <xi...@alibaba-inc.com>
---
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  | 64 ++----------------
 .../org/apache/hudi/table/HoodieTableSink.java     |  7 +-
 .../org/apache/hudi/sink/StreamWriteITCase.java    | 79 ++++------------------
 3 files changed, 19 insertions(+), 131 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 3d3f804..077633e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -21,30 +21,18 @@ package org.apache.hudi.streamer;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
-import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.JCommander;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -88,9 +76,6 @@ public class HoodieFlinkStreamer {
     int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf);
-
     DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
@@ -110,51 +95,12 @@ public class HoodieFlinkStreamer {
       }
     }
 
-    DataStream<HoodieRecord> dataStream2 = dataStream
-        .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      dataStream2 = dataStream2
-          .transform(
-              "index_bootstrap",
-              TypeInformation.of(HoodieRecord.class),
-              new BootstrapOperator<>(conf))
-          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
-    }
-
-    DataStream<Object> pipeline = dataStream2
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     if (StreamerUtil.needsAsyncCompaction(conf)) {
-      pipeline.transform("compact_plan_generate",
-          TypeInformation.of(CompactionPlanEvent.class),
-          new CompactionPlanOperator(conf))
-          .uid("uid_compact_plan_generate")
-          .setParallelism(1) // plan generate must be singleton
-          .rebalance()
-          .transform("compact_task",
-              TypeInformation.of(CompactionCommitEvent.class),
-              new ProcessOperator<>(new CompactFunction(conf)))
-          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
-          .addSink(new CompactionCommitSink(conf))
-          .name("compact_commit")
-          .setParallelism(1); // compaction commit should be singleton
+      Pipelines.compact(conf, pipeline);
     } else {
-      pipeline.addSink(new CleanFunction<>(conf))
-          .setParallelism(1)
-          .name("clean_commits").uid("uid_clean_commits");
+      Pipelines.clean(conf, pipeline);
     }
 
     env.execute(cfg.targetTableName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index feab7b7..2ced22a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -77,13 +77,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
 
       // default parallelism
       int parallelism = dataStream.getExecutionConfig().getParallelism();
-
       // bootstrap
-      final DataStream<HoodieRecord> dataStream1 = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
-
+      final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
       // write pipeline
-      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
-
+      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
       // compaction
       if (StreamerUtil.needsAsyncCompaction(conf)) {
         return Pipelines.compact(conf, pipeline);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index a40a010..659e022 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -28,19 +28,14 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.sink.partitioner.BucketAssignFunction;
-import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.ChainedTransformer;
-import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
@@ -235,12 +230,13 @@ public class StreamWriteITCase extends TestLogger {
 
   @Test
   public void testMergeOnReadWriteWithCompaction() throws Exception {
+    int parallelism = 4;
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
+    execEnv.setParallelism(parallelism);
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
@@ -267,49 +263,16 @@ public class StreamWriteITCase extends TestLogger {
     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
     format.setCharsetName("UTF-8");
 
-    DataStream<HoodieRecord> hoodieDataStream = execEnv
+    DataStream<RowData> dataStream = execEnv
         // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
         .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-        .setParallelism(4)
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new BootstrapOperator<>(conf));
-    }
-
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write");
-
-    pipeline.addSink(new CleanFunction<>(conf))
-        .setParallelism(1)
-        .name("clean_commits").uid("uid_clean_commits");
-
-    pipeline.transform("compact_plan_generate",
-        TypeInformation.of(CompactionPlanEvent.class),
-        new CompactionPlanOperator(conf))
-        .uid("uid_compact_plan_generate")
-        .setParallelism(1) // plan generate must be singleton
-        .rebalance()
-        .transform("compact_task",
-            TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
-        .addSink(new CompactionCommitSink(conf))
-        .name("compact_commit")
-        .setParallelism(1);
+        .setParallelism(parallelism);
 
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    Pipelines.clean(conf, pipeline);
+    Pipelines.compact(conf, pipeline);
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
     if (client.getJobStatus().get() != JobStatus.FAILED) {
       try {
@@ -364,27 +327,9 @@ public class StreamWriteITCase extends TestLogger {
       dataStream = transformer.apply(dataStream);
     }
 
-    DataStream<HoodieRecord> hoodieDataStream = dataStream
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new BootstrapOperator<>(conf));
-    }
-
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write");
+    int parallelism = execEnv.getParallelism();
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
     execEnv.addOperator(pipeline.getTransformation());
 
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));