You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/20 01:44:54 UTC
incubator-beam git commit: Add CrashingRunner for use in TestPipeline
Repository: incubator-beam
Updated Branches:
refs/heads/master 402fb70c4 -> dc98211cc
Add CrashingRunner for use in TestPipeline
CrashingRunner is a PipelineRunner that crashes on calls to run() with
an IllegalArgumentException. As a runner is currently required to
construct a Pipeline object, this allows removal of all Pipeline Runners
from the core SDK while retaining tests that depend only on the graph
construction behavior.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc98211c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc98211c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc98211c
Branch: refs/heads/master
Commit: dc98211ccf17e94afb03ba51992c731684f855fa
Parents: 402fb70
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 18 13:37:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 19 18:44:49 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/testing/CrashingRunner.java | 72 +++++++++++++++++++
.../apache/beam/sdk/testing/TestPipeline.java | 10 ++-
.../beam/sdk/testing/CrashingRunnerTest.java | 76 ++++++++++++++++++++
.../beam/sdk/testing/TestPipelineTest.java | 17 ++++-
4 files changed, 172 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java
new file mode 100644
index 0000000..975facc
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+
+/**
+ * A {@link PipelineRunner} that applies no overrides and throws an exception on calls to
+ * {@link Pipeline#run()}. For use in {@link TestPipeline} to construct but not execute pipelines.
+ */
+public class CrashingRunner extends PipelineRunner<PipelineResult> {
+
+  /**
+   * Creates a {@link CrashingRunner}.
+   *
+   * @param opts accepted but not consulted by this method
+   * @return a new {@link CrashingRunner}
+   */
+  public static CrashingRunner fromOptions(PipelineOptions opts) {
+    return new CrashingRunner();
+  }
+
+  /**
+   * Always fails; this runner exists only so that a {@link Pipeline} can be constructed.
+   *
+   * @param pipeline ignored
+   * @return never returns normally
+   * @throws IllegalArgumentException on every call, with a message that names the
+   *     {@link TestPipeline} options key to set in order to select a real runner
+   */
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    // NOTE(review): TestPipeline's PROPERTY_* keys are looked up as JVM system properties
+    // (it calls System.getProperty), so the message says "system property" rather than
+    // "environment variable" to avoid sending users to the wrong mechanism.
+    throw new IllegalArgumentException(String.format("Cannot call #run(Pipeline) on an instance "
+        + "of %s. %s should only be used as the default to construct a Pipeline "
+        + "using %s, and cannot execute Pipelines. Instead, specify a %s "
+        + "by providing PipelineOptions in the system property '%s'.",
+        getClass().getSimpleName(),
+        getClass().getSimpleName(),
+        TestPipeline.class.getSimpleName(),
+        PipelineRunner.class.getSimpleName(),
+        TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS));
+  }
+
+  /**
+   * Placeholder {@link PipelineResult}. Nothing in this class instantiates it, and its
+   * constructor and every method fail with {@link UnsupportedOperationException}.
+   */
+  private static class TestPipelineResult implements PipelineResult {
+    private TestPipelineResult() {
+      // Should never be instantiated by the enclosing class
+      throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s",
+          getClass().getSimpleName()));
+    }
+
+    @Override
+    public State getState() {
+      throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s",
+          getClass().getSimpleName()));
+    }
+
+    @Override
+    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+        throws AggregatorRetrievalException {
+      // Same exception type as the constructor and getState() for consistency.
+      throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s",
+          getClass().getSimpleName()));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index a4921d5..4618e33 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -84,7 +84,8 @@ import javax.annotation.Nullable;
* containing the message from the {@link PAssert} that failed.
*/
public class TestPipeline extends Pipeline {
- private static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+ static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+ static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
@@ -145,8 +146,13 @@ public class TestPipeline extends Pipeline {
.as(TestPipelineOptions.class);
options.as(ApplicationNameOptions.class).setAppName(getAppName());
- // If no options were specified, use a test credential object on all pipelines.
+ // If no options were specified, set some reasonable defaults
if (Strings.isNullOrEmpty(beamTestPipelineOptions)) {
+ // If there are no provided options, check to see if a dummy runner should be used.
+ String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER);
+ if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) {
+ options.setRunner(CrashingRunner.class);
+ }
options.as(GcpOptions.class).setGcpCredential(new TestCredential());
}
options.setStableUniqueNames(CheckEnabled.ERROR);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java
new file mode 100644
index 0000000..041a73a
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests covering how {@link CrashingRunner} is created, how pipelines are built on it,
+ * and how it fails when asked to run.
+ */
+@RunWith(JUnit4.class)
+public class CrashingRunnerTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /** Builds default options whose runner is set to {@link CrashingRunner}. */
+  private static PipelineOptions crashingRunnerOptions() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(CrashingRunner.class);
+    return options;
+  }
+
+  /** Builds a pipeline from {@link #crashingRunnerOptions()} and applies one {@link Create}. */
+  private static Pipeline pipelineWithCreateApplied() {
+    Pipeline pipeline = Pipeline.create(crashingRunnerOptions());
+    pipeline.apply(Create.of(1, 2, 3));
+    return pipeline;
+  }
+
+  @Test
+  public void fromOptionsCreatesInstance() {
+    PipelineRunner<? extends PipelineResult> created =
+        PipelineRunner.fromOptions(crashingRunnerOptions());
+
+    assertTrue("Should have created a CrashingRunner", created instanceof CrashingRunner);
+  }
+
+  @Test
+  public void applySucceeds() {
+    // Passes as long as constructing the pipeline and applying a transform do not throw.
+    pipelineWithCreateApplied();
+  }
+
+  @Test
+  public void runThrows() {
+    Pipeline pipeline = pipelineWithCreateApplied();
+
+    // Expectations are registered only after graph construction, so that only run() can
+    // satisfy them.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot call #run");
+    thrown.expectMessage(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
+
+    pipeline.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 8af4ff2..b741e2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.testing;
-import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,6 +37,7 @@ import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -49,6 +51,7 @@ import java.util.UUID;
@RunWith(JUnit4.class)
public class TestPipelineTest {
@Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+ @Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void testCreationUsingDefaults() {
@@ -139,6 +142,18 @@ public class TestPipelineTest {
assertEquals(m2, newOpts.getOnSuccessMatcher());
}
+  /**
+   * Sets the dummy-runner system property to {@code true}, builds a default
+   * {@link TestPipeline}, and expects {@code run()} to fail with an
+   * {@link IllegalArgumentException} whose message contains "Cannot call #run".
+   */
+  @Test
+  public void testRunWithDummyEnvironmentVariableFails() {
+    System.setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, String.valueOf(true));
+    TestPipeline dummyBacked = TestPipeline.create();
+    dummyBacked.apply(Create.of(1, 2, 3));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot call #run");
+    dummyBacked.run();
+  }
+
/**
* TestMatcher is a matcher designed for testing matcher serialization/deserialization.
*/