You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:17 UTC

[21/50] [abbrv] beam git commit: [BEAM-2084] Adding querying facility for distribution metrics in Java

[BEAM-2084] Adding querying facility for distribution metrics in Java


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a48eefac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a48eefac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a48eefac

Branch: refs/heads/DSL_SQL
Commit: a48eeface8c5257f34e85c22f312ec03801b0f82
Parents: 7c36318
Author: Pablo <pa...@google.com>
Authored: Thu May 4 14:56:14 2017 -0700
Committer: Ben Chambers <bc...@bchambers-macbookpro2.roam.corp.google.com>
Committed: Tue Jul 18 09:58:47 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCount.java     |   4 +
 pom.xml                                         |   2 +-
 .../beam/runners/dataflow/DataflowMetrics.java  | 310 +++++++++++++------
 .../runners/dataflow/DataflowPipelineJob.java   |   4 +
 .../runners/dataflow/DataflowMetricsTest.java   | 174 ++++++++++-
 .../beam/sdk/metrics/MetricResultsMatchers.java |   2 +-
 6 files changed, 388 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index bfa7eb3..2d568ce 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -21,6 +21,7 @@ import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -88,9 +89,12 @@ public class WordCount {
    */
   static class ExtractWordsFn extends DoFn<String, String> {
     private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
+    private final Distribution lineLenDist = Metrics.distribution(
+        ExtractWordsFn.class, "lineLenDistro");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
+      lineLenDist.update(c.element().length());
       if (c.element().trim().isEmpty()) {
         emptyLines.inc();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d9ab9ae..d27d367 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,7 @@
     <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
     <pubsubgrpc.version>0.1.0</pubsubgrpc.version>
     <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
-    <dataflow.version>v1b3-rev196-1.22.0</dataflow.version>
+    <dataflow.version>v1b3-rev198-1.20.0</dataflow.version>
     <dataflow.proto.version>0.5.160222</dataflow.proto.version>
     <datastore.client.version>1.4.0</datastore.client.version>
     <datastore.proto.version>1.3.0</datastore.proto.version>

http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 330cc7e..4c9c493 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -19,7 +19,9 @@ package org.apache.beam.runners.dataflow;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 
+import com.google.api.client.util.ArrayMap;
 import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
@@ -28,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
 import org.apache.beam.runners.core.construction.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.DistributionResult;
@@ -73,39 +76,6 @@ class DataflowMetrics extends MetricResults {
   }
 
   /**
-   * Build an immutable map that serves as a hash key for a metric update.
-   * @return a {@link MetricKey} that can be hashed and used to identify a metric.
-   */
-  private MetricKey metricHashKey(
-      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
-    String fullStepName = metricUpdate.getName().getContext().get("step");
-    if (dataflowPipelineJob.transformStepNames == null
-        || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
-      // If we can't translate internal step names to user step names, we just skip them
-      // altogether.
-      return null;
-    }
-    fullStepName = dataflowPipelineJob.transformStepNames
-        .inverse().get(fullStepName).getFullName();
-    return MetricKey.create(
-        fullStepName,
-        MetricName.named(
-            metricUpdate.getName().getContext().get("namespace"),
-            metricUpdate.getName().getName()));
-  }
-
-  /**
-   * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
-   * update or not.
-   * @return true if update is tentative, false otherwise
-   */
-  private boolean isMetricTentative(
-      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
-    return (metricUpdate.getName().getContext().containsKey("tentative")
-        && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true"));
-  }
-
-  /**
    * Take a list of metric updates coming from the Dataflow service, and format it into a
    * Metrics API MetricQueryResults instance.
    * @param metricUpdates
@@ -114,68 +84,8 @@ class DataflowMetrics extends MetricResults {
   private MetricQueryResults populateMetricQueryResults(
       List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
       MetricsFilter filter) {
-    // Separate metric updates by name and by tentative/committed.
-    HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
-        tentativeByName = new HashMap<>();
-    HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
-        committedByName = new HashMap<>();
-    HashSet<MetricKey> metricHashKeys = new HashSet<>();
-
-    // If the Context of the metric update does not have a namespace, then these are not
-    // actual metrics counters.
-    for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
-      if (Objects.equal(update.getName().getOrigin(), "user")
-          && update.getName().getContext().containsKey("namespace")) {
-        MetricKey key = metricHashKey(update);
-        if (key == null) {
-          continue;
-        }
-        metricHashKeys.add(key);
-        if (isMetricTentative(update)) {
-          tentativeByName.put(key, update);
-        } else {
-          committedByName.put(key, update);
-        }
-      }
-    }
-    // Create the lists with the metric result information.
-    ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
-    ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
-        ImmutableList.builder();
-    ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
-    for (MetricKey metricKey : metricHashKeys) {
-      if (!MetricFiltering.matches(filter, metricKey)) {
-        // Skip unmatched metrics early.
-        continue;
-      }
-
-      // This code is not robust to evolutions in the types of metrics that can be returned, so
-      // wrap it in a try-catch and log errors.
-      try {
-        String metricName = metricKey.metricName().name();
-        if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
-            || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) {
-          // Skip distribution metrics, as these are not yet properly supported.
-          LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow"
-              + " User Interface");
-          continue;
-        }
-
-        String namespace = metricKey.metricName().namespace();
-        String step = metricKey.stepName();
-        Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue();
-        Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue();
-        counterResults.add(
-            DataflowMetricResult.create(
-                MetricName.named(namespace, metricName), step, committed, attempted));
-      } catch (Exception e) {
-        LOG.warn("Error handling metric {} for filter {}, skipping result.", metricKey, filter);
-      }
-    }
-    return DataflowMetricQueryResults.create(
-        counterResults.build(),
-        distributionResults.build(),
-        gaugeResults.build());
+    return DataflowMetricQueryResultsFactory.create(dataflowPipelineJob, metricUpdates, filter)
+        .build();
   }
 
   private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) {
@@ -214,6 +124,214 @@ class DataflowMetrics extends MetricResults {
     return result;
   }
 
+  private static class DataflowMetricResultExtractor {
+    private final ImmutableList.Builder<MetricResult<Long>> counterResults;
+    private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
+    private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
+    private final boolean isStreamingJob;
+
+    DataflowMetricResultExtractor(boolean isStreamingJob) {
+      counterResults = ImmutableList.builder();
+      distributionResults = ImmutableList.builder();
+      gaugeResults = ImmutableList.builder();
+      this.isStreamingJob = isStreamingJob;
+    }
+
+    public void addMetricResult(
+        MetricKey metricKey,
+        @Nullable com.google.api.services.dataflow.model.MetricUpdate committed,
+        @Nullable com.google.api.services.dataflow.model.MetricUpdate attempted) {
+      if (committed == null || attempted == null) {
+        LOG.warn(
+            "Metric {} did not have both a committed ({}) and tentative value ({}).",
+            metricKey, committed, attempted);
+      } else if (committed.getDistribution() != null && attempted.getDistribution() != null) {
+        // distribution metric
+        DistributionResult value = getDistributionValue(committed);
+        distributionResults.add(
+            DataflowMetricResult.create(
+                metricKey.metricName(),
+                metricKey.stepName(),
+                isStreamingJob ? null : value, // Committed
+                isStreamingJob ? value : null)); // Attempted
+        /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
+         * In Dataflow batch jobs, only COMMITTED metrics are available.
+         * Reporting the appropriate metric depending on whether it's a batch/streaming job.
+         */
+      } else if (committed.getScalar() != null && attempted.getScalar() != null) {
+        // counter metric
+        Long value = getCounterValue(committed);
+        counterResults.add(
+            DataflowMetricResult.create(
+                metricKey.metricName(),
+                metricKey.stepName(),
+                isStreamingJob ? null : value, // Committed
+                isStreamingJob ? value : null)); // Attempted
+        /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
+         * In Dataflow batch jobs, only COMMITTED metrics are available.
+         * Reporting the appropriate metric depending on whether it's a batch/streaming job.
+         */
+      } else {
+        // This is exceptionally unexpected. We expect matching user metrics to only have the
+        // value types provided by the Metrics API.
+        LOG.warn("Unexpected / mismatched metric types."
+            + " Please report JOB ID to Dataflow Support. Metric key: {}."
+            + " Committed / attempted Metric updates: {} / {}",
+            metricKey.toString(), committed.toString(), attempted.toString());
+      }
+    }
+
+    private Long getCounterValue(com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+      if (metricUpdate.getScalar() == null) {
+        return 0L;
+      }
+      return ((Number) metricUpdate.getScalar()).longValue();
+    }
+
+    private DistributionResult getDistributionValue(
+        com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+      if (metricUpdate.getDistribution() == null) {
+        return DistributionResult.ZERO;
+      }
+      ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution();
+      Long count = ((Number) distributionMap.get("count")).longValue();
+      Long min = ((Number) distributionMap.get("min")).longValue();
+      Long max = ((Number) distributionMap.get("max")).longValue();
+      Long sum = ((Number) distributionMap.get("sum")).longValue();
+      return DistributionResult.create(sum, count, min, max);
+    }
+
+    public Iterable<MetricResult<DistributionResult>> getDistributionResults() {
+      return distributionResults.build();
+    }
+
+    public Iterable<MetricResult<Long>> getCounterResults() {
+      return counterResults.build();
+    }
+
+    public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
+      return gaugeResults.build();
+    }
+  }
+
+  private static class DataflowMetricQueryResultsFactory {
+    private final Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
+    private final MetricsFilter filter;
+    private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
+        tentativeByName;
+    private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
+        committedByName;
+    private final HashSet<MetricKey> metricHashKeys;
+    private final DataflowPipelineJob dataflowPipelineJob;
+
+    public static DataflowMetricQueryResultsFactory create(DataflowPipelineJob dataflowPipelineJob,
+        Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
+        MetricsFilter filter) {
+      return new DataflowMetricQueryResultsFactory(dataflowPipelineJob, metricUpdates, filter);
+    }
+
+    private DataflowMetricQueryResultsFactory(DataflowPipelineJob dataflowPipelineJob,
+        Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
+        MetricsFilter filter) {
+      this.dataflowPipelineJob = dataflowPipelineJob;
+      this.metricUpdates = metricUpdates;
+      this.filter = filter;
+
+      tentativeByName = new HashMap<>();
+      committedByName = new HashMap<>();
+      metricHashKeys = new HashSet<>();
+    }
+
+    /**
+     * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
+     * update or not.
+     * @return true if update is tentative, false otherwise
+     */
+    private boolean isMetricTentative(
+        com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+      return (metricUpdate.getName().getContext().containsKey("tentative")
+          && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true"));
+    }
+
+    /**
+     * Build a {@link MetricKey} that serves as a hash key for a metric update.
+     * @return a {@link MetricKey} that can be hashed and used to identify a metric.
+     */
+    private MetricKey getMetricHashKey(
+        com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+      String fullStepName = metricUpdate.getName().getContext().get("step");
+      if (dataflowPipelineJob.transformStepNames == null
+          || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
+        // If we can't translate internal step names to user step names, we just skip them
+        // altogether.
+        return null;
+      }
+      fullStepName = dataflowPipelineJob.transformStepNames
+          .inverse().get(fullStepName).getFullName();
+      return MetricKey.create(
+          fullStepName,
+          MetricName.named(
+              metricUpdate.getName().getContext().get("namespace"),
+              metricUpdate.getName().getName()));
+    }
+
+    private void buildMetricsIndex() {
+      // If the Context of the metric update does not have a namespace, then these are not
+      // actual metrics counters.
+      for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
+        if (update.getName().getOrigin() != null
+            && (!update.getName().getOrigin().toLowerCase().equals("user")
+            || !update.getName().getContext().containsKey("namespace"))) {
+          // Skip non-user metrics: user metrics should have both a "user" origin and a namespace.
+          continue;
+        }
+
+        MetricKey updateKey = getMetricHashKey(update);
+        if (updateKey == null || !MetricFiltering.matches(filter, updateKey)) {
+          // Skip unmatched metrics early.
+          continue;
+        }
+
+        metricHashKeys.add(updateKey);
+        if (isMetricTentative(update)) {
+          MetricUpdate previousUpdate = tentativeByName.put(updateKey, update);
+          if (previousUpdate != null) {
+            LOG.warn("Metric {} already had a tentative value of {}", updateKey, previousUpdate);
+          }
+        } else {
+          MetricUpdate previousUpdate = committedByName.put(updateKey, update);
+          if (previousUpdate != null) {
+            LOG.warn("Metric {} already had a committed value of {}", updateKey, previousUpdate);
+          }
+        }
+      }
+    }
+
+    public MetricQueryResults build() {
+      buildMetricsIndex();
+
+      DataflowMetricResultExtractor extractor = new DataflowMetricResultExtractor(
+          dataflowPipelineJob.getDataflowOptions().isStreaming());
+      for (MetricKey metricKey : metricHashKeys) {
+        String metricName = metricKey.metricName().name();
+        if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
+            || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) {
+          // Skip the [MIN]/[MAX]/[MEAN]/[COUNT] entries that distributions are broken up into.
+          // TODO: remove this when distributions stop being broken up for the UI.
+          continue;
+        }
+
+        extractor.addMetricResult(metricKey,
+            committedByName.get(metricKey),
+            tentativeByName.get(metricKey));
+      }
+      return DataflowMetricQueryResults.create(
+          extractor.getCounterResults(),
+          extractor.getDistributionResults(),
+          extractor.getGaugeResults());
+    }
+  }
+
   @AutoValue
   abstract static class DataflowMetricQueryResults implements MetricQueryResults {
     public static MetricQueryResults create(
@@ -231,7 +349,9 @@ class DataflowMetrics extends MetricResults {
     // and the generated constructor is usable and consistent
     public abstract MetricName name();
     public abstract String step();
+    @Nullable
     public abstract T committed();
+    @Nullable
     public abstract T attempted();
 
     public static <T> MetricResult<T> create(MetricName name, String scope,

http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 2d23983..e30d426 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -164,6 +164,10 @@ public class DataflowPipelineJob implements PipelineResult {
     return dataflowOptions.getProject();
   }
 
+  public DataflowPipelineOptions getDataflowOptions() {
+    return dataflowOptions;
+  }
+
   /**
    * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index c3c741c..05fe687 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow;
 import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
 import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
@@ -28,6 +29,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.api.client.util.ArrayMap;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMetrics;
@@ -38,9 +40,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.math.BigDecimal;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
+import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -95,6 +99,9 @@ public class DataflowMetricsTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(false);
+    when(job.getDataflowOptions()).thenReturn(options);
     when(job.getState()).thenReturn(State.RUNNING);
     job.jobId = JOB_ID;
 
@@ -115,6 +122,9 @@ public class DataflowMetricsTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(false);
+    when(job.getDataflowOptions()).thenReturn(options);
     when(job.getState()).thenReturn(State.DONE);
     job.jobId = JOB_ID;
 
@@ -131,11 +141,8 @@ public class DataflowMetricsTest {
     verify(dataflowClient, times(1)).getJobMetrics(JOB_ID);
   }
 
-  private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step,
-      long scalar, boolean tentative) {
-    MetricUpdate update = new MetricUpdate();
-    update.setScalar(new BigDecimal(scalar));
-
+  private MetricUpdate setStructuredName(MetricUpdate update, String name, String namespace,
+      String step, boolean tentative) {
     MetricStructuredName structuredName = new MetricStructuredName();
     structuredName.setName(name);
     structuredName.setOrigin("user");
@@ -150,10 +157,34 @@ public class DataflowMetricsTest {
     return update;
   }
 
+  private MetricUpdate makeDistributionMetricUpdate(String name, String namespace, String step,
+      Long sum, Long count, Long min, Long max, boolean tentative) {
+    MetricUpdate update = new MetricUpdate();
+    ArrayMap<String, BigDecimal> distribution = ArrayMap.create();
+    distribution.add("count", new BigDecimal(count));
+    distribution.add("mean", new BigDecimal(sum / count));
+    distribution.add("sum", new BigDecimal(sum));
+    distribution.add("min", new BigDecimal(min));
+    distribution.add("max", new BigDecimal(max));
+    update.setDistribution(distribution);
+    return setStructuredName(update, name, namespace, step, tentative);
+  }
+
+  private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step,
+      long scalar, boolean tentative) {
+    MetricUpdate update = new MetricUpdate();
+    update.setScalar(new BigDecimal(scalar));
+    return setStructuredName(update, name, namespace, step, tentative);
+
+  }
+
   @Test
   public void testSingleCounterUpdates() throws IOException {
     JobMetrics jobMetrics = new JobMetrics();
     DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(false);
+    when(job.getDataflowOptions()).thenReturn(options);
     when(job.getState()).thenReturn(State.RUNNING);
     job.jobId = JOB_ID;
 
@@ -179,7 +210,7 @@ public class DataflowMetricsTest {
     DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
     MetricQueryResults result = dataflowMetrics.queryMetrics(null);
     assertThat(result.counters(), containsInAnyOrder(
-        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
+        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null)));
     assertThat(result.counters(), containsInAnyOrder(
         committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
   }
@@ -190,6 +221,9 @@ public class DataflowMetricsTest {
     DataflowClient dataflowClient = mock(DataflowClient.class);
     when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
     DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(false);
+    when(job.getDataflowOptions()).thenReturn(options);
     when(job.getState()).thenReturn(State.RUNNING);
     job.jobId = JOB_ID;
 
@@ -202,24 +236,97 @@ public class DataflowMetricsTest {
     // the job metrics results.
     jobMetrics.setMetrics(ImmutableList.of(
         makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
-        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, true),
         makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, false),
         makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, true)));
 
     DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
     MetricQueryResults result = dataflowMetrics.queryMetrics(null);
     assertThat(result.counters(), containsInAnyOrder(
-        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
+        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null)));
     assertThat(result.counters(), containsInAnyOrder(
         committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
   }
 
   @Test
+  public void testDistributionUpdates() throws IOException {
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(false);
+    when(job.getDataflowOptions()).thenReturn(options);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+    when(myStep2.getFullName()).thenReturn("myStepName");
+    job.transformStepNames = HashBiMap.create();
+    job.transformStepNames.put(myStep2, "s2");
+
+    // The parser relies on the fact that one tentative and one committed metric update exist in
+    // the job metrics results.
+    jobMetrics.setMetrics(ImmutableList.of(
+        makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+            18L, 2L, 2L, 16L, false),
+        makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+            18L, 2L, 2L, 16L, true)));
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+    assertThat(result.distributions(), contains(
+        attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+            (DistributionResult) null)));
+    assertThat(result.distributions(), contains(
+        committedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+            DistributionResult.create(18, 2, 2, 16))));
+  }
+
+  @Test
+  public void testDistributionUpdatesStreaming() throws IOException {
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(true);
+    when(job.getDataflowOptions()).thenReturn(options);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+    when(myStep2.getFullName()).thenReturn("myStepName");
+    job.transformStepNames = HashBiMap.create();
+    job.transformStepNames.put(myStep2, "s2");
+
+    // The parser relies on the fact that one tentative and one committed metric update exist in
+    // the job metrics results.
+    jobMetrics.setMetrics(ImmutableList.of(
+        makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+            18L, 2L, 2L, 16L, false),
+        makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+            18L, 2L, 2L, 16L, true)));
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+    assertThat(result.distributions(), contains(
+        committedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+            (DistributionResult) null)));
+    assertThat(result.distributions(), contains(
+        attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+            DistributionResult.create(18, 2, 2, 16))));
+  }
+
+  @Test
   public void testMultipleCounterUpdates() throws IOException {
     JobMetrics jobMetrics = new JobMetrics();
     DataflowClient dataflowClient = mock(DataflowClient.class);
     when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
     DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(false);
+    when(job.getDataflowOptions()).thenReturn(options);
     when(job.getState()).thenReturn(State.RUNNING);
     job.jobId = JOB_ID;
 
@@ -251,12 +358,57 @@ public class DataflowMetricsTest {
     DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
     MetricQueryResults result = dataflowMetrics.queryMetrics(null);
     assertThat(result.counters(), containsInAnyOrder(
-        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L),
-        attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
-        attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1233L)));
+        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null),
+        attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null),
+        attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null)));
     assertThat(result.counters(), containsInAnyOrder(
         committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
         committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
         committedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L)));
   }
+
+  @Test
+  public void testMultipleCounterUpdatesStreaming() throws IOException {
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(true);
+    when(job.getDataflowOptions()).thenReturn(options);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+    when(myStep2.getFullName()).thenReturn("myStepName");
+    job.transformStepNames = HashBiMap.create();
+    job.transformStepNames.put(myStep2, "s2");
+    AppliedPTransform<?, ?, ?> myStep3 = mock(AppliedPTransform.class);
+    when(myStep3.getFullName()).thenReturn("myStepName3");
+    job.transformStepNames.put(myStep3, "s3");
+    AppliedPTransform<?, ?, ?> myStep4 = mock(AppliedPTransform.class);
+    when(myStep4.getFullName()).thenReturn("myStepName4");
+    job.transformStepNames.put(myStep4, "s4");
+
+    // The parser relies on one tentative and one committed update existing per metric in the job
+    // metrics results, so each counter below is supplied twice (final flag false, then true).
+    jobMetrics.setMetrics(ImmutableList.of(
+        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
+        makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+        makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false),
+        makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true),
+        makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false),
+        makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true)));
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+    assertThat(result.counters(), containsInAnyOrder(
+        committedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null),
+        committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null),
+        committedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null)));
+    assertThat(result.counters(), containsInAnyOrder(
+        attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
+        attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
+        attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
index 5031952..030a759 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
@@ -96,7 +96,7 @@ public class MetricResultsMatchers {
     if (result1 instanceof GaugeResult) {
       return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value());
     } else {
-      return result1.equals(result2);
+      return Objects.equals(result1, result2);
     }
   }