Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/29 05:15:58 UTC
[hudi] branch master updated: [HUDI-5102] source operators (monitor and reader) support user uid (#7085)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6e40a6347c [HUDI-5102] source operators (monitor and reader) support user uid (#7085)
6e40a6347c is described below
commit 6e40a6347c9d95e1b9f17c98e7d2018af594aff8
Author: 矛始 <10...@qq.com>
AuthorDate: Sat Oct 29 13:15:47 2022 +0800
[HUDI-5102] source operators (monitor and reader) support user uid (#7085)
* Update HoodieTableSource.java
Co-authored-by: chenzhiming <ch...@chinatelecom.cn>
---
.../java/org/apache/hudi/sink/utils/Pipelines.java | 30 ++++++++++++----------
.../org/apache/hudi/table/HoodieTableSource.java | 3 +++
2 files changed, 20 insertions(+), 13 deletions(-)
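For context: Flink matches operator state in a savepoint to operators by uid. Without an explicit uid, Flink falls back to an auto-generated hash that depends on the job graph's topology, so adding or reordering operators can make previously written state unrestorable. A minimal, self-contained sketch of the idea (plain Flink API, not Hudi code; the "t1" suffix and the sequence source are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StableUidExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSequence(0, 100)
                // An explicit, stable uid lets a savepoint taken from this job
                // be restored even after the surrounding job graph changes.
                .uid("uid_source_t1")
                .map(x -> x * 2)
                .uid("uid_map_t1")
                .print();
            env.execute("stable-uid-example");
        }
    }

This is what the commit below does for Hudi's Flink write pipeline and, in HoodieTableSource, for the streaming-read monitor and reader operators.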
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 82761adf73..a045a9276c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -120,8 +120,8 @@ public class Pipelines {
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
return dataStream
- .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+ .transform(opName("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("bucket_bulk_insert", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(DummySink.INSTANCE)
.name("dummy");
@@ -152,7 +152,7 @@ public class Pipelines {
}
}
return dataStream
- .transform(opIdentifier("hoodie_bulk_insert_write", conf),
+ .transform(opName("hoodie_bulk_insert_write", conf),
TypeInformation.of(Object.class),
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
@@ -196,8 +196,8 @@ public class Pipelines {
WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
return dataStream
- .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+ .transform(opName("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("hoodie_stream_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
@@ -254,7 +254,7 @@ public class Pipelines {
TypeInformation.of(HoodieRecord.class),
new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
- .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+ .uid(opUID("index_bootstrap", conf));
}
return dataStream1;
@@ -280,7 +280,7 @@ public class Pipelines {
TypeInformation.of(HoodieRecord.class),
new BatchBootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
- .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+ .uid(opUID("batch_index_bootstrap", conf));
}
/**
@@ -320,8 +320,8 @@ public class Pipelines {
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
- .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
+ .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("bucket_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -332,12 +332,12 @@ public class Pipelines {
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
- .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+ .uid(opUID("bucket_assigner", conf))
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
- .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
- .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+ .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
+ .uid(opUID("stream_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
}
@@ -435,10 +435,14 @@ public class Pipelines {
.name("dummy");
}
- public static String opIdentifier(String operatorN, Configuration conf) {
+ public static String opName(String operatorN, Configuration conf) {
return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
}
+ public static String opUID(String operatorN, Configuration conf) {
+ return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
+ }
+
/**
* Dummy sink that does nothing.
*/
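The two helpers above centralize the naming scheme: opName produces the display name shown in the Flink UI, while opUID produces the stable identifier Flink uses to match savepoint state to operators. This also makes the previously inconsistent hand-built uids uniform (compare the removed "uid_bucket_write" + table with "uid_index_bootstrap_" + table): opUID always emits the uid_<operator>_<table> form. A runnable sketch of the resulting values, using an illustrative table name:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.sink.utils.Pipelines;

    public class NamingExample {
        public static void main(String[] args) {
            // Illustrative config; in the pipeline, conf is the job's Configuration.
            Configuration conf = new Configuration();
            conf.setString(FlinkOptions.TABLE_NAME, "t1");
            System.out.println(Pipelines.opName("stream_write", conf)); // stream_write: t1
            System.out.println(Pipelines.opUID("stream_write", conf));  // uid_stream_write_t1
        }
    }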
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4533ef7179..f4c301892e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -37,6 +37,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -188,9 +189,11 @@ public class HoodieTableSource implements
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+ .uid(Pipelines.opUID("split_monitor", conf))
.setParallelism(1)
.keyBy(MergeOnReadInputSplit::getFileId)
.transform("split_reader", typeInfo, factory)
+ .uid(Pipelines.opUID("split_reader", conf))
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
} else {
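With the monitor and reader uids pinned, a streaming read can be stopped with a savepoint and resumed without silently dropping the monitor's read-progress state. A minimal sketch of such a job (the table name, path, schema, and options are placeholders, not taken from the commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class HudiStreamReadExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            // Hypothetical table definition; path and schema are placeholders.
            tEnv.executeSql(
                "CREATE TABLE t1 (id INT, name STRING, ts TIMESTAMP(3)) WITH ("
                    + " 'connector' = 'hudi',"
                    + " 'path' = 'file:///tmp/hudi/t1',"
                    + " 'read.streaming.enabled' = 'true'"
                    + ")");
            // The split_monitor and split_reader operators behind this query now
            // carry uids like "uid_split_monitor_t1" and "uid_split_reader_t1",
            // so a savepoint taken from the running job survives graph changes.
            tEnv.executeSql("SELECT * FROM t1").print();
        }
    }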