Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/21 20:41:11 UTC

[GitHub] [beam] xinyuiscool commented on a diff in pull request #22370: [22369] Default Metrics for Executable Stages in Samza Runner

xinyuiscool commented on code in PR #22370:
URL: https://github.com/apache/beam/pull/22370#discussion_r927082169


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -274,6 +282,49 @@ public void open(
     }
   }
 
+  private String toStepName(ExecutableStage executableStage) {

Review Comment:
   Can this be static? Since it is more of a utility function, please move it to a util class, e.g. DoFnUtils or similar.
   
   Please also add unit tests for this, covering the cases where the ExecutableStage starts from PBegin or ends with PEnd.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -369,6 +419,7 @@ public void finishBundle() {
         // RemoteBundle close blocks until all results are received
         remoteBundle.close();
         emitResults();
+        emitMetrics();

Review Comment:
   Since we include all the other state ops in the metrics, let's call emitMetrics() after clearing the state. That way the metrics cover the full operation of invoking the bundle.
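   A minimal sketch of the suggested ordering, assuming a simplified operator. The class FinishBundleOrderSketch and the step names (remoteBundle.close, emitResults, clearState, emitMetrics) are hypothetical placeholders, not the Samza runner's actual code. The sketch only records the order of the steps and measures the elapsed bundle time at the moment metrics are emitted, so the measurement includes the state clearing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not the Samza runner's code): records the order of the
 * steps in finishBundle so that metrics are emitted last, after state is cleared.
 */
public class FinishBundleOrderSketch {
  // Log of the steps performed, in the order they ran.
  final List<String> steps = new ArrayList<>();
  // Elapsed time of the last bundle in nanoseconds; -1 until metrics are emitted.
  long lastBundleNanos = -1;
  private long bundleStartNanos;

  void startBundle() {
    bundleStartNanos = System.nanoTime();
    steps.add("startBundle");
  }

  void finishBundle() {
    steps.add("remoteBundle.close"); // placeholder for the blocking remoteBundle.close()
    steps.add("emitResults");        // placeholder for emitResults()
    steps.add("clearState");         // hypothetical state-clearing step
    emitMetrics();                   // last, so the timing covers every step above
  }

  private void emitMetrics() {
    // nanoTime is monotonic, so this difference is never negative.
    lastBundleNanos = System.nanoTime() - bundleStartNanos;
    steps.add("emitMetrics");
  }

  public static void main(String[] args) {
    FinishBundleOrderSketch op = new FinishBundleOrderSketch();
    op.startBundle();
    op.finishBundle();
    System.out.println(op.steps + " took " + op.lastBundleNanos + " ns");
  }
}
```

   Emitting from the last step, rather than right after emitResults(), is what lets the recorded time include the state operations the reviewer mentions.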



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -274,6 +282,49 @@ public void open(
     }
   }
 
+  private String toStepName(ExecutableStage executableStage) {
+    /*
+     * Look for the first/input ParDo/DoFn in this executable stage by
+     * matching ParDo/DoFn's input PCollection with executable stage's
+     * input PCollection
+     */
+    Set<PipelineNode.PTransformNode> inputs =
+        executableStage.getTransforms().stream()
+            .filter(
+                transform ->
+                    transform
+                        .getTransform()
+                        .getInputsMap()
+                        .containsValue(executableStage.getInputPCollection().getId()))
+            .collect(Collectors.toSet());
+
+    Set<String> outputIds =
+        executableStage.getOutputPCollections().stream()
+            .map(PipelineNode.PCollectionNode::getId)
+            .collect(Collectors.toSet());
+
+    /*
+     * Look for the last/output ParDo/DoFn in this executable stage by
+     * matching ParDo/DoFn's output PCollection(s) with executable stage's
+     * out PCollection(s)
+     */
+    Set<PipelineNode.PTransformNode> outputs =
+        executableStage.getTransforms().stream()
+            .filter(
+                transform ->
+                    CollectionUtils.containsAny(
+                        transform.getTransform().getOutputsMap().values(), outputIds))
+            .collect(Collectors.toSet());
+
+    return String.format("[%s-%s]", toStepName(inputs), toStepName(outputs));
+  }
+
+  private String toStepName(Set<PipelineNode.PTransformNode> nodes) {

Review Comment:
   Similar to above: make it static and move it to a util class.
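   A minimal sketch of the suggested refactor, assuming simplified stand-in types. DoFnUtils follows the reviewer's suggested name, while the Transform and Stage records are hypothetical substitutes for PipelineNode.PTransformNode and ExecutableStage so that the example runs without Beam. The stage-level matching mirrors the diff; the body of the Set-based helper is not shown in the diff, so the sorted, "+"-joined formatting here is an assumption.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: both toStepName helpers as static methods on a util class.
 * Transform and Stage are simplified stand-ins, not Beam types.
 */
public final class DoFnUtils {
  private DoFnUtils() {}

  /** Stand-in for PTransformNode: unique name plus input/output PCollection ids. */
  public record Transform(String uniqueName, Set<String> inputIds, Set<String> outputIds) {}

  /** Stand-in for ExecutableStage: one input PCollection id, zero or more output ids. */
  public record Stage(String inputId, Set<String> outputIds, List<Transform> transforms) {}

  public static String toStepName(Stage stage) {
    // First transform(s): those consuming the stage's input PCollection.
    Set<Transform> inputs =
        stage.transforms().stream()
            .filter(t -> t.inputIds().contains(stage.inputId()))
            .collect(Collectors.toSet());
    // Last transform(s): those producing any of the stage's output PCollections.
    Set<Transform> outputs =
        stage.transforms().stream()
            .filter(t -> t.outputIds().stream().anyMatch(stage.outputIds()::contains))
            .collect(Collectors.toSet());
    return String.format("[%s-%s]", toStepName(inputs), toStepName(outputs));
  }

  /** Assumed formatting: sorted unique names joined with "+", or "" for an empty set. */
  static String toStepName(Set<Transform> nodes) {
    return nodes.stream().map(Transform::uniqueName).sorted().collect(Collectors.joining("+"));
  }
}
```

   In this stand-in model, a stage of ParDo(A) (consuming the stage input pc0) followed by ParDo(B) (producing the stage output pc2) is named "[ParDo(A)-ParDo(B)]". A stage that ends in a sink with no output PCollection (the PEnd case) is named "[Write-]", and one whose only transform is a root with no input PCollection (the PBegin case) is named "[-Read]". These are the edge cases a unit test would pin down.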



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.