You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:35 UTC
[3/6] beam git commit: [BEAM-1763] Verify PAssert execution in
runners which support metrics.
[BEAM-1763] Verify PAssert execution in runners which support metrics.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95ade45e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95ade45e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95ade45e
Branch: refs/heads/master
Commit: 95ade45eced4787eb67a9d4d13dae48ffb176919
Parents: 48c8ed1
Author: Aviem Zur <av...@gmail.com>
Authored: Tue May 2 19:00:29 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300
----------------------------------------------------------------------
.../apache/beam/runners/flink/FlinkRunner.java | 3 ++
.../beam/runners/spark/TestSparkRunner.java | 47 --------------------
.../ResumeFromCheckpointStreamingTest.java | 12 +++--
.../beam/sdk/metrics/MetricsEnvironment.java | 5 +++
.../apache/beam/sdk/testing/TestPipeline.java | 46 ++++++++++++++++---
5 files changed, 57 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 181ffda..a5972ef 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -31,6 +31,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -103,6 +104,8 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
public PipelineResult run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+ MetricsEnvironment.setMetricsSupported(true);
+
LOG.info("Executing pipeline using FlinkRunner.");
FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 10e98b8..1e67813 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -40,15 +40,11 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -116,8 +112,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
}
SparkPipelineResult result = null;
- int expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
-
// clear state of Aggregators, Metrics and Watermarks if exists.
AggregatorsAccumulator.clear();
MetricsAccumulator.clear();
@@ -137,47 +131,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
String.format("Finish state %s is not allowed.", finishState),
finishState,
isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE));
-
- // validate assertion succeeded (at least once).
- long successAssertions = 0;
- Iterable<MetricResult<Long>> counterResults = result.metrics().queryMetrics(
- MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
- .build()).counters();
- for (MetricResult<Long> counter : counterResults) {
- if (counter.attempted().longValue() > 0) {
- successAssertions++;
- }
- }
- Integer expectedAssertions = testSparkPipelineOptions.getExpectedAssertions() != null
- ? testSparkPipelineOptions.getExpectedAssertions() : expectedNumberOfAssertions;
- assertThat(
- String.format(
- "Expected %d successful assertions, but found %d.",
- expectedAssertions, successAssertions),
- successAssertions,
- is(expectedAssertions.longValue()));
- // validate assertion didn't fail.
- long failedAssertions = 0;
- Iterable<MetricResult<Long>> failCounterResults = result.metrics().queryMetrics(
- MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
- .build()).counters();
- for (MetricResult<Long> counter : failCounterResults) {
- if (counter.attempted().longValue() > 0) {
- failedAssertions++;
- }
- }
- assertThat(
- String.format("Found %d failed assertions.", failedAssertions),
- failedAssertions,
- is(0L));
-
- LOG.info(
- String.format(
- "Successfully asserted pipeline %s with %d successful assertions.",
- testSparkPipelineOptions.getJobName(),
- successAssertions));
} finally {
try {
// cleanup checkpoint dir.
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7d7fd08..33571f0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -180,7 +180,8 @@ public class ResumeFromCheckpointStreamingTest {
long successAssertions = 0;
Iterable<MetricResult<Long>> counterResults = res.metrics().queryMetrics(
MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+ .addNameFilter(
+ MetricNameFilter.named(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER))
.build()).counters();
for (MetricResult<Long> counter : counterResults) {
if (counter.attempted().longValue() > 0) {
@@ -196,7 +197,8 @@ public class ResumeFromCheckpointStreamingTest {
long failedAssertions = 0;
Iterable<MetricResult<Long>> failCounterResults = res.metrics().queryMetrics(
MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
+ .addNameFilter(MetricNameFilter.named(
+ PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER))
.build()).counters();
for (MetricResult<Long> counter : failCounterResults) {
if (counter.attempted().longValue() > 0) {
@@ -330,8 +332,10 @@ public class ResumeFromCheckpointStreamingTest {
}
private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
- private final Counter success = Metrics.counter(PAssert.class, PAssert.SUCCESS_COUNTER);
- private final Counter failure = Metrics.counter(PAssert.class, PAssert.FAILURE_COUNTER);
+ private final Counter success =
+ Metrics.counter(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER);
+ private final Counter failure =
+ Metrics.counter(PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER);
private final T[] expected;
AssertDoFn(T[] expected) {
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index 2942578..a4b311f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -70,6 +70,11 @@ public class MetricsEnvironment {
METRICS_SUPPORTED.set(supported);
}
+ /** Indicates whether metrics reporting is supported. */
+ public static boolean isMetricsSupported() {
+ return METRICS_SUPPORTED.get();
+ }
+
/**
* Set the {@link MetricsContainer} for the current thread.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 868dcbd..d8fe51d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.testing;
import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.TreeNode;
@@ -41,6 +43,10 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
@@ -186,8 +192,8 @@ public class TestPipeline extends Pipeline implements TestRule {
if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) {
final boolean hasDanglingPAssert =
FluentIterable.from(pipelineNodes)
- .filter(Predicates.not(Predicates.in(runVisitedNodes)))
- .anyMatch(isPAssertNode);
+ .filter(Predicates.not(Predicates.in(runVisitedNodes)))
+ .anyMatch(isPAssertNode);
if (hasDanglingPAssert) {
throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
} else {
@@ -319,12 +325,13 @@ public class TestPipeline extends Pipeline implements TestRule {
checkState(
enforcement.isPresent(),
"Is your TestPipeline declaration missing a @Rule annotation? Usage: "
- + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
+ + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
final PipelineResult pipelineResult;
try {
enforcement.get().beforePipelineExecution();
pipelineResult = super.run();
+ verifyPAssertsSucceeded(pipelineResult);
} catch (RuntimeException exc) {
Throwable cause = exc.getCause();
if (cause instanceof AssertionError) {
@@ -385,8 +392,8 @@ public class TestPipeline extends Pipeline implements TestRule {
Strings.isNullOrEmpty(beamTestPipelineOptions)
? PipelineOptionsFactory.create()
: PipelineOptionsFactory.fromArgs(
- MAPPER.readValue(beamTestPipelineOptions, String[].class))
- .as(TestPipelineOptions.class);
+ MAPPER.readValue(beamTestPipelineOptions, String[].class))
+ .as(TestPipelineOptions.class);
options.as(ApplicationNameOptions.class).setAppName(getAppName());
// If no options were specified, set some reasonable defaults
@@ -488,6 +495,35 @@ public class TestPipeline extends Pipeline implements TestRule {
return firstInstanceAfterTestPipeline;
}
+ /**
+ * Verifies all {{@link PAssert PAsserts}} in the pipeline have been executed and were successful.
+ *
+ * <p>Note this only runs for runners which support Metrics. Runners which do not should verify
+ * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p>
+ */
+ private void verifyPAssertsSucceeded(PipelineResult pipelineResult) {
+ if (MetricsEnvironment.isMetricsSupported()) {
+ long expectedNumberOfAssertions = (long) PAssert.countAsserts(this);
+
+ long successfulAssertions = 0;
+ Iterable<MetricResult<Long>> successCounterResults =
+ pipelineResult.metrics().queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+ .build())
+ .counters();
+ for (MetricResult<Long> counter : successCounterResults) {
+ if (counter.attempted() > 0) {
+ successfulAssertions++;
+ }
+ }
+
+ assertThat(String
+ .format("Expected %d successful assertions, but found %d.", expectedNumberOfAssertions,
+ successfulAssertions), successfulAssertions, is(expectedNumberOfAssertions));
+ }
+ }
+
private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
private boolean empty = true;