You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:13 UTC
[17/50] [abbrv] beam git commit: Make Dataflow Counter Name Parsing more Robust
Make Dataflow Counter Name Parsing more Robust
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5ebbff5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5ebbff5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5ebbff5
Branch: refs/heads/DSL_SQL
Commit: c5ebbff584834d16e3aff9859c90122cf9ed5ef2
Parents: 04d364d
Author: Pablo <pa...@google.com>
Authored: Wed Jun 28 15:20:53 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jul 17 16:01:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowMetrics.java | 30 +++++++----
.../runners/dataflow/DataflowMetricsTest.java | 53 +++++++++++++++-----
2 files changed, 59 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c5ebbff5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 31b6cda..330cc7e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -79,9 +79,14 @@ class DataflowMetrics extends MetricResults {
private MetricKey metricHashKey(
com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
String fullStepName = metricUpdate.getName().getContext().get("step");
- fullStepName = (dataflowPipelineJob.transformStepNames != null
- ? dataflowPipelineJob.transformStepNames
- .inverse().get(fullStepName).getFullName() : fullStepName);
+ if (dataflowPipelineJob.transformStepNames == null
+ || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
+ // If we can't translate internal step names to user step names, we just skip them
+ // altogether.
+ return null;
+ }
+ fullStepName = dataflowPipelineJob.transformStepNames
+ .inverse().get(fullStepName).getFullName();
return MetricKey.create(
fullStepName,
MetricName.named(
@@ -119,15 +124,18 @@ class DataflowMetrics extends MetricResults {
// If the Context of the metric update does not have a namespace, then these are not
// actual metrics counters.
for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
- if (Objects.equal(update.getName().getOrigin(), "user") && isMetricTentative(update)
+ if (Objects.equal(update.getName().getOrigin(), "user")
&& update.getName().getContext().containsKey("namespace")) {
- tentativeByName.put(metricHashKey(update), update);
- metricHashKeys.add(metricHashKey(update));
- } else if (Objects.equal(update.getName().getOrigin(), "user")
- && update.getName().getContext().containsKey("namespace")
- && !isMetricTentative(update)) {
- committedByName.put(metricHashKey(update), update);
- metricHashKeys.add(metricHashKey(update));
+ MetricKey key = metricHashKey(update);
+ if (key == null) {
+ continue;
+ }
+ metricHashKeys.add(key);
+ if (isMetricTentative(update)) {
+ tentativeByName.put(key, update);
+ } else {
+ committedByName.put(key, update);
+ }
}
}
// Create the lists with the metric result information.
http://git-wip-us.apache.org/repos/asf/beam/blob/c5ebbff5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index 85a0979..c3c741c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -33,6 +33,7 @@ import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -155,6 +157,11 @@ public class DataflowMetricsTest {
when(job.getState()).thenReturn(State.RUNNING);
job.jobId = JOB_ID;
+ AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
+ when(myStep.getFullName()).thenReturn("myStepName");
+ job.transformStepNames = HashBiMap.create();
+ job.transformStepNames.put(myStep, "s2");
+
MetricUpdate update = new MetricUpdate();
long stepValue = 1234L;
update.setScalar(new BigDecimal(stepValue));
@@ -172,9 +179,9 @@ public class DataflowMetricsTest {
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
- attemptedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+ attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
assertThat(result.counters(), containsInAnyOrder(
- committedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+ committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
}
@Test
@@ -186,20 +193,25 @@ public class DataflowMetricsTest {
when(job.getState()).thenReturn(State.RUNNING);
job.jobId = JOB_ID;
+ AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
+ when(myStep.getFullName()).thenReturn("myStepName");
+ job.transformStepNames = HashBiMap.create();
+ job.transformStepNames.put(myStep, "s2");
+
// The parser relies on the fact that one tentative and one committed metric update exist in
// the job metrics results.
jobMetrics.setMetrics(ImmutableList.of(
makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
- makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, false),
- makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, true)));
+ makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, false),
+ makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, true)));
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
- attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+ attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
assertThat(result.counters(), containsInAnyOrder(
- committedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+ committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
}
@Test
@@ -211,6 +223,18 @@ public class DataflowMetricsTest {
when(job.getState()).thenReturn(State.RUNNING);
job.jobId = JOB_ID;
+ AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+ when(myStep2.getFullName()).thenReturn("myStepName");
+ job.transformStepNames = HashBiMap.create();
+ job.transformStepNames.put(myStep2, "s2");
+ AppliedPTransform<?, ?, ?> myStep3 = mock(AppliedPTransform.class);
+ when(myStep3.getFullName()).thenReturn("myStepName3");
+ job.transformStepNames.put(myStep3, "s3");
+ AppliedPTransform<?, ?, ?> myStep4 = mock(AppliedPTransform.class);
+ when(myStep4.getFullName()).thenReturn("myStepName4");
+ job.transformStepNames.put(myStep4, "s4");
+
+
// The parser relies on the fact that one tentative and one committed metric update exist in
// the job metrics results.
jobMetrics.setMetrics(ImmutableList.of(
@@ -219,17 +243,20 @@ public class DataflowMetricsTest {
makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false),
makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true),
makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false),
- makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true)));
+ makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true),
+ // The following counter cannot have its name translated, so it won't appear.
+ makeCounterMetricUpdate("lostName", "otherNamespace", "s5", 1200L, false),
+ makeCounterMetricUpdate("lostName", "otherNamespace", "s5", 1200L, true)));
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
- attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L),
- attemptedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
- attemptedMetricsResult("otherNamespace", "counterName", "s4", 1233L)));
+ attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L),
+ attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
+ attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1233L)));
assertThat(result.counters(), containsInAnyOrder(
- committedMetricsResult("counterNamespace", "counterName", "s2", 1233L),
- committedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
- committedMetricsResult("otherNamespace", "counterName", "s4", 1200L)));
+ committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
+ committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
+ committedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L)));
}
}