Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/26 20:49:10 UTC

[1/2] beam git commit: Make it possible to test runners that don't support all metrics

Repository: beam
Updated Branches:
  refs/heads/master de9d89c1e -> f23dd6709


Make it possible to test runners that don't support all metrics

Runners may support metrics only partially. Support varies along two axes:
- attempted & committed
- counters & distributions & gauges & ...

Prior to this PR, we had categories for the first axis, but not the second.
This meant that a runner supporting only part of the second axis had to exclude every test
on the first axis. Adding categories for both axes lets runners build a matrix of supported
features.

(This also lets the DataflowRunner run more tests by accurately identifying its test
 matrix.)
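
As a rough illustration of the matrix idea, the sketch below models category exclusion in
plain Java, without JUnit or Maven. The test names and categories are copied from
MetricsTest in this commit (ValidatesRunner is omitted), and the two exclusion sets are the
old and new excludedGroups entries from the Dataflow pom.xml. The class
CategoryMatrixSketch and its helpers (setOf, runs, metricsTests, runnable) are hypothetical
names used only here. The model assumes a test is skipped when it carries at least one
excluded category.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of category exclusion; not part of Beam.
class CategoryMatrixSketch {

  static Set<String> setOf(String... names) {
    return new LinkedHashSet<>(Arrays.asList(names));
  }

  // Assumption: a test runs only if none of its categories is excluded.
  static boolean runs(Set<String> testCategories, Set<String> excludedGroups) {
    for (String category : testCategories) {
      if (excludedGroups.contains(category)) {
        return false;
      }
    }
    return true;
  }

  // Test names and categories as tagged in MetricsTest by this commit.
  static Map<String, Set<String>> metricsTests() {
    Map<String, Set<String>> tests = new LinkedHashMap<>();
    tests.put("testCommittedCounterMetrics",
        setOf("UsesCommittedMetrics", "UsesCounterMetrics"));
    tests.put("testAttemptedCounterMetrics",
        setOf("UsesAttemptedMetrics", "UsesCounterMetrics"));
    tests.put("testCommittedDistributionMetrics",
        setOf("UsesCommittedMetrics", "UsesDistributionMetrics"));
    tests.put("testAttemptedDistributionMetrics",
        setOf("UsesAttemptedMetrics", "UsesDistributionMetrics"));
    tests.put("testCommittedGaugeMetrics",
        setOf("UsesCommittedMetrics", "UsesGaugeMetrics"));
    tests.put("testAttemptedGaugeMetrics",
        setOf("UsesAttemptedMetrics", "UsesGaugeMetrics"));
    tests.put("testAllCommittedMetrics",
        setOf("UsesCommittedMetrics", "UsesCounterMetrics",
            "UsesDistributionMetrics", "UsesGaugeMetrics"));
    tests.put("testAllAttemptedMetrics",
        setOf("UsesAttemptedMetrics", "UsesCounterMetrics",
            "UsesDistributionMetrics", "UsesGaugeMetrics"));
    return tests;
  }

  // Names of the tests that survive the given exclusion set, in declaration order.
  static Set<String> runnable(Set<String> excludedGroups) {
    Set<String> names = new LinkedHashSet<>();
    for (Map.Entry<String, Set<String>> test : metricsTests().entrySet()) {
      if (runs(test.getValue(), excludedGroups)) {
        names.add(test.getKey());
      }
    }
    return names;
  }

  public static void main(String[] args) {
    // Old exclusion (first axis only): prints []
    System.out.println(runnable(setOf("UsesAttemptedMetrics", "UsesCommittedMetrics")));
    // New exclusion (second axis): prints
    // [testCommittedCounterMetrics, testAttemptedCounterMetrics]
    System.out.println(runnable(setOf("UsesDistributionMetrics", "UsesGaugeMetrics")));
  }
}
```

Under this model, excluding by metric kind keeps both counter tests, while excluding by
attempted/committed drops every metrics test.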

* MetricMatchers: refactor matchers for committed/attempted code reuse
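
The shape of that refactoring can be sketched without Hamcrest: one factory takes an
isCommitted flag and selects which value to compare, and the attempted/committed entry
points become one-line wrappers. Everything below (MatcherReuseSketch, Result, result,
attemptedResult, committedResult, describe) is a hypothetical stand-in written for
illustration. The real code in the diff returns a TypeSafeMatcher over MetricResult and
also matches namespace and step.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical stand-in for the matcher refactoring; not Beam's MetricMatchers.
class MatcherReuseSketch {

  // Simplified result type: one value per metric state.
  static final class Result<T> {
    final String name;
    final T attempted;
    final T committed;

    Result(String name, T attempted, T committed) {
      this.name = name;
      this.attempted = attempted;
      this.committed = committed;
    }
  }

  // Single implementation; the flag selects which value is compared.
  static <T> Predicate<Result<T>> result(String name, T value, boolean isCommitted) {
    return item -> {
      T actual = isCommitted ? item.committed : item.attempted;
      return Objects.equals(name, item.name) && Objects.equals(value, actual);
    };
  }

  // Thin wrappers keep the old entry points available.
  static <T> Predicate<Result<T>> attemptedResult(String name, T value) {
    return result(name, value, false);
  }

  static <T> Predicate<Result<T>> committedResult(String name, T value) {
    return result(name, value, true);
  }

  // The description is derived from the same flag, so it cannot drift from the check.
  static String describe(String name, Object value, boolean isCommitted) {
    String state = isCommitted ? "committed" : "attempted";
    return String.format("Result{name=%s, %s=%s}", name, state, value);
  }

  public static void main(String[] args) {
    Result<Long> r = new Result<>("count", 3L, 2L);
    System.out.println(attemptedResult("count", 3L).test(r)); // true
    System.out.println(committedResult("count", 3L).test(r)); // false
    System.out.println(describe("count", 3L, true)); // Result{name=count, committed=3}
  }
}
```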


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4faa8feb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4faa8feb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4faa8feb

Branch: refs/heads/master
Commit: 4faa8feba822db000b4b42636408245422ed324d
Parents: de9d89c
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 21 10:03:50 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 26 13:48:42 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   4 +-
 .../beam/sdk/testing/UsesCounterMetrics.java    |  25 +++
 .../sdk/testing/UsesDistributionMetrics.java    |  26 ++++
 .../beam/sdk/testing/UsesGaugeMetrics.java      |  25 +++
 .../apache/beam/sdk/metrics/MetricMatchers.java | 144 ++++++-----------
 .../apache/beam/sdk/metrics/MetricsTest.java    | 154 ++++++++++++-------
 6 files changed, 225 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 64dc71e..75aac43 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -122,8 +122,8 @@
             <id>validates-runner-tests</id>
             <configuration>
               <excludedGroups>
-                org.apache.beam.sdk.testing.UsesAttemptedMetrics,
-                org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                org.apache.beam.sdk.testing.UsesDistributionMetrics,
+                org.apache.beam.sdk.testing.UsesGaugeMetrics,
                 org.apache.beam.sdk.testing.UsesSetState,
                 org.apache.beam.sdk.testing.UsesMapState,
                 org.apache.beam.sdk.testing.UsesTimersInParDo,

http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java
new file mode 100644
index 0000000..0d0ed6f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Counter}.
+ * Tests tagged with {@link UsesCounterMetrics} should be run for runners which support counters.
+ */
+public class UsesCounterMetrics {}

http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java
new file mode 100644
index 0000000..6422024
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Distribution}.
+ * Tests tagged with {@link UsesDistributionMetrics} should be run for runners which support
+ * distributions.
+ */
+public class UsesDistributionMetrics {}

http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java
new file mode 100644
index 0000000..9d6455e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Gauge}.
+ * Tests tagged with {@link UsesGaugeMetrics} should be run for runners which support gauges.
+ */
+public class UsesGaugeMetrics {}

http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
index 2251c82..a0dd119 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -81,61 +81,39 @@ public class MetricMatchers {
   }
 
   /**
-   * Matches a {@link MetricResult} with the given namespace, name and step, and whose attempted
-   * value equals the given value.
+   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+   * the given value for attempted metrics.
    */
   public static <T> Matcher<MetricResult<T>> attemptedMetricsResult(
-      final String namespace, final String name, final String step, final T attempted) {
-    return new TypeSafeMatcher<MetricResult<T>>() {
-      @Override
-      protected boolean matchesSafely(MetricResult<T> item) {
-        return Objects.equals(namespace, item.name().namespace())
-            && Objects.equals(name, item.name().name())
-            && item.step().contains(step)
-            && metricResultsEqual(attempted, item.attempted());
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("MetricResult{inNamespace=").appendValue(namespace)
-            .appendText(", name=").appendValue(name)
-            .appendText(", step=").appendValue(step)
-            .appendText(", attempted=").appendValue(attempted)
-            .appendText("}");
-      }
-
-      @Override
-      protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) {
-        mismatchDescription.appendText("MetricResult{");
-
-        describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
-
-        if (!Objects.equals(attempted, item.attempted())) {
-          mismatchDescription
-              .appendText("attempted: ").appendValue(attempted)
-              .appendText(" != ").appendValue(item.attempted());
-        }
-
-        mismatchDescription.appendText("}");
-      }
-    };
+      final String namespace, final String name, final String step, final T value) {
+    return metricsResult(namespace, name, step, value, false);
   }
 
   /**
-   * Matches a {@link MetricResult} with the given namespace, name and step, and whose committed
-   * value equals the given value.
+   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+   * the given value for committed metrics.
    */
   public static <T> Matcher<MetricResult<T>> committedMetricsResult(
-      final String namespace, final String name, final String step,
-      final T committed) {
+      final String namespace, final String name, final String step, final T value) {
+    return metricsResult(namespace, name, step, value, true);
+  }
+
+  /**
+   * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+   * the given value for either committed or attempted (based on {@code isCommitted}) metrics.
+   */
+  public static <T> Matcher<MetricResult<T>> metricsResult(
+      final String namespace, final String name, final String step, final T value,
+      final boolean isCommitted) {
+    final String metricState = isCommitted ? "committed" : "attempted";
     return new TypeSafeMatcher<MetricResult<T>>() {
       @Override
       protected boolean matchesSafely(MetricResult<T> item) {
+        final T metricValue = isCommitted ? item.committed() : item.attempted();
         return Objects.equals(namespace, item.name().namespace())
             && Objects.equals(name, item.name().name())
             && item.step().contains(step)
-            && metricResultsEqual(committed, item.committed());
+            && metricResultsEqual(value, metricValue);
       }
 
       @Override
@@ -144,20 +122,21 @@ public class MetricMatchers {
             .appendText("MetricResult{inNamespace=").appendValue(namespace)
             .appendText(", name=").appendValue(name)
             .appendText(", step=").appendValue(step)
-            .appendText(", committed=").appendValue(committed)
+            .appendText(String.format(", %s=", metricState)).appendValue(value)
             .appendText("}");
       }
 
       @Override
       protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) {
         mismatchDescription.appendText("MetricResult{");
+        final T metricValue = isCommitted ? item.committed() : item.attempted();
 
         describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
 
-        if (!Objects.equals(committed, item.committed())) {
+        if (!Objects.equals(value, metricValue)) {
           mismatchDescription
-              .appendText("committed: ").appendValue(committed)
-              .appendText(" != ").appendValue(item.committed());
+              .appendText(String.format("%s: ", metricState)).appendValue(value)
+              .appendText(" != ").appendValue(metricValue);
         }
 
         mismatchDescription.appendText("}");
@@ -176,62 +155,28 @@ public class MetricMatchers {
   static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax(
       final String namespace, final String name, final String step,
       final Long attemptedMin, final Long attemptedMax) {
-    return new TypeSafeMatcher<MetricResult<DistributionResult>>() {
-      @Override
-      protected boolean matchesSafely(MetricResult<DistributionResult> item) {
-        return Objects.equals(namespace, item.name().namespace())
-            && Objects.equals(name, item.name().name())
-            && item.step().contains(step)
-            && Objects.equals(attemptedMin, item.attempted().min())
-            && Objects.equals(attemptedMax, item.attempted().max());
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("MetricResult{inNamespace=").appendValue(namespace)
-            .appendText(", name=").appendValue(name)
-            .appendText(", step=").appendValue(step)
-            .appendText(", attemptedMin=").appendValue(attemptedMin)
-            .appendText(", attemptedMax=").appendValue(attemptedMax)
-            .appendText("}");
-      }
-
-      @Override
-      protected void describeMismatchSafely(MetricResult<DistributionResult> item,
-          Description mismatchDescription) {
-        mismatchDescription.appendText("MetricResult{");
-
-        describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
-
-        if (!Objects.equals(attemptedMin, item.attempted())) {
-          mismatchDescription
-              .appendText("attemptedMin: ").appendValue(attemptedMin)
-              .appendText(" != ").appendValue(item.attempted());
-        }
-
-        if (!Objects.equals(attemptedMax, item.attempted())) {
-          mismatchDescription
-              .appendText("attemptedMax: ").appendValue(attemptedMax)
-              .appendText(" != ").appendValue(item.attempted());
-        }
-
-        mismatchDescription.appendText("}");
-      }
-    };
+    return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false);
   }
 
   static Matcher<MetricResult<DistributionResult>> distributionCommittedMinMax(
       final String namespace, final String name, final String step,
       final Long committedMin, final Long committedMax) {
+    return distributionMinMax(namespace, name, step, committedMin, committedMax, true);
+  }
+
+  static Matcher<MetricResult<DistributionResult>> distributionMinMax(
+      final String namespace, final String name, final String step,
+      final Long min, final Long max, final boolean isCommitted) {
+    final String metricState = isCommitted ? "committed" : "attempted";
     return new TypeSafeMatcher<MetricResult<DistributionResult>>() {
       @Override
       protected boolean matchesSafely(MetricResult<DistributionResult> item) {
+        DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
         return Objects.equals(namespace, item.name().namespace())
             && Objects.equals(name, item.name().name())
             && item.step().contains(step)
-            && Objects.equals(committedMin, item.committed().min())
-            && Objects.equals(committedMax, item.committed().max());
+            && Objects.equals(min, metricValue.min())
+            && Objects.equals(max, metricValue.max());
       }
 
       @Override
@@ -240,8 +185,8 @@ public class MetricMatchers {
             .appendText("MetricResult{inNamespace=").appendValue(namespace)
             .appendText(", name=").appendValue(name)
             .appendText(", step=").appendValue(step)
-            .appendText(", committedMin=").appendValue(committedMin)
-            .appendText(", committedMax=").appendValue(committedMax)
+            .appendText(String.format(", %sMin=", metricState)).appendValue(min)
+            .appendText(String.format(", %sMax=", metricState)).appendValue(max)
             .appendText("}");
       }
 
@@ -251,17 +196,18 @@ public class MetricMatchers {
         mismatchDescription.appendText("MetricResult{");
 
         describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
+        DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
 
-        if (!Objects.equals(committedMin, item.committed())) {
+        if (!Objects.equals(min, metricValue.min())) {
           mismatchDescription
-              .appendText("committedMin: ").appendValue(committedMin)
-              .appendText(" != ").appendValue(item.committed());
+              .appendText(String.format("%sMin: ", metricState)).appendValue(min)
+              .appendText(" != ").appendValue(metricValue.min());
         }
 
-        if (!Objects.equals(committedMax, item.committed())) {
+        if (!Objects.equals(max, metricValue.max())) {
           mismatchDescription
-              .appendText("committedMax: ").appendValue(committedMax)
-              .appendText(" != ").appendValue(item.committed());
+              .appendText(String.format("%sMax: ", metricState)).appendValue(max)
+              .appendText(" != ").appendValue(metricValue.max());
         }
 
         mismatchDescription.appendText("}");

http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index c7068e1..8077c27 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -19,9 +19,8 @@
 package org.apache.beam.sdk.metrics;
 
 import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
-import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
-import static org.apache.beam.sdk.metrics.MetricMatchers.distributionAttemptedMinMax;
-import static org.apache.beam.sdk.metrics.MetricMatchers.distributionCommittedMinMax;
+import static org.apache.beam.sdk.metrics.MetricMatchers.distributionMinMax;
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertNull;
@@ -33,6 +32,9 @@ import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
 import org.apache.beam.sdk.testing.UsesCommittedMetrics;
+import org.apache.beam.sdk.testing.UsesCounterMetrics;
+import org.apache.beam.sdk.testing.UsesDistributionMetrics;
+import org.apache.beam.sdk.testing.UsesGaugeMetrics;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -58,6 +60,13 @@ public class MetricsTest implements Serializable {
   private static final String NAMESPACE = MetricsTest.class.getName();
   private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName();
 
+  private static MetricQueryResults queryTestMetrics(PipelineResult result) {
+    return result.metrics().queryMetrics(
+        MetricsFilter.builder()
+            .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
+            .build());
+  }
+
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
 
@@ -67,14 +76,14 @@ public class MetricsTest implements Serializable {
   }
 
   @Test
-  public void distributionWithoutContainer() {
+  public void testDistributionWithoutContainer() {
     assertNull(MetricsEnvironment.getCurrentContainer());
     // Should not fail even though there is no metrics container.
     Metrics.distribution(NS, NAME).update(5L);
   }
 
   @Test
-  public void counterWithoutContainer() {
+  public void testCounterWithoutContainer() {
     assertNull(MetricsEnvironment.getCurrentContainer());
     // Should not fail even though there is no metrics container.
     Counter counter = Metrics.counter(NS, NAME);
@@ -85,7 +94,7 @@ public class MetricsTest implements Serializable {
   }
 
   @Test
-  public void distributionToCell() {
+  public void testDistributionToCell() {
     MetricsContainer container = new MetricsContainer("step");
     MetricsEnvironment.setCurrentContainer(container);
 
@@ -104,7 +113,7 @@ public class MetricsTest implements Serializable {
   }
 
   @Test
-  public void counterToCell() {
+  public void testCounterToCell() {
     MetricsContainer container = new MetricsContainer("step");
     MetricsEnvironment.setCurrentContainer(container);
     Counter counter = Metrics.counter(NS, NAME);
@@ -122,62 +131,73 @@ public class MetricsTest implements Serializable {
     assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L));
   }
 
-  @Category({ValidatesRunner.class, UsesCommittedMetrics.class})
+  @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class,
+      UsesDistributionMetrics.class, UsesGaugeMetrics.class})
   @Test
-  public void committedMetricsReportToQuery() {
+  public void testAllCommittedMetrics() {
     PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
 
-    MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
-        .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
-        .build());
-
-    assertThat(metrics.counters(), hasItem(
-        committedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
-    assertThat(metrics.distributions(), hasItem(
-        committedMetricsResult(NAMESPACE, "input", "MyStep1",
-            DistributionResult.create(26L, 3L, 5L, 13L))));
+    assertAllMetrics(metrics, true);
+  }
 
-    assertThat(metrics.counters(), hasItem(
-        committedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
-    assertThat(metrics.distributions(), hasItem(
-        committedMetricsResult(NAMESPACE, "input", "MyStep2",
-            DistributionResult.create(52L, 6L, 5L, 13L))));
-    assertThat(metrics.gauges(), hasItem(
-        committedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
-            GaugeResult.create(12L, Instant.now()))));
+  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class,
+      UsesDistributionMetrics.class, UsesGaugeMetrics.class})
+  @Test
+  public void testAllAttemptedMetrics() {
+    PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
 
-    assertThat(metrics.distributions(), hasItem(
-        distributionCommittedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
+    // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
+    assertAllMetrics(metrics, false);
   }
 
+  @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class})
+  @Test
+  public void testCommittedCounterMetrics() {
+    PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
+    assertCounterMetrics(metrics, true);
+  }
 
-  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class})
+  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
   @Test
-  public void attemptedMetricsReportToQuery() {
+  public void testAttemptedCounterMetrics() {
     PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
+    assertCounterMetrics(metrics, false);
+  }
 
-    MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
-        .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
-        .build());
+  @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesDistributionMetrics.class})
+  @Test
+  public void testCommittedDistributionMetrics() {
+    PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
+    assertDistributionMetrics(metrics, true);
+  }
 
-    // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
-    assertThat(metrics.counters(), hasItem(
-        attemptedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
-    assertThat(metrics.distributions(), hasItem(
-        attemptedMetricsResult(NAMESPACE, "input", "MyStep1",
-            DistributionResult.create(26L, 3L, 5L, 13L))));
+  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesDistributionMetrics.class})
+  @Test
+  public void testAttemptedDistributionMetrics() {
+    PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
+    assertDistributionMetrics(metrics, false);
+  }
 
-    assertThat(metrics.counters(), hasItem(
-        attemptedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
-    assertThat(metrics.distributions(), hasItem(
-        attemptedMetricsResult(NAMESPACE, "input", "MyStep2",
-            DistributionResult.create(52L, 6L, 5L, 13L))));
-    assertThat(metrics.gauges(), hasItem(
-        attemptedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
-            GaugeResult.create(12L, Instant.now()))));
+  @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesGaugeMetrics.class})
+  @Test
+  public void testCommittedGaugeMetrics() {
+    PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
+    assertGaugeMetrics(metrics, true);
+  }
 
-    assertThat(metrics.distributions(), hasItem(
-        distributionAttemptedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
+  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesGaugeMetrics.class})
+  @Test
+  public void testAttemptedGaugeMetrics() {
+    PipelineResult result = runPipelineWithMetrics();
+    MetricQueryResults metrics = queryTestMetrics(result);
+    assertGaugeMetrics(metrics, false);
   }
 
   private PipelineResult runPipelineWithMetrics() {
@@ -232,8 +252,39 @@ public class MetricsTest implements Serializable {
     return result;
   }
 
+  private static void assertCounterMetrics(MetricQueryResults metrics, boolean isCommitted) {
+    assertThat(metrics.counters(), hasItem(
+        metricsResult(NAMESPACE, "count", "MyStep1", 3L, isCommitted)));
+    assertThat(metrics.counters(), hasItem(
+        metricsResult(NAMESPACE, "count", "MyStep2", 6L, isCommitted)));
+  }
+
+  private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCommitted) {
+    assertThat(metrics.gauges(), hasItem(
+        metricsResult(NAMESPACE, "my-gauge", "MyStep2",
+            GaugeResult.create(12L, Instant.now()), isCommitted)));
+  }
+
+  private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) {
+    assertThat(metrics.distributions(), hasItem(
+        metricsResult(NAMESPACE, "input", "MyStep1",
+            DistributionResult.create(26L, 3L, 5L, 13L), isCommitted)));
+
+    assertThat(metrics.distributions(), hasItem(
+        metricsResult(NAMESPACE, "input", "MyStep2",
+            DistributionResult.create(52L, 6L, 5L, 13L), isCommitted)));
+    assertThat(metrics.distributions(), hasItem(
+        distributionMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L, isCommitted)));
+  }
+
+  private static void assertAllMetrics(MetricQueryResults metrics, boolean isCommitted) {
+    assertCounterMetrics(metrics, isCommitted);
+    assertDistributionMetrics(metrics, isCommitted);
+    assertGaugeMetrics(metrics, isCommitted);
+  }
+
   @Test
-  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class})
+  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
   public void testBoundedSourceMetrics() {
     long numElements = 1000;
 
@@ -259,11 +310,10 @@ public class MetricsTest implements Serializable {
   }
 
   @Test
-  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class})
+  @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
   public void testUnboundedSourceMetrics() {
     long numElements = 1000;
 
-
     // Use withMaxReadTime to force unbounded mode.
     pipeline.apply(
         GenerateSequence.from(0).to(numElements).withMaxReadTime(Duration.standardDays(1)));


[2/2] beam git commit: This closes #2633

Posted by dh...@apache.org.
This closes #2633


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f23dd670
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f23dd670
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f23dd670

Branch: refs/heads/master
Commit: f23dd6709bff3cf3827bfeeab0c122ca23ac09ca
Parents: de9d89c 4faa8fe
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 26 13:49:00 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 26 13:49:00 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   4 +-
 .../beam/sdk/testing/UsesCounterMetrics.java    |  25 +++
 .../sdk/testing/UsesDistributionMetrics.java    |  26 ++++
 .../beam/sdk/testing/UsesGaugeMetrics.java      |  25 +++
 .../apache/beam/sdk/metrics/MetricMatchers.java | 144 ++++++-----------
 .../apache/beam/sdk/metrics/MetricsTest.java    | 154 ++++++++++++-------
 6 files changed, 225 insertions(+), 153 deletions(-)
----------------------------------------------------------------------