Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/28 01:53:41 UTC

[hudi] branch pull/7011 created (now 4dc6d45d8f)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a change to branch pull/7011
in repository https://gitbox.apache.org/repos/asf/hudi.git


      at 4dc6d45d8f fix operator UID

This branch includes the following new commits:

     new 4dc6d45d8f fix operator UID

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hudi] 01/01: fix operator UID

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch pull/7011
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4dc6d45d8f5acb4dec768ecbc45c37349d6f02f7
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Fri Oct 28 09:53:11 2022 +0800

    fix operator UID
---
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 30 ++++++++++++----------
 .../org/apache/hudi/table/HoodieTableSource.java   |  9 +++----
 2 files changed, 20 insertions(+), 19 deletions(-)
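
Context for the change: the old code built each operator UID inline at its
call site (e.g. "uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME)),
and the separators were inconsistent: some call sites appended a trailing
underscore before the table name, others did not. This patch renames
opIdentifier to opName (which now supplies only the display name) and adds a
dedicated opUID helper so every write operator UID follows the same
uid_<operator>_<table> pattern. A minimal sketch of the two helpers as they
appear in the diff below, with example outputs for a hypothetical table
named "t1":

    // Helpers from Pipelines.java after this patch; Configuration is
    // org.apache.flink.configuration.Configuration.
    public static String opName(String operatorN, Configuration conf) {
      return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
    }

    public static String opUID(String operatorN, Configuration conf) {
      return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
    }

    // For a table named "t1" (hypothetical example):
    //   opName("stream_write", conf) -> "stream_write: t1"    (shown in the Flink UI)
    //   opUID("stream_write", conf)  -> "uid_stream_write_t1" (stable state UID)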

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 82761adf73..a045a9276c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -120,8 +120,8 @@ public class Pipelines {
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
       return dataStream
-          .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_bulk_insert", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(DummySink.INSTANCE)
           .name("dummy");
@@ -152,7 +152,7 @@ public class Pipelines {
       }
     }
     return dataStream
-        .transform(opIdentifier("hoodie_bulk_insert_write", conf),
+        .transform(opName("hoodie_bulk_insert_write", conf),
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
@@ -196,8 +196,8 @@ public class Pipelines {
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
-        .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+        .transform(opName("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
+        .uid(opUID("hoodie_stream_write", conf))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
   }
 
@@ -254,7 +254,7 @@ public class Pipelines {
               TypeInformation.of(HoodieRecord.class),
               new BootstrapOperator<>(conf))
           .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+          .uid(opUID("index_bootstrap", conf));
     }
 
     return dataStream1;
@@ -280,7 +280,7 @@ public class Pipelines {
             TypeInformation.of(HoodieRecord.class),
             new BatchBootstrapOperator<>(conf))
         .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
-        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+        .uid(opUID("batch_index_bootstrap", conf));
   }
 
   /**
@@ -320,8 +320,8 @@ public class Pipelines {
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -332,12 +332,12 @@ public class Pipelines {
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
               new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid(opUID("bucket_assigner", conf))
           .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("stream_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
   }
@@ -435,10 +435,14 @@ public class Pipelines {
         .name("dummy");
   }
 
-  public static String opIdentifier(String operatorN, Configuration conf) {
+  public static String opName(String operatorN, Configuration conf) {
     return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
   }
 
+  public static String opUID(String operatorN, Configuration conf) {
+    return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
+  }
+
   /**
    * Dummy sink that does nothing.
    */
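
Note on the hunks above: before this patch, several UIDs concatenated the
table name with no separator ("uid_bucket_write" + tableName produced
"uid_bucket_writet1"), while others already carried a trailing underscore
("uid_bucket_assigner_" + tableName). Routing everything through opUID adds
the missing underscore where it was absent. Since Flink matches savepoint
state to operators by UID, operators whose UID changes here would not
restore state from a savepoint taken before the patch unless the UIDs are
remapped. A sketch of the effect for a hypothetical table "t1":

    // Derived from the diff above; the table name "t1" is a made-up example.
    // before: "uid_bucket_write" + "t1"      -> "uid_bucket_writet1"
    // after:  opUID("bucket_write", conf)    -> "uid_bucket_write_t1"
    // before: "uid_bucket_assigner_" + "t1"  -> "uid_bucket_assigner_t1"
    // after:  opUID("bucket_assigner", conf) -> "uid_bucket_assigner_t1" (unchanged)
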
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 42241af449..f4c301892e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -37,6 +37,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -188,11 +189,11 @@ public class HoodieTableSource implements
           InputFormat<RowData, ?> inputFormat = getInputFormat(true);
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
-              .uid(readOpIdentifier("split_monitor", conf))
+              .uid(Pipelines.opUID("split_monitor", conf))
               .setParallelism(1)
               .keyBy(MergeOnReadInputSplit::getFileId)
               .transform("split_reader", typeInfo, factory)
-              .uid(readOpIdentifier("split_reader", conf))
+              .uid(Pipelines.opUID("split_reader", conf))
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
           return new DataStreamSource<>(source);
         } else {
@@ -204,10 +205,6 @@ public class HoodieTableSource implements
     };
   }
 
-  public static String readOpIdentifier(String operatorN, Configuration conf) {
-    return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
-  }
-
   @Override
   public ChangelogMode getChangelogMode() {
     // when read as streaming and changelog mode is enabled, emit as FULL mode;
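
On the read side, the removed readOpIdentifier (see the hunk above) already
produced the same uid_<operator>_<table> format as the new Pipelines.opUID,
so this file's change only deduplicates the helper; the read operator UIDs
themselves are unchanged. For a hypothetical table "t1":

    // Same output before and after the patch:
    //   readOpIdentifier("split_monitor", conf) -> "uid_split_monitor_t1"
    //   Pipelines.opUID("split_monitor", conf)  -> "uid_split_monitor_t1"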