Posted to github@beam.apache.org by "xinyuiscool (via GitHub)" <gi...@apache.org> on 2023/04/20 17:07:44 UTC

[GitHub] [beam] xinyuiscool commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

xinyuiscool commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172872669


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -107,6 +114,47 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /**
+   * Builds a map from PTransform to its input and output PValues. The map is serialized and stored
+   * in the job config.
+   */
+  public static Map<String, Map.Entry<String, String>> buildTransformIOMap(

Review Comment:
   Move this to the JSON renderer class. This method has nothing to do with translation.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+    final Map<String, Map.Entry<String, String>> transformIOMap =
+        SamzaPipelineTranslator.buildTransformIOMap(pipeline, options, idMap, nonUniqueStateIds);
+
     final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
     LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
 
-    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline, transformIOMap);

Review Comment:
   Same as above: create a ConfigContext from idMap... and pass it in.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));

Review Comment:
   Instead of passing in this emptyMap(), let's create a ConfigContext and pass it in here. This map should be constructed inside the JsonRenderer.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -86,9 +87,12 @@ public static String toJsonString(RunnerApi.Pipeline pipeline) {
   private final StringBuilder jsonBuilder = new StringBuilder();
   private final StringBuilder graphLinks = new StringBuilder();
   private final Map<PValue, String> valueToProducerNodeName = new HashMap<>();
+  private final Map<String, Map.Entry<String, String>> transformIOMap;
   private int indent;
 
-  private PipelineJsonRenderer() {}
+  private PipelineJsonRenderer(Map<String, Map.Entry<String, String>> transformIOMap) {

Review Comment:
   Same as above.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+    final Map<String, Map.Entry<String, String>> transformIOMap =
+        SamzaPipelineTranslator.buildTransformIOMap(pipeline, options, idMap, nonUniqueStateIds);
+
     final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
     LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
 
-    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline, transformIOMap);
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
-    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
-    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
     final ConfigBuilder configBuilder = new ConfigBuilder(options);
-
     SamzaPipelineTranslator.createConfig(
         pipeline, options, idMap, nonUniqueStateIds, configBuilder);

Review Comment:
   Let's refactor this method a bit so that it takes the previously created ConfigContext instead of options, idMap, nonUni.. That way the code is much more readable and extensible.
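   A minimal sketch of the parameter-object refactor suggested above, assuming a simplified, hypothetical `ConfigContext` that bundles the values currently passed separately; it is not the runner's actual class. The types are stand-ins: `String` keys replace `PValue`, a `Map<String, String>` replaces `SamzaPipelineOptions`, a list of transform names replaces `Pipeline`, and `createConfig` writes into a plain map instead of `ConfigBuilder`. The `sketch.*` config keys are invented for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class ConfigContextSketch {

  /** Hypothetical parameter object, built once and shared by every consumer. */
  public static final class ConfigContext {
    private final Map<String, String> idMap; // stand-in for Map<PValue, String>
    private final Set<String> nonUniqueStateIds;
    private final Map<String, String> options; // stand-in for SamzaPipelineOptions

    public ConfigContext(
        Map<String, String> idMap, Set<String> nonUniqueStateIds, Map<String, String> options) {
      // Defensive immutable copies so later consumers cannot mutate shared state.
      this.idMap = Map.copyOf(idMap);
      this.nonUniqueStateIds = Set.copyOf(nonUniqueStateIds);
      this.options = Map.copyOf(options);
    }

    public Map<String, String> getIdMap() {
      return idMap;
    }

    public Set<String> getNonUniqueStateIds() {
      return nonUniqueStateIds;
    }

    public Map<String, String> getOptions() {
      return options;
    }
  }

  private ConfigContextSketch() {}

  /**
   * Before: createConfig(pipeline, options, idMap, nonUniqueStateIds, configBuilder).
   * After: one context argument, so adding a field to ConfigContext leaves this
   * signature and its callers unchanged. The "sketch.*" keys are made up.
   */
  public static void createConfig(
      List<String> pipeline, ConfigContext ctx, Map<String, String> configBuilder) {
    configBuilder.putAll(ctx.getOptions());
    configBuilder.put("sketch.transform.count", String.valueOf(pipeline.size()));
    configBuilder.put("sketch.view.count", String.valueOf(ctx.getIdMap().size()));
    configBuilder.put(
        "sketch.nonunique.state.count", String.valueOf(ctx.getNonUniqueStateIds().size()));
  }
}
```

   Under this sketch, run() could create the context once, after replaceAll(...), and hand the same instance to both createConfig and the JSON renderer.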



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -66,8 +66,9 @@ public interface SamzaIORegistrar {
    * @param pipeline The beam pipeline
    * @return JSON string representation of the pipeline
    */
-  public static String toJsonString(Pipeline pipeline) {
-    final PipelineJsonRenderer visitor = new PipelineJsonRenderer();
+  public static String toJsonString(
+      Pipeline pipeline, Map<String, Map.Entry<String, String>> transformIOMap) {

Review Comment:
   Instead of passing in transformIOMap, we should pass in a ConfigContext and construct the IO map here. The IO map is only used for creating the JSON.
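   One way the renderer could own the IO map, sketched under stand-in assumptions: each transform is a hypothetical `Node` carrying input and output value names, and a hypothetical, reduced `ConfigContext` holds only an `idMap` that resolves those names to IDs. The map is built privately inside `toJsonString`, so callers pass only the pipeline and the context and no `emptyMap()` placeholder is needed. The JSON layout is illustrative, not the format `PipelineJsonRenderer` actually emits.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class JsonRendererSketch {

  /** Hypothetical stand-in for a transform node: name plus input/output value names. */
  public static final class Node {
    final String name;
    final List<String> inputs;
    final List<String> outputs;

    public Node(String name, List<String> inputs, List<String> outputs) {
      this.name = name;
      this.inputs = List.copyOf(inputs);
      this.outputs = List.copyOf(outputs);
    }
  }

  /** Hypothetical stand-in holding only what the renderer needs from a ConfigContext. */
  public static final class ConfigContext {
    final Map<String, String> idMap; // stand-in for Map<PValue, String>

    public ConfigContext(Map<String, String> idMap) {
      this.idMap = Map.copyOf(idMap);
    }
  }

  private JsonRendererSketch() {}

  /** Callers pass the context; the IO map stays an internal detail of JSON rendering. */
  public static String toJsonString(List<Node> pipeline, ConfigContext ctx) {
    Map<String, Map.Entry<String, String>> ioMap = buildTransformIOMap(pipeline, ctx);
    // No JSON escaping: names and IDs are assumed to be JSON-safe in this sketch.
    return ioMap.entrySet().stream()
        .map(
            e ->
                String.format(
                    "{\"name\":\"%s\",\"inputs\":\"%s\",\"outputs\":\"%s\"}",
                    e.getKey(), e.getValue().getKey(), e.getValue().getValue()))
        .collect(Collectors.joining(",", "[", "]"));
  }

  /** Maps each transform name to (comma-joined input IDs, comma-joined output IDs). */
  private static Map<String, Map.Entry<String, String>> buildTransformIOMap(
      List<Node> pipeline, ConfigContext ctx) {
    Map<String, Map.Entry<String, String>> ioMap = new LinkedHashMap<>(); // keeps pipeline order
    for (Node node : pipeline) {
      ioMap.put(
          node.name,
          new AbstractMap.SimpleImmutableEntry<>(
              resolve(node.inputs, ctx), resolve(node.outputs, ctx)));
    }
    return ioMap;
  }

  /** Looks up each value's ID, falling back to the raw name when the context has none. */
  private static String resolve(List<String> values, ConfigContext ctx) {
    return values.stream()
        .map(v -> ctx.idMap.getOrDefault(v, v))
        .collect(Collectors.joining(","));
  }
}
```

   Keeping buildTransformIOMap private to the renderer matches the point that the IO map exists only for the JSON output, so the translator does not need to know about it.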



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+    final Map<String, Map.Entry<String, String>> transformIOMap =

Review Comment:
   Let's not create this map here. Instead, create a ConfigContext and pass it in below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.