Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/29 05:15:58 UTC

[hudi] branch master updated: [HUDI-5102] source operator(monitor and reader) support user uid (#7085)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e40a6347c [HUDI-5102] source operator(monitor and reader) support user uid  (#7085)
6e40a6347c is described below

commit 6e40a6347c9d95e1b9f17c98e7d2018af594aff8
Author: 矛始 <10...@qq.com>
AuthorDate: Sat Oct 29 13:15:47 2022 +0800

    [HUDI-5102] source operator(monitor and reader) support user uid  (#7085)
    
    * Update HoodieTableSource.java
    
    Co-authored-by: chenzhiming <ch...@chinatelecom.cn>
---
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 30 ++++++++++++----------
 .../org/apache/hudi/table/HoodieTableSource.java   |  3 +++
 2 files changed, 20 insertions(+), 13 deletions(-)
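
Context for the patch below: Flink uses an operator's uid to map keyed and operator
state back to that operator when a job is restored from a savepoint or checkpoint.
Without an explicit uid, Flink derives one from the structure of the job graph, so
adding, renaming, or reordering operators can silently break state recovery. This
commit factors the existing naming logic into two helpers, opName (display name) and
opUID (stable uid), and also assigns uids to the source-side split_monitor and
split_reader operators. A minimal, hedged sketch of the general Flink pattern (not
part of this commit; the class, uid, and name values are illustrative only):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
            .map(i -> i * 2)
            // stable, user-chosen uid: Flink matches savepoint state to this operator by uid
            .uid("uid_double_map_my_table")
            // display name only; it appears in the web UI and logs, not in state mapping
            .name("double_map: my_table")
            .print();
        env.execute("uid-example");
      }
    }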

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 82761adf73..a045a9276c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -120,8 +120,8 @@ public class Pipelines {
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
       return dataStream
-          .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_bulk_insert", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(DummySink.INSTANCE)
           .name("dummy");
@@ -152,7 +152,7 @@ public class Pipelines {
       }
     }
     return dataStream
-        .transform(opIdentifier("hoodie_bulk_insert_write", conf),
+        .transform(opName("hoodie_bulk_insert_write", conf),
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
@@ -196,8 +196,8 @@ public class Pipelines {
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
-        .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+        .transform(opName("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
+        .uid(opUID("hoodie_stream_write", conf))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
   }
 
@@ -254,7 +254,7 @@ public class Pipelines {
               TypeInformation.of(HoodieRecord.class),
               new BootstrapOperator<>(conf))
           .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+          .uid(opUID("index_bootstrap", conf));
     }
 
     return dataStream1;
@@ -280,7 +280,7 @@ public class Pipelines {
             TypeInformation.of(HoodieRecord.class),
             new BatchBootstrapOperator<>(conf))
         .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
-        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+        .uid(opUID("batch_index_bootstrap", conf));
   }
 
   /**
@@ -320,8 +320,8 @@ public class Pipelines {
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -332,12 +332,12 @@ public class Pipelines {
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
               new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid(opUID("bucket_assigner", conf))
           .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("stream_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
   }
@@ -435,10 +435,14 @@ public class Pipelines {
         .name("dummy");
   }
 
-  public static String opIdentifier(String operatorN, Configuration conf) {
+  public static String opName(String operatorN, Configuration conf) {
     return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
   }
 
+  public static String opUID(String operatorN, Configuration conf) {
+    return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
+  }
+
   /**
    * Dummy sink that does nothing.
    */
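
Illustration (not part of the commit): for a table named "t1" the two helpers above
yield a display name of "stream_write: t1" and a uid of "uid_stream_write_t1", so every
operator now follows the same "uid_<op>_<table>" convention; several of the old
hand-written uids omitted the underscore before the table name. A hedged usage sketch,
with the table name made up:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.sink.utils.Pipelines;

    public class OpNamingExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.TABLE_NAME, "t1");              // hypothetical table name
        System.out.println(Pipelines.opName("stream_write", conf)); // stream_write: t1
        System.out.println(Pipelines.opUID("stream_write", conf));  // uid_stream_write_t1
      }
    }
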
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4533ef7179..f4c301892e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -37,6 +37,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -188,9 +189,11 @@ public class HoodieTableSource implements
           InputFormat<RowData, ?> inputFormat = getInputFormat(true);
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+              .uid(Pipelines.opUID("split_monitor", conf))
               .setParallelism(1)
               .keyBy(MergeOnReadInputSplit::getFileId)
               .transform("split_reader", typeInfo, factory)
+              .uid(Pipelines.opUID("split_reader", conf))
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
           return new DataStreamSource<>(source);
         } else {