You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:13 UTC

[17/50] [abbrv] beam git commit: Make Dataflow Counter Name Parsing more Robust

Make Dataflow Counter Name Parsing more Robust


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5ebbff5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5ebbff5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5ebbff5

Branch: refs/heads/DSL_SQL
Commit: c5ebbff584834d16e3aff9859c90122cf9ed5ef2
Parents: 04d364d
Author: Pablo <pa...@google.com>
Authored: Wed Jun 28 15:20:53 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jul 17 16:01:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowMetrics.java  | 30 +++++++----
 .../runners/dataflow/DataflowMetricsTest.java   | 53 +++++++++++++++-----
 2 files changed, 59 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c5ebbff5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 31b6cda..330cc7e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -79,9 +79,14 @@ class DataflowMetrics extends MetricResults {
   private MetricKey metricHashKey(
       com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
     String fullStepName = metricUpdate.getName().getContext().get("step");
-    fullStepName = (dataflowPipelineJob.transformStepNames != null
-        ? dataflowPipelineJob.transformStepNames
-        .inverse().get(fullStepName).getFullName() : fullStepName);
+    if (dataflowPipelineJob.transformStepNames == null
+        || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
+      // If we can't translate internal step names to user step names, we just skip them
+      // altogether.
+      return null;
+    }
+    fullStepName = dataflowPipelineJob.transformStepNames
+        .inverse().get(fullStepName).getFullName();
     return MetricKey.create(
         fullStepName,
         MetricName.named(
@@ -119,15 +124,18 @@ class DataflowMetrics extends MetricResults {
     // If the Context of the metric update does not have a namespace, then these are not
     // actual metrics counters.
     for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
-      if (Objects.equal(update.getName().getOrigin(), "user") && isMetricTentative(update)
+      if (Objects.equal(update.getName().getOrigin(), "user")
           && update.getName().getContext().containsKey("namespace")) {
-        tentativeByName.put(metricHashKey(update), update);
-        metricHashKeys.add(metricHashKey(update));
-      } else if (Objects.equal(update.getName().getOrigin(), "user")
-          && update.getName().getContext().containsKey("namespace")
-          && !isMetricTentative(update)) {
-        committedByName.put(metricHashKey(update), update);
-        metricHashKeys.add(metricHashKey(update));
+        MetricKey key = metricHashKey(update);
+        if (key == null) {
+          continue;
+        }
+        metricHashKeys.add(key);
+        if (isMetricTentative(update)) {
+          tentativeByName.put(key, update);
+        } else {
+          committedByName.put(key, update);
+        }
       }
     }
     // Create the lists with the metric result information.

http://git-wip-us.apache.org/repos/asf/beam/blob/c5ebbff5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index 85a0979..c3c741c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -33,6 +33,7 @@ import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricStructuredName;
 import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -155,6 +157,11 @@ public class DataflowMetricsTest {
     when(job.getState()).thenReturn(State.RUNNING);
     job.jobId = JOB_ID;
 
+    AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
+    when(myStep.getFullName()).thenReturn("myStepName");
+    job.transformStepNames = HashBiMap.create();
+    job.transformStepNames.put(myStep, "s2");
+
     MetricUpdate update = new MetricUpdate();
     long stepValue = 1234L;
     update.setScalar(new BigDecimal(stepValue));
@@ -172,9 +179,9 @@ public class DataflowMetricsTest {
     DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
     MetricQueryResults result = dataflowMetrics.queryMetrics(null);
     assertThat(result.counters(), containsInAnyOrder(
-        attemptedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
     assertThat(result.counters(), containsInAnyOrder(
-        committedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+        committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
   }
 
   @Test
@@ -186,20 +193,25 @@ public class DataflowMetricsTest {
     when(job.getState()).thenReturn(State.RUNNING);
     job.jobId = JOB_ID;
 
+    AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
+    when(myStep.getFullName()).thenReturn("myStepName");
+    job.transformStepNames = HashBiMap.create();
+    job.transformStepNames.put(myStep, "s2");
+
     // The parser relies on the fact that one tentative and one committed metric update exist in
     // the job metrics results.
     jobMetrics.setMetrics(ImmutableList.of(
         makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
         makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
-        makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, false),
-        makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, true)));
+        makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, false),
+        makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, true)));
 
     DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
     MetricQueryResults result = dataflowMetrics.queryMetrics(null);
     assertThat(result.counters(), containsInAnyOrder(
-        attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
     assertThat(result.counters(), containsInAnyOrder(
-        committedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+        committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
   }
 
   @Test
@@ -211,6 +223,18 @@ public class DataflowMetricsTest {
     when(job.getState()).thenReturn(State.RUNNING);
     job.jobId = JOB_ID;
 
+    AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+    when(myStep2.getFullName()).thenReturn("myStepName");
+    job.transformStepNames = HashBiMap.create();
+    job.transformStepNames.put(myStep2, "s2");
+    AppliedPTransform<?, ?, ?> myStep3 = mock(AppliedPTransform.class);
+    when(myStep3.getFullName()).thenReturn("myStepName3");
+    job.transformStepNames.put(myStep3, "s3");
+    AppliedPTransform<?, ?, ?> myStep4 = mock(AppliedPTransform.class);
+    when(myStep4.getFullName()).thenReturn("myStepName4");
+    job.transformStepNames.put(myStep4, "s4");
+
+
     // The parser relies on the fact that one tentative and one committed metric update exist in
     // the job metrics results.
     jobMetrics.setMetrics(ImmutableList.of(
@@ -219,17 +243,20 @@ public class DataflowMetricsTest {
         makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false),
         makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true),
         makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false),
-        makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true)));
+        makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true),
+        // The following counter can not have its name translated thus it won't appear.
+        makeCounterMetricUpdate("lostName", "otherNamespace", "s5", 1200L, false),
+        makeCounterMetricUpdate("lostName", "otherNamespace", "s5", 1200L, true)));
 
     DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
     MetricQueryResults result = dataflowMetrics.queryMetrics(null);
     assertThat(result.counters(), containsInAnyOrder(
-        attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L),
-        attemptedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
-        attemptedMetricsResult("otherNamespace", "counterName", "s4", 1233L)));
+        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L),
+        attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
+        attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1233L)));
     assertThat(result.counters(), containsInAnyOrder(
-        committedMetricsResult("counterNamespace", "counterName", "s2", 1233L),
-        committedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
-        committedMetricsResult("otherNamespace", "counterName", "s4", 1200L)));
+        committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
+        committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
+        committedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L)));
   }
 }