Posted to commits@hudi.apache.org by vi...@apache.org on 2021/10/24 15:18:18 UTC

[hudi] branch master updated: [MINOR] Show source table operator details on the flink web when reading hudi table (#3842)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 91845e2  [MINOR] Show source table operator details on the flink web when reading hudi table (#3842)
91845e2 is described below

commit 91845e241da242cede95f705b0637331ce9222ff
Author: mincwang <33...@users.noreply.github.com>
AuthorDate: Sun Oct 24 23:18:01 2021 +0800

    [MINOR] Show source table operator details on the flink web when reading hudi table (#3842)
---
 .../java/org/apache/hudi/table/HoodieTableSource.java | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4e193fa..f0dbffd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -180,7 +180,7 @@ public class HoodieTableSource implements
               conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
           InputFormat<RowData, ?> inputFormat = getInputFormat(true);
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "split_monitor")
+          SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
               .setParallelism(1)
               .transform("split_reader", typeInfo, factory)
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
@@ -188,7 +188,7 @@ public class HoodieTableSource implements
         } else {
           InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
           DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
-          return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
+          return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
         }
       }
     };
@@ -266,6 +266,21 @@ public class HoodieTableSource implements
     return requiredPartitions;
   }
 
+  private String getSourceOperatorName(String operatorName) {
+    String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
+    List<String> fields = Arrays.stream(this.requiredPos)
+        .mapToObj(i -> schemaFieldNames[i])
+        .collect(Collectors.toList());
+    StringBuilder sb = new StringBuilder();
+    sb.append(operatorName)
+        .append("(")
+        .append("table=").append(Collections.singletonList(conf.getString(FlinkOptions.TABLE_NAME)))
+        .append(", ")
+        .append("fields=").append(fields)
+        .append(")");
+    return sb.toString();
+  }
+
   @Nullable
   private Set<String> getRequiredPartitionPaths() {
     if (this.requiredPartitions == null) {
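    ...

For illustration, here is a minimal, self-contained sketch of the operator-name format the new getSourceOperatorName method produces. The table name "t1" and the schema fields below are assumed placeholders, not values from the commit; only the string-building mirrors the diff above.

    // Hypothetical standalone sketch of the naming scheme added by this
    // commit; table name, schema, and projection are assumed examples.
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SourceOperatorNameSketch {
      public static void main(String[] args) {
        String tableName = "t1";                              // assumed table name
        String[] schemaFieldNames = {"uuid", "name", "age"};  // assumed schema
        int[] requiredPos = {0, 1};                           // assumed projected columns

        // Same projection step as getSourceOperatorName: map the required
        // column positions back to their field names.
        List<String> fields = Arrays.stream(requiredPos)
            .mapToObj(i -> schemaFieldNames[i])
            .collect(Collectors.toList());

        String name = "split_monitor"
            + "(table=" + Collections.singletonList(tableName)
            + ", fields=" + fields + ")";

        // Prints: split_monitor(table=[t1], fields=[uuid, name])
        System.out.println(name);
      }
    }

With this change, the Flink web UI would show operator names like split_monitor(table=[t1], fields=[uuid, name]) and bounded_source(table=[t1], fields=[uuid, name]) instead of the bare "split_monitor" and "bounded_source", making it easier to see which Hudi table and columns each source operator reads.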