You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/17 22:24:02 UTC
[1/3] incubator-beam git commit: [BEAM-85,
BEAM-298] Make TestPipeline a JUnit Rule checking proper usage
Repository: incubator-beam
Updated Branches:
refs/heads/master c28957d16 -> 1c9bf8d66
[BEAM-85, BEAM-298] Make TestPipeline a JUnit Rule checking proper usage
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/443b25a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/443b25a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/443b25a4
Branch: refs/heads/master
Commit: 443b25a4d11201fb88f40da437ec7aab4b3e273f
Parents: 33b7ca7
Author: Stas Levin <st...@gmail.com>
Authored: Tue Dec 13 19:27:41 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Dec 17 14:11:39 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/testing/TestPipeline.java | 207 ++++++++++++++++---
.../beam/sdk/testing/TestPipelineTest.java | 183 ++++++++++++++--
2 files changed, 344 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/443b25a4/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 493d4cc..49ac3af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -23,12 +23,17 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
@@ -39,34 +44,39 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.TestCredential;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
/**
- * A creator of test pipelines that can be used inside of tests that can be
- * configured to run locally or against a remote pipeline runner.
+ * A creator of test pipelines that can be used inside of tests that can be configured to run
+ * locally or against a remote pipeline runner.
*
- * <p>It is recommended to tag hand-selected tests for this purpose using the
- * {@link RunnableOnService} {@link Category} annotation, as each test run against a pipeline runner
- * will utilize resources of that pipeline runner.
+ * <p>It is recommended to tag hand-selected tests for this purpose using the {@link
+ * RunnableOnService} {@link Category} annotation, as each test run against a pipeline runner will
+ * utilize resources of that pipeline runner.
*
* <p>In order to run tests on a pipeline runner, the following conditions must be met:
+ *
* <ul>
- * <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
- * options. For example:
- * <pre>{@code [
+ * <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
+ * options. For example:
+ * <pre>{@code [
* "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
* "--project=mygcpproject",
* "--stagingLocation=gs://mygcsbucket/path"
* ]}</pre>
* Note that the set of pipeline options required is pipeline runner specific.
- * </li>
- * <li>Jars containing the SDK and test classes must be available on the classpath.</li>
+ * <li>Jars containing the SDK and test classes must be available on the classpath.
* </ul>
*
* <p>Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
* remote execution modes. For example:
+ *
* <pre>{@code
* Pipeline p = TestPipeline.create();
* PCollection<Integer> output = ...
@@ -76,19 +86,136 @@ import org.junit.experimental.categories.Category;
* p.run();
* }</pre>
*
- * <p>For pipeline runners, it is required that they must throw an {@link AssertionError}
- * containing the message from the {@link PAssert} that failed.
+ * <p>For pipeline runners, it is required that they must throw an {@link AssertionError} containing
+ * the message from the {@link PAssert} that failed.
*/
-public class TestPipeline extends Pipeline {
+public class TestPipeline extends Pipeline implements TestRule {
+
+ private static class PipelineRunEnforcement {
+
+ protected boolean enableAutoRunIfMissing;
+ protected final Pipeline pipeline;
+ private boolean runInvoked;
+
+ private PipelineRunEnforcement(final Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ private void enableAutoRunIfMissing(final boolean enable) {
+ enableAutoRunIfMissing = enable;
+ }
+
+ protected void beforePipelineExecution() {
+ runInvoked = true;
+ }
+
+ protected void afterTestCompletion() {
+ if (!runInvoked && enableAutoRunIfMissing) {
+ pipeline.run().waitUntilFinish();
+ }
+ }
+ }
+
+ private static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {
+
+ private List<TransformHierarchy.Node> runVisitedNodes;
+
+ private final Predicate<TransformHierarchy.Node> isPAssertNode =
+ new Predicate<TransformHierarchy.Node>() {
+
+ @Override
+ public boolean apply(final TransformHierarchy.Node node) {
+ return node.getTransform() instanceof PAssert.GroupThenAssert
+ || node.getTransform() instanceof PAssert.GroupThenAssertForSingleton
+ || node.getTransform() instanceof PAssert.OneSideInputAssert;
+ }
+ };
+
+ private static class NodeRecorder extends PipelineVisitor.Defaults {
+
+ private final List<TransformHierarchy.Node> visited = new LinkedList<>();
+
+ @Override
+ public void leaveCompositeTransform(final TransformHierarchy.Node node) {
+ visited.add(node);
+ }
+
+ @Override
+ public void visitPrimitiveTransform(final TransformHierarchy.Node node) {
+ visited.add(node);
+ }
+ }
+
+ private PipelineAbandonedNodeEnforcement(final TestPipeline pipeline) {
+ super(pipeline);
+ }
+
+ private List<TransformHierarchy.Node> recordPipelineNodes(final Pipeline pipeline) {
+ final NodeRecorder nodeRecorder = new NodeRecorder();
+ pipeline.traverseTopologically(nodeRecorder);
+ return nodeRecorder.visited;
+ }
+
+ private void verifyPipelineExecution() {
+ final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline);
+ if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) {
+ final boolean hasDanglingPAssert =
+ FluentIterable.from(pipelineNodes)
+ .filter(Predicates.not(Predicates.in(runVisitedNodes)))
+ .anyMatch(isPAssertNode);
+ if (hasDanglingPAssert) {
+ throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
+ } else {
+ throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s).");
+ }
+ } else if (runVisitedNodes == null && !enableAutoRunIfMissing) {
+ throw new PipelineRunMissingException("The pipeline has not been run.");
+ }
+ }
+
+ @Override
+ protected void beforePipelineExecution() {
+ super.beforePipelineExecution();
+ runVisitedNodes = recordPipelineNodes(pipeline);
+ }
+
+ @Override
+ protected void afterTestCompletion() {
+ super.afterTestCompletion();
+ verifyPipelineExecution();
+ }
+ }
+
+ /**
+ * An exception thrown in case an abandoned {@link org.apache.beam.sdk.transforms.PTransform} is
+ * detected, that is, a {@link org.apache.beam.sdk.transforms.PTransform} that has not been run.
+ */
+ public static class AbandonedNodeException extends RuntimeException {
+
+ AbandonedNodeException(final String msg) {
+ super(msg);
+ }
+ }
+
+ /** An exception thrown in case a test finishes without invoking {@link Pipeline#run()}. */
+ public static class PipelineRunMissingException extends RuntimeException {
+
+ PipelineRunMissingException(final String msg) {
+ super(msg);
+ }
+ }
+
static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
private static final ObjectMapper MAPPER = new ObjectMapper();
+ private PipelineRunEnforcement enforcement = new PipelineAbandonedNodeEnforcement(this);
+
/**
* Creates and returns a new test pipeline.
*
- * <p>Use {@link PAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
+ * <p>Use {@link PAssert} to add tests, then call {@link Pipeline#run} to execute the pipeline and
+ * check the tests.
*/
public static TestPipeline create() {
return fromOptions(testingPipelineOptions());
@@ -98,16 +225,30 @@ public class TestPipeline extends Pipeline {
return new TestPipeline(PipelineRunner.fromOptions(options), options);
}
- private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
+ private TestPipeline(
+ final PipelineRunner<? extends PipelineResult> runner, final PipelineOptions options) {
super(runner, options);
}
+ @Override
+ public Statement apply(final Statement statement, final Description description) {
+ return new Statement() {
+
+ @Override
+ public void evaluate() throws Throwable {
+ statement.evaluate();
+ enforcement.afterTestCompletion();
+ }
+ };
+ }
+
/**
- * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError}
- * that is raised during testing.
+ * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError} that is raised during
+ * testing.
*/
@Override
public PipelineResult run() {
+ enforcement.beforePipelineExecution();
try {
return super.run();
} catch (RuntimeException exc) {
@@ -120,18 +261,28 @@ public class TestPipeline extends Pipeline {
}
}
+ public TestPipeline enableAbandonedNodeEnforcement(final boolean enable) {
+ enforcement =
+ enable ? new PipelineAbandonedNodeEnforcement(this) : new PipelineRunEnforcement(this);
+
+ return this;
+ }
+
+ public TestPipeline enableAutoRunIfMissing(final boolean enable) {
+ enforcement.enableAutoRunIfMissing(enable);
+ return this;
+ }
+
@Override
public String toString() {
return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
}
- /**
- * Creates {@link PipelineOptions} for testing.
- */
+ /** Creates {@link PipelineOptions} for testing. */
public static PipelineOptions testingPipelineOptions() {
try {
- @Nullable String beamTestPipelineOptions =
- System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
+ @Nullable
+ String beamTestPipelineOptions = System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
PipelineOptions options =
Strings.isNullOrEmpty(beamTestPipelineOptions)
@@ -155,13 +306,15 @@ public class TestPipeline extends Pipeline {
IOChannelUtils.registerIOFactoriesAllowOverride(options);
return options;
} catch (IOException e) {
- throw new RuntimeException("Unable to instantiate test options from system property "
- + PROPERTY_BEAM_TEST_PIPELINE_OPTIONS + ":"
- + System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), e);
+ throw new RuntimeException(
+ "Unable to instantiate test options from system property "
+ + PROPERTY_BEAM_TEST_PIPELINE_OPTIONS
+ + ":"
+ + System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS),
+ e);
}
}
-
public static String[] convertToArgs(PipelineOptions options) {
try {
byte[] opts = MAPPER.writeValueAsBytes(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/443b25a4/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 03563f3..d1797e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -24,30 +24,54 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.UUID;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
+import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link TestPipeline}. */
@RunWith(JUnit4.class)
-public class TestPipelineTest {
- @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
- @Rule public ExpectedException thrown = ExpectedException.none();
+public class TestPipelineTest implements Serializable {
+ private static final List<String> WORDS = Collections.singletonList("hi there");
+ private static final String DUMMY = "expected";
+
+ private final transient TestPipeline pipeline =
+ TestPipeline.fromOptions(pipelineOptions()).enableAbandonedNodeEnforcement(true);
+
+ private final transient ExpectedException exception = ExpectedException.none();
+
+ @Rule public transient TestRule restoreSystemProperties = new RestoreSystemProperties();
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+ @Rule public transient RuleChain ruleOrder = RuleChain.outerRule(exception).around(pipeline);
@Test
public void testCreationUsingDefaults() {
@@ -57,13 +81,13 @@ public class TestPipelineTest {
@Test
public void testCreationOfPipelineOptions() throws Exception {
ObjectMapper mapper = new ObjectMapper();
- String stringOptions = mapper.writeValueAsString(new String[]{
- "--runner=org.apache.beam.sdk.testing.CrashingRunner",
- "--project=testProject"
- });
+ String stringOptions =
+ mapper.writeValueAsString(
+ new String[] {
+ "--runner=org.apache.beam.sdk.testing.CrashingRunner", "--project=testProject"
+ });
System.getProperties().put("beamTestPipelineOptions", stringOptions);
- GcpOptions options =
- TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+ GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
assertEquals(CrashingRunner.class, options.getRunner());
assertEquals(options.getProject(), "testProject");
}
@@ -71,8 +95,10 @@ public class TestPipelineTest {
@Test
public void testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase() throws Exception {
PipelineOptions options = TestPipeline.testingPipelineOptions();
- assertThat(options.as(ApplicationNameOptions.class).getAppName(), startsWith(
- "TestPipelineTest-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase"));
+ assertThat(
+ options.as(ApplicationNameOptions.class).getAppName(),
+ startsWith(
+ "TestPipelineTest-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase"));
}
@Test
@@ -96,13 +122,13 @@ public class TestPipelineTest {
@Test
public void testConvertToArgs() {
- String[] args = new String[]{"--tempLocation=Test_Location"};
+ String[] args = new String[] {"--tempLocation=Test_Location"};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
String[] arr = TestPipeline.convertToArgs(options);
List<String> lst = Arrays.asList(arr);
assertEquals(lst.size(), 2);
- assertThat(lst, containsInAnyOrder("--tempLocation=Test_Location",
- "--appName=TestPipelineTest"));
+ assertThat(
+ lst, containsInAnyOrder("--tempLocation=Test_Location", "--appName=TestPipelineTest"));
}
@Test
@@ -131,8 +157,8 @@ public class TestPipelineTest {
opts.setOnSuccessMatcher(m2);
String[] arr = TestPipeline.convertToArgs(opts);
- TestPipelineOptions newOpts = PipelineOptionsFactory.fromArgs(arr)
- .as(TestPipelineOptions.class);
+ TestPipelineOptions newOpts =
+ PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class);
assertEquals(m1, newOpts.getOnCreateMatcher());
assertEquals(m2, newOpts.getOnSuccessMatcher());
@@ -150,12 +176,11 @@ public class TestPipelineTest {
pipeline.run();
}
- /**
- * TestMatcher is a matcher designed for testing matcher serialization/deserialization.
- */
+ /** TestMatcher is a matcher designed for testing matcher serialization/deserialization. */
public static class TestMatcher extends BaseMatcher<PipelineResult>
implements SerializableMatcher<PipelineResult> {
private final UUID uuid = UUID.randomUUID();
+
@Override
public boolean matches(Object o) {
return true;
@@ -180,4 +205,124 @@ public class TestPipelineTest {
return uuid.hashCode();
}
}
+
+ private static class DummyRunner extends PipelineRunner<PipelineResult> {
+
+ @SuppressWarnings("unused") // used by reflection
+ public static DummyRunner fromOptions(final PipelineOptions opts) {
+ return new DummyRunner();
+ }
+
+ @Override
+ public PipelineResult run(final Pipeline pipeline) {
+ return new PipelineResult() {
+
+ @Override
+ public State getState() {
+ return null;
+ }
+
+ @Override
+ public State cancel() throws IOException {
+ return null;
+ }
+
+ @Override
+ public State waitUntilFinish(final Duration duration) {
+ return null;
+ }
+
+ @Override
+ public State waitUntilFinish() {
+ return null;
+ }
+
+ @Override
+ public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+ throws AggregatorRetrievalException {
+ return null;
+ }
+
+ @Override
+ public MetricResults metrics() {
+ return null;
+ }
+ };
+ }
+ }
+
+ private static PipelineOptions pipelineOptions() {
+ final PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+ pipelineOptions.setRunner(DummyRunner.class);
+ return pipelineOptions;
+ }
+
+ private PCollection<String> pCollection() {
+ return addTransform(pipeline.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())));
+ }
+
+ private PCollection<String> addTransform(final PCollection<String> pCollection) {
+ return pCollection.apply(
+ MapElements.via(
+ new SimpleFunction<String, String>() {
+
+ @Override
+ public String apply(final String input) {
+ return DUMMY;
+ }
+ }));
+ }
+
+ @Test
+ public void testPipelineRunMissing() throws Throwable {
+ exception.expect(TestPipeline.PipelineRunMissingException.class);
+ PAssert.that(pCollection()).containsInAnyOrder(DUMMY);
+ // missing pipeline#run
+ }
+
+ @Test
+ public void testPipelineHasAbandonedPAssertNode() throws Throwable {
+ exception.expect(TestPipeline.AbandonedNodeException.class);
+ exception.expectMessage("PAssert");
+
+ final PCollection<String> pCollection = pCollection();
+ PAssert.that(pCollection).containsInAnyOrder(DUMMY);
+ pipeline.run().waitUntilFinish();
+
+ // dangling PAssert
+ PAssert.that(pCollection).containsInAnyOrder(DUMMY);
+ }
+
+ @Test
+ public void testPipelineHasAbandonedPTransformNode() throws Throwable {
+ exception.expect(TestPipeline.AbandonedNodeException.class);
+ exception.expectMessage("PTransform");
+
+ final PCollection<String> pCollection = pCollection();
+ PAssert.that(pCollection).containsInAnyOrder(DUMMY);
+ pipeline.run().waitUntilFinish();
+
+ // dangling PTransform
+ addTransform(pCollection);
+ }
+
+ @Test
+ public void testNormalFlowWithPAssert() throws Throwable {
+ PAssert.that(pCollection()).containsInAnyOrder(DUMMY);
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testAutoAddMissingRunFlow() throws Throwable {
+ PAssert.that(pCollection()).containsInAnyOrder(DUMMY);
+ // missing pipeline#run, but have it auto-added.
+ pipeline.enableAutoRunIfMissing(true);
+ }
+
+ @Test
+ public void testDisableStrictPAssertFlow() throws Throwable {
+ pCollection();
+ // dangling PTransform, but ignore it
+ pipeline.enableAbandonedNodeEnforcement(false);
+ }
}
[2/3] incubator-beam git commit: Empty TestPipeline need not be run
Posted by ke...@apache.org.
Empty TestPipeline need not be run
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d54a4c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d54a4c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d54a4c6
Branch: refs/heads/master
Commit: 0d54a4c6708516170cb045b5e0d1d33443973c6e
Parents: 443b25a
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Dec 17 13:57:07 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Dec 17 14:12:23 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/testing/TestPipeline.java | 20 +++++++++++++++++++-
.../beam/sdk/testing/TestPipelineTest.java | 3 +++
2 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0d54a4c6/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 49ac3af..b707a81 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -169,7 +169,12 @@ public class TestPipeline extends Pipeline implements TestRule {
throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s).");
}
} else if (runVisitedNodes == null && !enableAutoRunIfMissing) {
- throw new PipelineRunMissingException("The pipeline has not been run.");
+ IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor();
+ pipeline.traverseTopologically(isEmptyVisitor);
+
+ if (!isEmptyVisitor.isEmpty()) {
+ throw new PipelineRunMissingException("The pipeline has not been run.");
+ }
}
}
@@ -389,4 +394,17 @@ public class TestPipeline extends Pipeline implements TestRule {
}
return firstInstanceAfterTestPipeline;
}
+
+ private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
+ private boolean empty = true;
+
+ public boolean isEmpty() {
+ return empty;
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ empty = false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0d54a4c6/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index d1797e7..f484566 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -74,6 +74,9 @@ public class TestPipelineTest implements Serializable {
@Rule public transient RuleChain ruleOrder = RuleChain.outerRule(exception).around(pipeline);
@Test
+ public void testNoTestPipelineUsed() { }
+
+ @Test
public void testCreationUsingDefaults() {
assertNotNull(TestPipeline.create());
}
[3/3] incubator-beam git commit: This closes #1595,
makes TestPipeline a JUnit rule
Posted by ke...@apache.org.
This closes #1595, makes TestPipeline a JUnit rule
Empty TestPipeline need not be run
[BEAM-85, BEAM-298] Make TestPipeline a JUnit Rule checking proper usage
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c9bf8d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c9bf8d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c9bf8d6
Branch: refs/heads/master
Commit: 1c9bf8d665979961478160c384b57ae42847e7b1
Parents: c28957d 0d54a4c
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat Dec 17 14:12:37 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat Dec 17 14:12:37 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/testing/TestPipeline.java | 225 ++++++++++++++++---
.../beam/sdk/testing/TestPipelineTest.java | 186 +++++++++++++--
2 files changed, 365 insertions(+), 46 deletions(-)
----------------------------------------------------------------------