Posted to commits@beam.apache.org by ch...@apache.org on 2021/03/09 08:28:18 UTC

[beam] branch master updated: [BEAM-11033] Updates Dataflow metrics handling to support portable job submission (#14158)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c391aba  [BEAM-11033] Updates Dataflow metrics handling to support portable job submission (#14158)
c391aba is described below

commit c391aba4f5edddcd20b25f89e1b987fc482ef129
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Tue Mar 9 00:27:17 2021 -0800

    [BEAM-11033] Updates Dataflow metrics handling to support portable job submission (#14158)
    
    * Updates Dataflow user metrics handling to support portable job submission
    
    * Addresses reviewer comments
    
    * Addresses reviewer comments
---
 .../beam/runners/dataflow/DataflowMetrics.java     | 48 ++++++++++++++++++----
 .../beam/runners/dataflow/DataflowPipelineJob.java | 34 ++++++++++++++-
 .../beam/runners/dataflow/DataflowRunner.java      |  3 +-
 3 files changed, 75 insertions(+), 10 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 4469d2f..19449c4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -243,22 +243,56 @@ class DataflowMetrics extends MetricResults {
     }
 
     /**
+     * Returns the user step name for a given internal step name.
+     *
+     * @param internalStepName internal step name used by Dataflow
+     * @return user step name used to identify the metric
+     */
+    private @Nullable String getUserStepName(String internalStepName) {
+      if (dataflowPipelineJob.getPipelineProto() != null
+          && dataflowPipelineJob
+              .getPipelineProto()
+              .getComponents()
+              .getTransformsMap()
+              .containsKey(internalStepName)) {
+        // Dataflow Runner v2 with portable job submission uses proto transform map
+        // IDs for step names. Hence we lookup user step names based on the proto.
+        return dataflowPipelineJob
+            .getPipelineProto()
+            .getComponents()
+            .getTransformsMap()
+            .get(internalStepName)
+            .getUniqueName();
+      } else {
+        if (dataflowPipelineJob.transformStepNames == null
+            || !dataflowPipelineJob.transformStepNames.inverse().containsKey(internalStepName)) {
+          // If we can't translate internal step names to user step names, we just skip them
+          // altogether.
+          return null;
+        }
+        return dataflowPipelineJob.transformStepNames.inverse().get(internalStepName).getFullName();
+      }
+    }
+
+    /**
      * Build an {@link MetricKey} that serves as a hash key for a metric update.
      *
      * @return a {@link MetricKey} that can be hashed and used to identify a metric.
      */
-    private MetricKey getMetricHashKey(MetricUpdate metricUpdate) {
-      String fullStepName = metricUpdate.getName().getContext().get("step");
-      if (dataflowPipelineJob.transformStepNames == null
-          || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
+    private @Nullable MetricKey getMetricHashKey(MetricUpdate metricUpdate) {
+      String internalStepName = metricUpdate.getName().getContext().get("step");
+      String userStepName = getUserStepName(internalStepName);
+
+      if (userStepName == null
+          && (dataflowPipelineJob.transformStepNames == null
+              || !dataflowPipelineJob.transformStepNames.inverse().containsKey(internalStepName))) {
         // If we can't translate internal step names to user step names, we just skip them
         // altogether.
         return null;
       }
-      fullStepName =
-          dataflowPipelineJob.transformStepNames.inverse().get(fullStepName).getFullName();
+
       return MetricKey.create(
-          fullStepName,
+          userStepName,
           MetricName.named(
               metricUpdate.getName().getContext().get("namespace"),
               metricUpdate.getName().getName()));
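
In effect, the patch turns metric-step resolution into a two-tier lookup: the portable pipeline proto's transform map is consulted first, and the pre-existing BiMap of transform step names remains the fallback for legacy job submission. Below is a minimal standalone sketch of the same pattern, using plain Guava and a HashMap standing in for pipelineProto.getComponents().getTransformsMap(); the class and field names are illustrative, not the Beam API.

    import com.google.common.collect.BiMap;
    import com.google.common.collect.HashBiMap;
    import java.util.HashMap;
    import java.util.Map;

    class StepNameLookup {
      // Stand-in for pipelineProto.getComponents().getTransformsMap():
      // proto transform id -> user-facing unique name.
      private final Map<String, String> protoTransforms = new HashMap<>();

      // Stand-in for transformStepNames: user transform name -> internal step name.
      private final BiMap<String, String> transformStepNames = HashBiMap.create();

      // Mirrors the shape of getUserStepName() above: prefer the proto map,
      // fall back to the inverted BiMap, and return null when the internal
      // name cannot be translated (the caller then skips the metric).
      String getUserStepName(String internalStepName) {
        if (protoTransforms.containsKey(internalStepName)) {
          // Runner v2 with portable job submission reports proto transform ids.
          return protoTransforms.get(internalStepName);
        }
        // Legacy path: invert the user-name -> internal-name mapping.
        return transformStepNames.inverse().get(internalStepName);
      }
    }
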
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 3f14669..b2fdc19 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
@@ -121,23 +122,43 @@ public class DataflowPipelineJob implements PipelineResult {
 
   private @Nullable String latestStateString;
 
+  private final @Nullable Pipeline pipelineProto;
+
   /**
    * Constructs the job.
    *
    * @param jobId the job id
    * @param dataflowOptions used to configure the client for the Dataflow Service
    * @param transformStepNames a mapping from AppliedPTransforms to Step Names
+   * @param pipelineProto Runner API pipeline proto.
    */
   public DataflowPipelineJob(
       DataflowClient dataflowClient,
       String jobId,
       DataflowPipelineOptions dataflowOptions,
-      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames,
+      @Nullable Pipeline pipelineProto) {
     this.dataflowClient = dataflowClient;
     this.jobId = jobId;
     this.dataflowOptions = dataflowOptions;
     this.transformStepNames = HashBiMap.create(firstNonNull(transformStepNames, ImmutableMap.of()));
     this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
+    this.pipelineProto = pipelineProto;
+  }
+
+  /**
+   * Constructs the job.
+   *
+   * @param jobId the job id
+   * @param dataflowOptions used to configure the client for the Dataflow Service
+   * @param transformStepNames a mapping from AppliedPTransforms to Step Names
+   */
+  public DataflowPipelineJob(
+      DataflowClient dataflowClient,
+      String jobId,
+      DataflowPipelineOptions dataflowOptions,
+      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+    this(dataflowClient, jobId, dataflowOptions, transformStepNames, null);
   }
 
   /** Get the id of this job. */
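
The original four-argument constructor is retained and delegates with a null proto, so existing call sites keep compiling unchanged. A caller-side sketch (the variable names are illustrative):

    // Existing call sites continue to work; pipelineProto defaults to null.
    DataflowPipelineJob legacyJob =
        new DataflowPipelineJob(client, jobId, options, transformStepNames);

    // Portable job submission passes the Runner API pipeline proto explicitly.
    DataflowPipelineJob portableJob =
        new DataflowPipelineJob(client, jobId, options, transformStepNames, pipelineProto);
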
@@ -154,6 +175,11 @@ public class DataflowPipelineJob implements PipelineResult {
     return dataflowOptions;
   }
 
+  /** Get the Runner API pipeline proto if available. */
+  public @Nullable Pipeline getPipelineProto() {
+    return pipelineProto;
+  }
+
   /** Get the region this job exists in. */
   public String getRegion() {
     return dataflowOptions.getRegion();
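
Since only the five-argument constructor populates the field, getPipelineProto() may return null and callers need to guard for that, as DataflowMetrics does above. A small consumer sketch, using only accessors that appear elsewhere in this diff:

    Pipeline pipelineProto = job.getPipelineProto();
    if (pipelineProto != null) {
      // Proto transform ids -> transform definitions, the map getUserStepName() reads.
      int transformCount = pipelineProto.getComponents().getTransformsMap().size();
      System.out.println("Pipeline has " + transformCount + " transforms");
    }
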
@@ -536,7 +562,11 @@ public class DataflowPipelineJob implements PipelineResult {
           terminalState = currentState;
           replacedByJob =
               new DataflowPipelineJob(
-                  dataflowClient, job.getReplacedByJobId(), dataflowOptions, transformStepNames);
+                  dataflowClient,
+                  job.getReplacedByJobId(),
+                  dataflowOptions,
+                  transformStepNames,
+                  pipelineProto);
         }
         return job;
       } catch (IOException exn) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index cc0708f..0e3602a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1172,7 +1172,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             DataflowClient.create(options),
             jobResult.getId(),
             options,
-            jobSpecification.getStepNames());
+            jobSpecification.getStepNames(),
+            pipelineProto);
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same