Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/28 01:53:42 UTC

[hudi] 01/01: fix operator UID

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch pull/7011
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4dc6d45d8f5acb4dec768ecbc45c37349d6f02f7
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Fri Oct 28 09:53:11 2022 +0800

    fix operator UID
---
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 30 ++++++++++++----------
 .../org/apache/hudi/table/HoodieTableSource.java   |  9 +++----
 2 files changed, 20 insertions(+), 19 deletions(-)
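Context for the patch: Flink matches operator state in a savepoint to operators in the job graph by UID, so each operator needs a stable, explicitly assigned UID. Before this change, every call site built its UID by ad-hoc string concatenation, and the separators were inconsistent ("uid_bucket_write" + table vs. "uid_bucket_assigner_" + table). The patch renames opIdentifier to opName (display name only) and routes all UID construction through a new opUID helper. A minimal runnable sketch of the resulting strings, assuming a hypothetical table named "t1" (not taken from the patch):

    // Sketch only; "t1" is a hypothetical table name.
    public class UidSketch {
      // Mirrors the new Pipelines.opUID(operatorN, conf) introduced below.
      static String opUID(String operatorN, String tableName) {
        return "uid_" + operatorN + "_" + tableName;
      }

      public static void main(String[] args) {
        String table = "t1";
        // Old call sites concatenated inconsistently:
        System.out.println("uid_bucket_write" + table);      // uid_bucket_writet1
        System.out.println("uid_bucket_assigner_" + table);  // uid_bucket_assigner_t1
        // The new helper yields one uniform shape:
        System.out.println(opUID("bucket_write", table));    // uid_bucket_write_t1
        System.out.println(opUID("bucket_assigner", table)); // uid_bucket_assigner_t1
      }
    }

Note that UIDs which previously lacked the underscore (bucket_bulk_insert, hoodie_stream_write, bucket_write, stream_write) change shape under this patch, so savepoints taken before it would no longer map state onto those operators by UID.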

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 82761adf73..a045a9276c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -120,8 +120,8 @@ public class Pipelines {
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
       return dataStream
-          .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_bulk_insert", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(DummySink.INSTANCE)
           .name("dummy");
@@ -152,7 +152,7 @@ public class Pipelines {
       }
     }
     return dataStream
-        .transform(opIdentifier("hoodie_bulk_insert_write", conf),
+        .transform(opName("hoodie_bulk_insert_write", conf),
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
@@ -196,8 +196,8 @@ public class Pipelines {
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
-        .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+        .transform(opName("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
+        .uid(opUID("hoodie_stream_write", conf))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
   }
 
@@ -254,7 +254,7 @@ public class Pipelines {
               TypeInformation.of(HoodieRecord.class),
               new BootstrapOperator<>(conf))
           .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+          .uid(opUID("index_bootstrap", conf));
     }
 
     return dataStream1;
@@ -280,7 +280,7 @@ public class Pipelines {
             TypeInformation.of(HoodieRecord.class),
             new BatchBootstrapOperator<>(conf))
         .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
-        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+        .uid(opUID("batch_index_bootstrap", conf));
   }
 
   /**
@@ -320,8 +320,8 @@ public class Pipelines {
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -332,12 +332,12 @@ public class Pipelines {
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
               new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid(opUID("bucket_assigner", conf))
           .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("stream_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
   }
@@ -435,10 +435,14 @@ public class Pipelines {
         .name("dummy");
   }
 
-  public static String opIdentifier(String operatorN, Configuration conf) {
+  public static String opName(String operatorN, Configuration conf) {
     return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
   }
 
+  public static String opUID(String operatorN, Configuration conf) {
+    return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
+  }
+
   /**
    * Dummy sink that does nothing.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 42241af449..f4c301892e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -37,6 +37,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -188,11 +189,11 @@ public class HoodieTableSource implements
           InputFormat<RowData, ?> inputFormat = getInputFormat(true);
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
-              .uid(readOpIdentifier("split_monitor", conf))
+              .uid(Pipelines.opUID("split_monitor", conf))
               .setParallelism(1)
               .keyBy(MergeOnReadInputSplit::getFileId)
               .transform("split_reader", typeInfo, factory)
-              .uid(readOpIdentifier("split_reader", conf))
+              .uid(Pipelines.opUID("split_reader", conf))
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
           return new DataStreamSource<>(source);
         } else {
@@ -204,10 +205,6 @@ public class HoodieTableSource implements
     };
   }
 
-  public static String readOpIdentifier(String operatorN, Configuration conf) {
-    return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
-  }
-
   @Override
   public ChangelogMode getChangelogMode() {
     // when read as streaming and changelog mode is enabled, emit as FULL mode;
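
A general Flink behavior worth noting alongside this change (not something the patch itself handles): when restoring from a savepoint, state whose UID matches no operator in the new job graph fails the restore unless the job is submitted with allowNonRestoredState. Renamed UIDs such as uid_stream_write_<table> above therefore imply that pre-patch state for those operators is effectively dropped on upgrade.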