You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:45 UTC

[37/50] incubator-beam git commit: Add RunnableOnService test for Metrics

Add RunnableOnService test for Metrics

Add the UsesMetrics test-category interface and exclude that category
from the test runs of runners that don't yet support Metrics

Make MetricName and the delegating Counter/Distribution implementations
Serializable so that Metrics can be created during pipeline construction

Remove test from DirectRunnerTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/998cabc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/998cabc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/998cabc8

Branch: refs/heads/gearpump-runner
Commit: 998cabc8bbbf8d08d7bfad71e9376707388f5c5c
Parents: 66318d8
Author: bchambers <bc...@google.com>
Authored: Thu Dec 15 17:04:59 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Mon Dec 19 11:29:39 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  3 +-
 .../beam/runners/direct/DirectRunnerTest.java   | 39 ------------
 runners/flink/runner/pom.xml                    |  6 +-
 runners/google-cloud-dataflow-java/pom.xml      |  3 +-
 runners/spark/pom.xml                           |  3 +-
 .../org/apache/beam/sdk/metrics/MetricName.java |  3 +-
 .../org/apache/beam/sdk/metrics/Metrics.java    |  5 +-
 .../apache/beam/sdk/testing/UsesMetrics.java    | 24 ++++++++
 .../apache/beam/sdk/metrics/MetricMatchers.java |  4 +-
 .../apache/beam/sdk/metrics/MetricsTest.java    | 63 +++++++++++++++++++-
 10 files changed, 103 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index f71637c..d03964d 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -186,7 +186,8 @@
               <excludedGroups>
                 org.apache.beam.sdk.testing.UsesStatefulParDo,
                 org.apache.beam.sdk.testing.UsesTimersInParDo,
-                org.apache.beam.sdk.testing.UsesSplittableParDo
+                org.apache.beam.sdk.testing.UsesSplittableParDo,
+                org.apache.beam.sdk.testing.UsesMetrics
               </excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index eb0f344..eafb788 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
@@ -37,7 +35,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -48,13 +45,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Distribution;
-import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -467,35 +457,6 @@ public class DirectRunnerTest implements Serializable {
     }
   }
 
-  @Test
-  public void testMetrics() throws Exception {
-    Pipeline pipeline = getPipeline();
-    pipeline
-        .apply(Create.of(5, 8, 13))
-        .apply("MyStep", ParDo.of(new DoFn<Integer, Void>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            Counter count = Metrics.counter(DirectRunnerTest.class, "count");
-            Distribution values = Metrics.distribution(DirectRunnerTest.class, "input");
-
-            count.inc();
-            values.update(c.element());
-          }
-        }));
-    PipelineResult result = pipeline.run();
-    MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
-        .addNameFilter(MetricNameFilter.inNamespace(DirectRunnerTest.class))
-        .build());
-
-    final String stepName = "MyStep/AnonymousParDo/AnonymousParMultiDo";
-    assertThat(metrics.counters(), contains(
-        metricResult(DirectRunnerTest.class.getName(), "count", stepName, 3L, 3L)));
-    assertThat(metrics.distributions(), contains(
-        metricResult(DirectRunnerTest.class.getName(), "input", stepName,
-            DistributionResult.create(26L, 3L, 5L, 13L),
-            DistributionResult.create(26L, 3L, 5L, 13L))));
-  }
-
   private static class MustSplitSource<T> extends BoundedSource<T>{
     public static <T> BoundedSource<T> of(BoundedSource<T> underlying) {
       return new MustSplitSource<>(underlying);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 09773e1..7f49372 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -56,7 +56,8 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
-                    org.apache.beam.sdk.testing.UsesSplittableParDo
+                    org.apache.beam.sdk.testing.UsesSplittableParDo,
+                    org.apache.beam.sdk.testing.UsesMetrics
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
@@ -86,7 +87,8 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
-                    org.apache.beam.sdk.testing.UsesSplittableParDo
+                    org.apache.beam.sdk.testing.UsesSplittableParDo,
+                    org.apache.beam.sdk.testing.UsesMetrics
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 46ac7ef..0094791 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -80,7 +80,8 @@
               <excludedGroups>
                 org.apache.beam.sdk.testing.UsesStatefulParDo,
                 org.apache.beam.sdk.testing.UsesTimersInParDo,
-                org.apache.beam.sdk.testing.UsesSplittableParDo
+                org.apache.beam.sdk.testing.UsesSplittableParDo,
+                org.apache.beam.sdk.testing.UsesMetrics
               </excludedGroups>
               <excludes>
                 <exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 5a2fe87..309e1ff 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -75,7 +75,8 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
-                    org.apache.beam.sdk.testing.UsesSplittableParDo
+                    org.apache.beam.sdk.testing.UsesSplittableParDo,
+                    org.apache.beam.sdk.testing.UsesMetrics
                   </excludedGroups>
                   <forkCount>1</forkCount>
                   <reuseForks>false</reuseForks>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
index 843a885..3c77043 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.metrics;
 
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 
@@ -28,7 +29,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  */
 @Experimental(Kind.METRICS)
 @AutoValue
-public abstract class MetricName {
+public abstract class MetricName implements Serializable {
 
   /** The namespace associated with this metric. */
   public abstract String namespace();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index b72a0b2..045e076 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 
@@ -58,7 +59,7 @@ public class Metrics {
   }
 
   /** Implementation of {@link Counter} that delegates to the instance for the current context. */
-  private static class DelegatingCounter implements Counter {
+  private static class DelegatingCounter implements Counter, Serializable {
     private final MetricName name;
 
     private DelegatingCounter(MetricName name) {
@@ -92,7 +93,7 @@ public class Metrics {
   /**
    * Implementation of {@link Distribution} that delegates to the instance for the current context.
    */
-  private static class DelegatingDistribution implements Distribution {
+  private static class DelegatingDistribution implements Distribution, Serializable {
     private final MetricName name;
 
     private DelegatingDistribution(MetricName name) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java
new file mode 100644
index 0000000..261354c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Metrics}.
+ */
+public interface UsesMetrics {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
index 6cd4c52..798d9d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -78,7 +78,7 @@ public class MetricMatchers {
       protected boolean matchesSafely(MetricResult<T> item) {
         return Objects.equals(namespace, item.name().namespace())
             && Objects.equals(name, item.name().name())
-            && Objects.equals(step, item.step())
+            && item.step().contains(step)
             && Objects.equals(committed, item.committed())
             && Objects.equals(attempted, item.attempted());
       }
@@ -109,7 +109,7 @@ public class MetricMatchers {
               .appendText(" != ").appendValue(item.name().name());
         }
 
-        if (!Objects.equals(step, item.step())) {
+        if (!item.step().contains(step)) {
           mismatchDescription
               .appendText("step: ").appendValue(step)
               .appendText(" != ").appendValue(item.step());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 732cb34..075df19 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -18,18 +18,30 @@
 
 package org.apache.beam.sdk.metrics;
 
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
+import java.io.Serializable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesMetrics;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Tests for {@link Metrics}.
  */
-public class MetricsTest {
+public class MetricsTest implements Serializable {
 
   private static final String NS = "test";
   private static final String NAME = "name";
@@ -95,4 +107,53 @@ public class MetricsTest {
     counter.dec();
     assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L));
   }
+
+  @Category({RunnableOnService.class, UsesMetrics.class})
+  @Test
+  public void metricsReportToQuery() {
+    final Counter count = Metrics.counter(MetricsTest.class, "count");
+    Pipeline pipeline = TestPipeline.create();
+    pipeline
+        .apply(Create.of(5, 8, 13))
+        .apply("MyStep1", ParDo.of(new DoFn<Integer, Integer>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Distribution values = Metrics.distribution(MetricsTest.class, "input");
+            count.inc();
+            values.update(c.element());
+
+            c.output(c.element());
+            c.output(c.element());
+          }
+        }))
+        .apply("MyStep2", ParDo.of(new DoFn<Integer, Integer>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Distribution values = Metrics.distribution(MetricsTest.class, "input");
+            count.inc();
+            values.update(c.element());
+          }
+        }));
+    PipelineResult result = pipeline.run();
+
+    result.waitUntilFinish();
+
+    MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
+      .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
+      .build());
+    // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
+    assertThat(metrics.counters(), hasItem(
+        metricResult(MetricsTest.class.getName(), "count", "MyStep1", 3L, 3L)));
+    assertThat(metrics.distributions(), hasItem(
+        metricResult(MetricsTest.class.getName(), "input", "MyStep1",
+            DistributionResult.create(26L, 3L, 5L, 13L),
+            DistributionResult.create(26L, 3L, 5L, 13L))));
+
+    assertThat(metrics.counters(), hasItem(
+        metricResult(MetricsTest.class.getName(), "count", "MyStep2", 6L, 6L)));
+    assertThat(metrics.distributions(), hasItem(
+        metricResult(MetricsTest.class.getName(), "input", "MyStep2",
+            DistributionResult.create(52L, 6L, 5L, 13L),
+            DistributionResult.create(52L, 6L, 5L, 13L))));
+  }
 }