You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:35 UTC

[3/6] beam git commit: [BEAM-1763] Verify PAssert execution in runners which support metrics.

[BEAM-1763] Verify PAssert execution in runners which support metrics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95ade45e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95ade45e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95ade45e

Branch: refs/heads/master
Commit: 95ade45eced4787eb67a9d4d13dae48ffb176919
Parents: 48c8ed1
Author: Aviem Zur <av...@gmail.com>
Authored: Tue May 2 19:00:29 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java  |  3 ++
 .../beam/runners/spark/TestSparkRunner.java     | 47 --------------------
 .../ResumeFromCheckpointStreamingTest.java      | 12 +++--
 .../beam/sdk/metrics/MetricsEnvironment.java    |  5 +++
 .../apache/beam/sdk/testing/TestPipeline.java   | 46 ++++++++++++++++---
 5 files changed, 57 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 181ffda..a5972ef 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -31,6 +31,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -103,6 +104,8 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
   public PipelineResult run(Pipeline pipeline) {
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
 
+    MetricsEnvironment.setMetricsSupported(true);
+
     LOG.info("Executing pipeline using FlinkRunner.");
 
     FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 10e98b8..1e67813 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -40,15 +40,11 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -116,8 +112,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
     SparkPipelineResult result = null;
 
-    int expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
-
     // clear state of Aggregators, Metrics and Watermarks if exists.
     AggregatorsAccumulator.clear();
     MetricsAccumulator.clear();
@@ -137,47 +131,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
             String.format("Finish state %s is not allowed.", finishState),
             finishState,
             isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE));
-
-        // validate assertion succeeded (at least once).
-        long successAssertions = 0;
-        Iterable<MetricResult<Long>> counterResults = result.metrics().queryMetrics(
-            MetricsFilter.builder()
-                .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
-                .build()).counters();
-        for (MetricResult<Long> counter : counterResults) {
-          if (counter.attempted().longValue() > 0) {
-            successAssertions++;
-          }
-        }
-        Integer expectedAssertions = testSparkPipelineOptions.getExpectedAssertions() != null
-            ? testSparkPipelineOptions.getExpectedAssertions() : expectedNumberOfAssertions;
-        assertThat(
-            String.format(
-                "Expected %d successful assertions, but found %d.",
-                expectedAssertions, successAssertions),
-            successAssertions,
-            is(expectedAssertions.longValue()));
-        // validate assertion didn't fail.
-        long failedAssertions = 0;
-        Iterable<MetricResult<Long>> failCounterResults = result.metrics().queryMetrics(
-            MetricsFilter.builder()
-                .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
-                .build()).counters();
-        for (MetricResult<Long> counter : failCounterResults) {
-          if (counter.attempted().longValue() > 0) {
-            failedAssertions++;
-          }
-        }
-        assertThat(
-            String.format("Found %d failed assertions.", failedAssertions),
-            failedAssertions,
-            is(0L));
-
-        LOG.info(
-            String.format(
-                "Successfully asserted pipeline %s with %d successful assertions.",
-                testSparkPipelineOptions.getJobName(),
-                successAssertions));
       } finally {
         try {
           // cleanup checkpoint dir.

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7d7fd08..33571f0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -180,7 +180,8 @@ public class ResumeFromCheckpointStreamingTest {
     long successAssertions = 0;
     Iterable<MetricResult<Long>> counterResults = res.metrics().queryMetrics(
         MetricsFilter.builder()
-            .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+            .addNameFilter(
+                MetricNameFilter.named(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER))
             .build()).counters();
     for (MetricResult<Long> counter : counterResults) {
       if (counter.attempted().longValue() > 0) {
@@ -196,7 +197,8 @@ public class ResumeFromCheckpointStreamingTest {
     long failedAssertions = 0;
     Iterable<MetricResult<Long>> failCounterResults = res.metrics().queryMetrics(
         MetricsFilter.builder()
-            .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
+            .addNameFilter(MetricNameFilter.named(
+                PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER))
             .build()).counters();
     for (MetricResult<Long> counter : failCounterResults) {
       if (counter.attempted().longValue() > 0) {
@@ -330,8 +332,10 @@ public class ResumeFromCheckpointStreamingTest {
     }
 
     private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
-      private final Counter success = Metrics.counter(PAssert.class, PAssert.SUCCESS_COUNTER);
-      private final Counter failure = Metrics.counter(PAssert.class, PAssert.FAILURE_COUNTER);
+      private final Counter success =
+          Metrics.counter(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER);
+      private final Counter failure =
+          Metrics.counter(PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER);
       private final T[] expected;
 
       AssertDoFn(T[] expected) {

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index 2942578..a4b311f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -70,6 +70,11 @@ public class MetricsEnvironment {
     METRICS_SUPPORTED.set(supported);
   }
 
+  /** Indicates whether metrics reporting is supported. */
+  public static boolean isMetricsSupported() {
+    return METRICS_SUPPORTED.get();
+  }
+
   /**
    * Set the {@link MetricsContainer} for the current thread.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 868dcbd..d8fe51d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.testing;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.TreeNode;
@@ -41,6 +43,10 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
@@ -186,8 +192,8 @@ public class TestPipeline extends Pipeline implements TestRule {
           if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) {
             final boolean hasDanglingPAssert =
                 FluentIterable.from(pipelineNodes)
-                              .filter(Predicates.not(Predicates.in(runVisitedNodes)))
-                              .anyMatch(isPAssertNode);
+                    .filter(Predicates.not(Predicates.in(runVisitedNodes)))
+                    .anyMatch(isPAssertNode);
             if (hasDanglingPAssert) {
               throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
             } else {
@@ -319,12 +325,13 @@ public class TestPipeline extends Pipeline implements TestRule {
     checkState(
         enforcement.isPresent(),
         "Is your TestPipeline declaration missing a @Rule annotation? Usage: "
-        + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
+            + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
 
     final PipelineResult pipelineResult;
     try {
       enforcement.get().beforePipelineExecution();
       pipelineResult = super.run();
+      verifyPAssertsSucceeded(pipelineResult);
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();
       if (cause instanceof AssertionError) {
@@ -385,8 +392,8 @@ public class TestPipeline extends Pipeline implements TestRule {
           Strings.isNullOrEmpty(beamTestPipelineOptions)
               ? PipelineOptionsFactory.create()
               : PipelineOptionsFactory.fromArgs(
-                      MAPPER.readValue(beamTestPipelineOptions, String[].class))
-                  .as(TestPipelineOptions.class);
+              MAPPER.readValue(beamTestPipelineOptions, String[].class))
+              .as(TestPipelineOptions.class);
 
       options.as(ApplicationNameOptions.class).setAppName(getAppName());
       // If no options were specified, set some reasonable defaults
@@ -488,6 +495,35 @@ public class TestPipeline extends Pipeline implements TestRule {
     return firstInstanceAfterTestPipeline;
   }
 
+  /**
+   * Verifies every {@link PAssert} in the pipeline executed and reported at least one success.
+   *
+   * <p>Note: this check only runs for runners which support Metrics. Runners which do not should
+   * verify this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001
+   */
+  private void verifyPAssertsSucceeded(PipelineResult pipelineResult) {
+    if (MetricsEnvironment.isMetricsSupported()) {
+      long expectedNumberOfAssertions = (long) PAssert.countAsserts(this);
+
+      long successfulAssertions = 0;
+      Iterable<MetricResult<Long>> successCounterResults =
+          pipelineResult.metrics().queryMetrics(
+              MetricsFilter.builder()
+                  .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+                  .build())
+              .counters();
+      for (MetricResult<Long> counter : successCounterResults) {
+        if (counter.attempted() > 0) {
+          successfulAssertions++;
+        }
+      }
+
+      assertThat(String
+          .format("Expected %d successful assertions, but found %d.", expectedNumberOfAssertions,
+              successfulAssertions), successfulAssertions, is(expectedNumberOfAssertions));
+    }
+  }
+
   private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
     private boolean empty = true;