You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/10/24 15:18:18 UTC
[hudi] branch master updated: [MINOR] Show source table operator
details on the flink web when reading hudi table (#3842)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 91845e2 [MINOR] Show source table operator details on the flink web when reading hudi table (#3842)
91845e2 is described below
commit 91845e241da242cede95f705b0637331ce9222ff
Author: mincwang <33...@users.noreply.github.com>
AuthorDate: Sun Oct 24 23:18:01 2021 +0800
[MINOR] Show source table operator details on the flink web when reading hudi table (#3842)
---
.../java/org/apache/hudi/table/HoodieTableSource.java | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4e193fa..f0dbffd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -180,7 +180,7 @@ public class HoodieTableSource implements
conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
- SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "split_monitor")
+ SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.setParallelism(1)
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
@@ -188,7 +188,7 @@ public class HoodieTableSource implements
} else {
InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
- return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
+ return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
}
}
};
@@ -266,6 +266,21 @@ public class HoodieTableSource implements
return requiredPartitions;
}
+ private String getSourceOperatorName(String operatorName) {
+ String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
+ List<String> fields = Arrays.stream(this.requiredPos)
+ .mapToObj(i -> schemaFieldNames[i])
+ .collect(Collectors.toList());
+ StringBuilder sb = new StringBuilder();
+ sb.append(operatorName)
+ .append("(")
+ .append("table=").append(Collections.singletonList(conf.getString(FlinkOptions.TABLE_NAME)))
+ .append(", ")
+ .append("fields=").append(fields)
+ .append(")");
+ return sb.toString();
+ }
+
@Nullable
private Set<String> getRequiredPartitionPaths() {
if (this.requiredPartitions == null) {