You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2021/03/09 08:28:18 UTC
[beam] branch master updated: [BEAM-11033] Updates Dataflow metrics handling to support portable job submission (#14158)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c391aba [BEAM-11033] Updates Dataflow metrics handling to support portable job submission (#14158)
c391aba is described below
commit c391aba4f5edddcd20b25f89e1b987fc482ef129
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Tue Mar 9 00:27:17 2021 -0800
[BEAM-11033] Updates Dataflow metrics handling to support portable job submission (#14158)
* Updates Dataflow user metrics handling to support portable job submission
* Addresses reviewer comments
* Addresses reviewer comments
---
.../beam/runners/dataflow/DataflowMetrics.java | 48 ++++++++++++++++++----
.../beam/runners/dataflow/DataflowPipelineJob.java | 34 ++++++++++++++-
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
3 files changed, 75 insertions(+), 10 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 4469d2f..19449c4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -243,22 +243,56 @@ class DataflowMetrics extends MetricResults {
}
/**
+ * Returns the user step name for a given internal step name.
+ *
+ * @param internalStepName internal step name used by Dataflow
+ * @return user step name used to identify the metric
+ */
+ private @Nullable String getUserStepName(String internalStepName) {
+ if (dataflowPipelineJob.getPipelineProto() != null
+ && dataflowPipelineJob
+ .getPipelineProto()
+ .getComponents()
+ .getTransformsMap()
+ .containsKey(internalStepName)) {
+ // Dataflow Runner v2 with portable job submission uses proto transform map
+ // IDs for step names. Hence we lookup user step names based on the proto.
+ return dataflowPipelineJob
+ .getPipelineProto()
+ .getComponents()
+ .getTransformsMap()
+ .get(internalStepName)
+ .getUniqueName();
+ } else {
+ if (dataflowPipelineJob.transformStepNames == null
+ || !dataflowPipelineJob.transformStepNames.inverse().containsKey(internalStepName)) {
+ // If we can't translate internal step names to user step names, we just skip them
+ // altogether.
+ return null;
+ }
+ return dataflowPipelineJob.transformStepNames.inverse().get(internalStepName).getFullName();
+ }
+ }
+
+ /**
* Build an {@link MetricKey} that serves as a hash key for a metric update.
*
* @return a {@link MetricKey} that can be hashed and used to identify a metric.
*/
- private MetricKey getMetricHashKey(MetricUpdate metricUpdate) {
- String fullStepName = metricUpdate.getName().getContext().get("step");
- if (dataflowPipelineJob.transformStepNames == null
- || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
+ private @Nullable MetricKey getMetricHashKey(MetricUpdate metricUpdate) {
+ String internalStepName = metricUpdate.getName().getContext().get("step");
+ String userStepName = getUserStepName(internalStepName);
+
+ if (userStepName == null
+ && (dataflowPipelineJob.transformStepNames == null
+ || !dataflowPipelineJob.transformStepNames.inverse().containsKey(internalStepName))) {
// If we can't translate internal step names to user step names, we just skip them
// altogether.
return null;
}
- fullStepName =
- dataflowPipelineJob.transformStepNames.inverse().get(fullStepName).getFullName();
+
return MetricKey.create(
- fullStepName,
+ userStepName,
MetricName.named(
metricUpdate.getName().getContext().get("namespace"),
metricUpdate.getName().getName()));
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 3f14669..b2fdc19 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
@@ -121,23 +122,43 @@ public class DataflowPipelineJob implements PipelineResult {
private @Nullable String latestStateString;
+ private final @Nullable Pipeline pipelineProto;
+
/**
* Constructs the job.
*
* @param jobId the job id
* @param dataflowOptions used to configure the client for the Dataflow Service
* @param transformStepNames a mapping from AppliedPTransforms to Step Names
+ * @param pipelineProto Runner API pipeline proto.
*/
public DataflowPipelineJob(
DataflowClient dataflowClient,
String jobId,
DataflowPipelineOptions dataflowOptions,
- Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+ Map<AppliedPTransform<?, ?, ?>, String> transformStepNames,
+ @Nullable Pipeline pipelineProto) {
this.dataflowClient = dataflowClient;
this.jobId = jobId;
this.dataflowOptions = dataflowOptions;
this.transformStepNames = HashBiMap.create(firstNonNull(transformStepNames, ImmutableMap.of()));
this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
+ this.pipelineProto = pipelineProto;
+ }
+
+ /**
+ * Constructs the job.
+ *
+ * @param jobId the job id
+ * @param dataflowOptions used to configure the client for the Dataflow Service
+ * @param transformStepNames a mapping from AppliedPTransforms to Step Names
+ */
+ public DataflowPipelineJob(
+ DataflowClient dataflowClient,
+ String jobId,
+ DataflowPipelineOptions dataflowOptions,
+ Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+ this(dataflowClient, jobId, dataflowOptions, transformStepNames, null);
}
/** Get the id of this job. */
@@ -154,6 +175,11 @@ public class DataflowPipelineJob implements PipelineResult {
return dataflowOptions;
}
+ /** Get the Runner API pipeline proto if available. */
+ public @Nullable Pipeline getPipelineProto() {
+ return pipelineProto;
+ }
+
/** Get the region this job exists in. */
public String getRegion() {
return dataflowOptions.getRegion();
@@ -536,7 +562,11 @@ public class DataflowPipelineJob implements PipelineResult {
terminalState = currentState;
replacedByJob =
new DataflowPipelineJob(
- dataflowClient, job.getReplacedByJobId(), dataflowOptions, transformStepNames);
+ dataflowClient,
+ job.getReplacedByJobId(),
+ dataflowOptions,
+ transformStepNames,
+ pipelineProto);
}
return job;
} catch (IOException exn) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index cc0708f..0e3602a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1172,7 +1172,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
DataflowClient.create(options),
jobResult.getId(),
options,
- jobSpecification.getStepNames());
+ jobSpecification.getStepNames(),
+ pipelineProto);
// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same