Posted to github@beam.apache.org by "Sanil15 (via GitHub)" <gi...@apache.org> on 2023/04/14 01:16:46 UTC

[GitHub] [beam] Sanil15 opened a new pull request, #26276: Populate TransformIOMap as Config for Beam Samza Runner

Sanil15 opened a new pull request, #26276:
URL: https://github.com/apache/beam/pull/26276

   ### **Summary**
   
   - Add support for populating a serialized map of PTransform input and output PCollections as config
   - Transform input and output PCollections are identified by their PValue
   - Traverse the pipeline topologically to serialize the list of inputs and outputs
   - This config can later be used to add per-transform metrics (throughput and latency) to the Samza runner
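   The summary above can be sketched in plain Java as follows. This is a minimal illustration, not the PR's code: `TransformNode` is a hypothetical stand-in for a Beam transform node, PCollections are identified by plain string ids, and Kahn's algorithm stands in for Beam's own `Pipeline.traverseTopologically`. The return type mirrors the `Map<String, Map.Entry<String, String>>` signature in the diff; joining ids with commas is an assumption.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: transform -> (inputs, outputs), filled in topological order. All types are hypothetical. */
final class TransformIOMapSketch {

  /** Hypothetical transform node: a name plus the ids of the PCollections it reads and writes. */
  static final class TransformNode {
    final String name;
    final List<String> inputs;
    final List<String> outputs;

    TransformNode(String name, List<String> inputs, List<String> outputs) {
      this.name = name;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  /** Orders transforms so each comes after the producers of its inputs (Kahn's algorithm). */
  static List<TransformNode> topologicalOrder(List<TransformNode> nodes) {
    Map<String, TransformNode> producerOf = new HashMap<>();
    for (TransformNode n : nodes) {
      for (String out : n.outputs) {
        producerOf.put(out, n);
      }
    }
    Map<TransformNode, Integer> inDegree = new LinkedHashMap<>();
    Map<TransformNode, List<TransformNode>> consumers = new HashMap<>();
    for (TransformNode n : nodes) {
      inDegree.put(n, 0);
    }
    for (TransformNode n : nodes) {
      for (String in : n.inputs) {
        TransformNode producer = producerOf.get(in);
        if (producer != null) { // inputs with no producer in the list are treated as external sources
          inDegree.merge(n, 1, Integer::sum);
          consumers.computeIfAbsent(producer, k -> new ArrayList<>()).add(n);
        }
      }
    }
    Deque<TransformNode> ready = new ArrayDeque<>();
    inDegree.forEach((n, d) -> {
      if (d == 0) {
        ready.add(n);
      }
    });
    List<TransformNode> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      TransformNode n = ready.poll();
      order.add(n);
      for (TransformNode c : consumers.getOrDefault(n, List.of())) {
        if (inDegree.merge(c, -1, Integer::sum) == 0) {
          ready.add(c);
        }
      }
    }
    if (order.size() != nodes.size()) {
      throw new IllegalArgumentException("pipeline graph contains a cycle");
    }
    return order;
  }

  /** Builds transform name -> (comma-joined input ids, comma-joined output ids). */
  static Map<String, Map.Entry<String, String>> buildTransformIOMap(List<TransformNode> nodes) {
    Map<String, Map.Entry<String, String>> ioMap = new LinkedHashMap<>();
    for (TransformNode n : topologicalOrder(nodes)) {
      ioMap.put(
          n.name,
          new AbstractMap.SimpleImmutableEntry<>(
              String.join(",", n.inputs), String.join(",", n.outputs)));
    }
    return ioMap;
  }
}
```

   A `LinkedHashMap` keeps the topological order visible when the map is later serialized into the config.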
   
   ### Tests 
   
   - Added unit tests
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Sanil15 commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172940498


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -107,6 +114,47 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /**
+   * Builds a map from PTransform to its input and output PValues. The map is serialized and stored
+   * in the job config.
+   */
+  public static Map<String, Map.Entry<String, String>> buildTransformIOMap(

Review Comment:
   Moved it to the JSON renderer. It still needs a separate scan of the pipeline using SamzaPipelineVisitor instead of the generic Beam PipelineVisitor used by the JSON renderer, because SamzaPipelineVisitor traverses the pipeline differently (it does not enter some composite transforms if they can be translated directly). The PValue maps I need have to be populated according to this logic.
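   A minimal plain-Java sketch of why the two traversals can produce different IO maps. `Node`, `translatedNodes`, and the `directlyTranslatable` set are hypothetical; the sketch does not reproduce SamzaPipelineVisitor's actual rules, only the behavior described above. In Beam's Java SDK a visitor expresses this choice by returning `CompositeBehavior.DO_NOT_ENTER_TRANSFORM` from `enterCompositeTransform`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Sketch: which nodes get IO-map entries when some composites are translated as a whole. */
final class CompositeTraversalSketch {

  /** Hypothetical pipeline tree node: a composite has children, a primitive has none. */
  static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, List<Node> children) {
      this.name = name;
      this.children = children;
    }

    boolean isComposite() {
      return !children.isEmpty();
    }
  }

  /**
   * Returns the nodes that would be translated, in visit order. A composite whose name is in
   * directlyTranslatable is recorded as one unit and its children are skipped; with an empty set
   * this behaves like a generic visitor that enters every composite.
   */
  static List<String> translatedNodes(Node root, Set<String> directlyTranslatable) {
    List<String> out = new ArrayList<>();
    visit(root, directlyTranslatable, out);
    return out;
  }

  private static void visit(Node node, Set<String> directlyTranslatable, List<String> out) {
    if (!node.isComposite() || directlyTranslatable.contains(node.name)) {
      out.add(node.name); // primitive, or composite translated as a whole
      return;
    }
    for (Node child : node.children) {
      visit(child, directlyTranslatable, out);
    }
  }
}
```

   Because the set of translated nodes differs, an IO map built from a generic traversal would not line up with the operators the runner actually creates.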





[GitHub] [beam] xinyuiscool merged pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool merged PR #26276:
URL: https://github.com/apache/beam/pull/26276




[GitHub] [beam] github-actions[bot] commented on pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26276:
URL: https://github.com/apache/beam/pull/26276#issuecomment-1507798141

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] Sanil15 commented on pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on PR #26276:
URL: https://github.com/apache/beam/pull/26276#issuecomment-1515095461

   > General questions
   > 
   > 1. Why use JSON as the serialization format when protobuf is the recommended way of serializing internal components within beam?
   > 2. Can we leverage existing protobuf representation of the pipeline and use that instead of JSON? i.e., populating the same configuration w/ toProto(pipeline) and deserializing from the config?
   > 3. Are there any potential pitfalls with varying versions of jackson across beam and samza which can cause issues?
   > 
   > @xinyuiscool in case you have something to chime in on leveraging protobuf representations
   
   - IMO using protobuf might over-complicate things for a simple MapEntry<String, String> serializer 
   - Protobuf is better since it has a defined schema, but here we are just converting a list of string -> string entries. Ideally I wanted to use Pair<String, String>, but there is no serializer for it
   - PipelineJsonRenderer already uses JSON
   - Configs need to be readable for debugging; if we serialize this with protobuf, the serialized string will not be readable via configs while debugging, and it would need a deserializer




[GitHub] [beam] Sanil15 commented on pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on PR #26276:
URL: https://github.com/apache/beam/pull/26276#issuecomment-1515465360

   > > * Configs need to be readable for debugging; if we serialize this with protobuf, the serialized string will not be readable via configs while debugging, and it would need a deserializer
   > 
   > This is an internal configuration, not meant for external consumption by users. I'd rather keep it hidden than have it readable. If you want it for debugging purposes, you can infer it from the debug logs instead of exposing this configuration externally.
   > 
   > > PipelineJsonRenderer already uses JSON
   > 
   > It is external-facing so that we can render the DAG for observability. It is not necessarily used internally.
   > 
   > > IMO using protobuf might over-complicate things for a simple MapEntry<String, String> serializer
   > 
   > `Map<string, string>` is not an evolvable data model: if you need additional metadata, evolving the string to carry record delimiters and so on complicates evolution and compatibility handling.
   > 
   > All that said, if we can't leverage the protobuf representation of the pipeline, can we use `BEAM_JSON_GRAPH`, which also has information about the transforms? Can that be leveraged?
   
   @mynameborat Found a way to use BEAM_JSON_GRAPH to attach transformIoInfo alongside graphLinks and the graph there; see ExpectedDag for a sample.
   - Removed Jackson and the custom MapEntrySerializer, since we no longer need them
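   One way the IO map could be rendered without Jackson and attached next to the existing graph output is sketched below. It is an assumption-laden illustration, not the PR's renderer: the top-level `graph` / `graphLinks` / `transformIoInfo` layout and the per-transform `inputs` / `outputs` fields are guesses, since ExpectedDag is not shown here.

```java
import java.util.Map;

/** Sketch: hand-rolled JSON for the IO map, attached beside already-rendered graph JSON. */
final class TransformIOJsonSketch {

  /** Quotes a string as a JSON literal, escaping quotes, backslashes, and control characters. */
  static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      if (c == '"' || c == '\\') {
        sb.append('\\').append(c);
      } else if (c < 0x20) {
        sb.append(String.format("\\u%04x", (int) c));
      } else {
        sb.append(c);
      }
    }
    return sb.append('"').toString();
  }

  /** Serializes the IO map as {"<transform>":{"inputs":"...","outputs":"..."},...}. */
  static String toJson(Map<String, Map.Entry<String, String>> ioMap) {
    StringBuilder sb = new StringBuilder("{");
    for (Map.Entry<String, Map.Entry<String, String>> e : ioMap.entrySet()) {
      if (sb.length() > 1) { // a comma before every entry except the first
        sb.append(',');
      }
      sb.append(quote(e.getKey()))
          .append(":{\"inputs\":").append(quote(e.getValue().getKey()))
          .append(",\"outputs\":").append(quote(e.getValue().getValue()))
          .append('}');
    }
    return sb.append('}').toString();
  }

  /** Combines pre-rendered fragments; each argument is assumed to be valid JSON already. */
  static String attach(String graphJson, String graphLinksJson, String transformIoJson) {
    return "{\"graph\":" + graphJson
        + ",\"graphLinks\":" + graphLinksJson
        + ",\"transformIoInfo\":" + transformIoJson + "}";
  }
}
```

   The output stays human readable in a config dump, which was one of the arguments for JSON earlier in the thread.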
   




[GitHub] [beam] xinyuiscool commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172872669


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -107,6 +114,47 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /**
+   * Builds a map from PTransform to its input and output PValues. The map is serialized and stored
+   * in the job config.
+   */
+  public static Map<String, Map.Entry<String, String>> buildTransformIOMap(

Review Comment:
   Move this to the JSON renderer class. This method has nothing to do with translation. Please also think about whether this needs a separate scan of the pipeline or whether we can consolidate it with the scan inside the JSON renderer.





[GitHub] [beam] mynameborat commented on pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on PR #26276:
URL: https://github.com/apache/beam/pull/26276#issuecomment-1515128208

   > * Configs need to be readable for debugging; if we serialize this with protobuf, the serialized string will not be readable via configs while debugging, and it would need a deserializer
   
   This is an internal configuration, not meant for external consumption by users. I'd rather keep it hidden than have it readable. If you want it for debugging purposes, you can infer it from the debug logs instead of exposing this configuration externally.
   
   >PipelineJsonRenderer already uses JSON
   
   It is external-facing so that we can render the DAG for observability. It is not necessarily used internally.
   
   >IMO using protobuf might over-complicate things for a simple MapEntry<String, String> serializer
   
   `Map<string, string>` is not an evolvable data model: if you need additional metadata, evolving the string to carry record delimiters and so on complicates evolution and compatibility handling.
   
   All that said, if we can't leverage the protobuf representation of the pipeline, can we use `BEAM_JSON_GRAPH`, which also has information about the transforms? Can that be leveraged?
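   The evolvability concern can be made concrete with a small sketch. `DelimitedEncodingSketch` and `TransformIOInfo` are hypothetical names, and the comma delimiter is an assumption: a flat `Map<String, String>` value needs some delimiter to hold several ids, and any id containing that delimiter no longer round-trips.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of why packing several ids into one delimited string value is hard to evolve. */
final class DelimitedEncodingSketch {

  /** Packs ids into one comma-delimited string, as a flat Map<String, String> value would need. */
  static String pack(List<String> ids) {
    return String.join(",", ids);
  }

  /** Naive unpacking by splitting on the delimiter; an empty string means "no ids". */
  static List<String> unpack(String packed) {
    if (packed.isEmpty()) {
      return List.of();
    }
    return Arrays.asList(packed.split(",", -1));
  }

  /**
   * Hypothetical structured alternative: each field keeps its own type, so ids may contain any
   * character, and a new field can be added later without changing how existing ones are encoded.
   */
  static final class TransformIOInfo {
    final List<String> inputs;
    final List<String> outputs;

    TransformIOInfo(List<String> inputs, List<String> outputs) {
      this.inputs = List.copyOf(inputs);
      this.outputs = List.copyOf(outputs);
    }
  }
}
```

   A protobuf message or a JSON object plays the role of `TransformIOInfo` here; the sketch only shows the failure mode of the flat string form.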




[GitHub] [beam] Sanil15 commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172940498


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -107,6 +114,47 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /**
+   * Builds a map from PTransform to its input and output PValues. The map is serialized and stored
+   * in the job config.
+   */
+  public static Map<String, Map.Entry<String, String>> buildTransformIOMap(

Review Comment:
   Moved it to the JSON renderer. It still needs a separate scan of the pipeline using SamzaPipelineVisitor instead of the generic Beam PipelineVisitor used by the JSON renderer, because SamzaPipelineVisitor traverses the pipeline differently (it does not enter some composite transforms if they can be translated directly). The PValue maps we need for the MetricOp work at runtime must be populated using this traversal logic from SamzaPipelineVisitor.





[GitHub] [beam] Sanil15 commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172936179


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));

Review Comment:
   Makes sense; did this general readability refactor for all of them.





[GitHub] [beam] mynameborat commented on pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on PR #26276:
URL: https://github.com/apache/beam/pull/26276#issuecomment-1514840227

   General questions
   1. Why use JSON as the serialization format when protobuf is the recommended way of serializing internal components within beam? 
   2. Can we leverage existing protobuf representation of the pipeline and use that instead of JSON? i.e., populating the same configuration w/ toProto(pipeline) and deserializing from the config?
   3. Are there any potential pitfalls with varying versions of jackson across beam and samza which can cause issues?
   
   




[GitHub] [beam] Sanil15 commented on pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on PR #26276:
URL: https://github.com/apache/beam/pull/26276#issuecomment-1507797453

   R: @xinyuiscool @mynameborat please take a look




[GitHub] [beam] xinyuiscool commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172872669


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -107,6 +114,47 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /**
+   * Builds a map from PTransform to its input and output PValues. The map is serialized and stored
+   * in the job config.
+   */
+  public static Map<String, Map.Entry<String, String>> buildTransformIOMap(

Review Comment:
   Move this to the json renderer class. This method has nothing to do with translation.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+    final Map<String, Map.Entry<String, String>> transformIOMap =
+        SamzaPipelineTranslator.buildTransformIOMap(pipeline, options, idMap, nonUniqueStateIds);
+
     final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
     LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
 
-    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline, transformIOMap);

Review Comment:
   Same as above: create a ConfigContext from idMap... and pass it in.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));

Review Comment:
   Instead of passing in this emptyMap(), let's create a ConfigContext and pass it in here. This map should be constructed inside the JsonRenderer.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -86,9 +87,12 @@ public static String toJsonString(RunnerApi.Pipeline pipeline) {
   private final StringBuilder jsonBuilder = new StringBuilder();
   private final StringBuilder graphLinks = new StringBuilder();
   private final Map<PValue, String> valueToProducerNodeName = new HashMap<>();
+  private final Map<String, Map.Entry<String, String>> transformIOMap;
   private int indent;
 
-  private PipelineJsonRenderer() {}
+  private PipelineJsonRenderer(Map<String, Map.Entry<String, String>> transformIOMap) {

Review Comment:
   Same above.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+    final Map<String, Map.Entry<String, String>> transformIOMap =
+        SamzaPipelineTranslator.buildTransformIOMap(pipeline, options, idMap, nonUniqueStateIds);
+
     final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
     LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
 
-    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline, transformIOMap);
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
-    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
-    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
     final ConfigBuilder configBuilder = new ConfigBuilder(options);
-
     SamzaPipelineTranslator.createConfig(
         pipeline, options, idMap, nonUniqueStateIds, configBuilder);

Review Comment:
   Let's refactor this method a bit to pass in the previously created ConfigContext instead of options, idMap, nonUni.. That way the code is much more readable and extendable.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -66,8 +66,9 @@ public interface SamzaIORegistrar {
    * @param pipeline The beam pipeline
    * @return JSON string representation of the pipeline
    */
-  public static String toJsonString(Pipeline pipeline) {
-    final PipelineJsonRenderer visitor = new PipelineJsonRenderer();
+  public static String toJsonString(
+      Pipeline pipeline, Map<String, Map.Entry<String, String>> transformIOMap) {

Review Comment:
   Instead of passing in transformIOMap, we should pass in a ConfigContext and then construct the IO map here. The IO map is only used for creating the JSON.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+    final Map<String, Map.Entry<String, String>> transformIOMap =

Review Comment:
   Let's not create this map here. Instead, create a ConfigContext and pass it in below.
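   The ConfigContext refactor requested in these comments can be sketched as a parameter object. Everything below is hypothetical: this `ConfigContext`, `Transform`, and the string stand-ins for options and `Map<PValue, String>` are illustrations, not the runner's real classes. The point is the call shape: callers pass one context, and the renderer derives the IO map privately because only the JSON needs it. The sketch assumes names and ids need no JSON escaping.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Sketch of passing one context object instead of options, idMap, and nonUniqueStateIds. */
final class ConfigContextSketch {

  /** Hypothetical parameter object bundling what used to be separate arguments. */
  static final class ConfigContext {
    final Map<String, String> options; // stand-in for the pipeline options
    final Map<String, String> idMap; // stand-in for Map<PValue, String>: PValue name -> id
    final Set<String> nonUniqueStateIds;

    ConfigContext(
        Map<String, String> options, Map<String, String> idMap, Set<String> nonUniqueStateIds) {
      this.options = Collections.unmodifiableMap(options);
      this.idMap = Collections.unmodifiableMap(idMap);
      this.nonUniqueStateIds = Collections.unmodifiableSet(nonUniqueStateIds);
    }
  }

  /** Hypothetical transform: a name plus the PValue names it reads and writes. */
  static final class Transform {
    final String name;
    final List<String> inputs;
    final List<String> outputs;

    Transform(String name, List<String> inputs, List<String> outputs) {
      this.name = name;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  /** Renderer entry point: takes the context, never a prebuilt IO map or an empty placeholder. */
  static String toJsonString(List<Transform> pipeline, ConfigContext ctx) {
    StringBuilder json = new StringBuilder("{");
    for (Map.Entry<String, String> e : buildTransformIOMap(pipeline, ctx).entrySet()) {
      if (json.length() > 1) {
        json.append(',');
      }
      json.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
    }
    return json.append('}').toString();
  }

  /** Private to the renderer: transform name -> "inputIds->outputIds", sorted by name. */
  private static Map<String, String> buildTransformIOMap(List<Transform> pipeline, ConfigContext ctx) {
    Map<String, String> ioMap = new TreeMap<>();
    for (Transform t : pipeline) {
      ioMap.put(t.name, ids(t.inputs, ctx) + "->" + ids(t.outputs, ctx));
    }
    return ioMap;
  }

  /** Replaces PValue names with their ids, keeping the name when no id is registered. */
  private static String ids(List<String> pvalues, ConfigContext ctx) {
    return pvalues.stream()
        .map(p -> ctx.idMap.getOrDefault(p, p))
        .collect(Collectors.joining(","));
  }
}
```

   The same context object could then be handed to `createConfig`, so adding a field later changes one class rather than every call site.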





[GitHub] [beam] Sanil15 commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172941107


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -130,21 +130,23 @@ public SamzaPipelineResult run(Pipeline pipeline) {
           PipelineDotRenderer.toDotString(pipeline));
       LOG.debug(
           "Pre-processed Beam pipeline in json format:\n{}",
-          PipelineJsonRenderer.toJsonString(pipeline));
+          PipelineJsonRenderer.toJsonString(pipeline, Collections.emptyMap()));
     }
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
+    final Map<String, Map.Entry<String, String>> transformIOMap =
+        SamzaPipelineTranslator.buildTransformIOMap(pipeline, options, idMap, nonUniqueStateIds);
+
     final String dotGraph = PipelineDotRenderer.toDotString(pipeline);
     LOG.info("Beam pipeline DOT graph:\n{}", dotGraph);
 
-    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
+    final String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline, transformIOMap);
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
-    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
-    final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
     final ConfigBuilder configBuilder = new ConfigBuilder(options);
-
     SamzaPipelineTranslator.createConfig(
         pipeline, options, idMap, nonUniqueStateIds, configBuilder);

Review Comment:
   done!





[GitHub] [beam] Sanil15 commented on a diff in pull request #26276: Populate TransformIOMap as Config for Beam Samza Runner

Posted by "Sanil15 (via GitHub)" <gi...@apache.org>.
Sanil15 commented on code in PR #26276:
URL: https://github.com/apache/beam/pull/26276#discussion_r1172940811


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -66,8 +66,9 @@ public interface SamzaIORegistrar {
    * @param pipeline The beam pipeline
    * @return JSON string representation of the pipeline
    */
-  public static String toJsonString(Pipeline pipeline) {
-    final PipelineJsonRenderer visitor = new PipelineJsonRenderer();
+  public static String toJsonString(
+      Pipeline pipeline, Map<String, Map.Entry<String, String>> transformIOMap) {

Review Comment:
   Did all the refactors for configContext


