You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/20 01:44:54 UTC

incubator-beam git commit: Add CrashingRunner for use in TestPipeline

Repository: incubator-beam
Updated Branches:
  refs/heads/master 402fb70c4 -> dc98211cc


Add CrashingRunner for use in TestPipeline

CrashingRunner is a PipelineRunner that crashes on calls to run() with
an IllegalArgumentException. As a runner is currently required to
construct a Pipeline object, this allows removal of all Pipeline Runners
from the core SDK while retaining tests that depend only on the graph
construction behavior.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc98211c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc98211c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc98211c

Branch: refs/heads/master
Commit: dc98211ccf17e94afb03ba51992c731684f855fa
Parents: 402fb70
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 18 13:37:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 19 18:44:49 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/testing/CrashingRunner.java | 72 +++++++++++++++++++
 .../apache/beam/sdk/testing/TestPipeline.java   | 10 ++-
 .../beam/sdk/testing/CrashingRunnerTest.java    | 76 ++++++++++++++++++++
 .../beam/sdk/testing/TestPipelineTest.java      | 17 ++++-
 4 files changed, 172 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java
new file mode 100644
index 0000000..975facc
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+
+/**
+ * A {@link PipelineRunner} that applies no overrides and throws an exception on calls to
+ * {@link Pipeline#run()}. For use in {@link TestPipeline} to construct but not execute pipelines.
+ */
+public class CrashingRunner extends PipelineRunner<PipelineResult>{
+
+  public static CrashingRunner fromOptions(PipelineOptions opts) {
+    return new CrashingRunner();
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    throw new IllegalArgumentException(String.format("Cannot call #run(Pipeline) on an instance "
+            + "of %s. %s should only be used as the default to construct a Pipeline "
+            + "using %s, and cannot execute Pipelines. Instead, specify a %s "
+            + "by providing PipelineOptions in the environment variable '%s'.",
+        getClass().getSimpleName(),
+        getClass().getSimpleName(),
+        TestPipeline.class.getSimpleName(),
+        PipelineRunner.class.getSimpleName(),
+        TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS));
+  }
+
+  private static class TestPipelineResult implements PipelineResult {
+    private TestPipelineResult() {
+      // Should never be instantiated by the enclosing class
+      throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s",
+          getClass().getSimpleName()));
+    }
+
+    @Override
+    public State getState() {
+      throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s",
+          getClass().getSimpleName()));
+    }
+
+    @Override
+    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+        throws AggregatorRetrievalException {
+      throw new AssertionError(String.format("Forbidden to instantiate %s",
+          getClass().getSimpleName()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index a4921d5..4618e33 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -84,7 +84,8 @@ import javax.annotation.Nullable;
  * containing the message from the {@link PAssert} that failed.
  */
 public class TestPipeline extends Pipeline {
-  private static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+  static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+  static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
   /**
@@ -145,8 +146,13 @@ public class TestPipeline extends Pipeline {
                   .as(TestPipelineOptions.class);
 
       options.as(ApplicationNameOptions.class).setAppName(getAppName());
-      // If no options were specified, use a test credential object on all pipelines.
+      // If no options were specified, set some reasonable defaults
       if (Strings.isNullOrEmpty(beamTestPipelineOptions)) {
+        // If there are no provided options, check to see if a dummy runner should be used.
+        String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER);
+        if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) {
+          options.setRunner(CrashingRunner.class);
+        }
         options.as(GcpOptions.class).setGcpCredential(new TestCredential());
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java
new file mode 100644
index 0000000..041a73a
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CrashingRunner}.
+ */
+@RunWith(JUnit4.class)
+public class CrashingRunnerTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void fromOptionsCreatesInstance() {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(CrashingRunner.class);
+    PipelineRunner<? extends PipelineResult> runner = PipelineRunner.fromOptions(opts);
+
+    assertTrue("Should have created a CrashingRunner", runner instanceof CrashingRunner);
+  }
+
+  @Test
+  public void applySucceeds() {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(CrashingRunner.class);
+
+    Pipeline p = Pipeline.create(opts);
+    p.apply(Create.of(1, 2, 3));
+  }
+
+  @Test
+  public void runThrows() {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(CrashingRunner.class);
+
+    Pipeline p = Pipeline.create(opts);
+    p.apply(Create.of(1, 2, 3));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot call #run");
+    thrown.expectMessage(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 8af4ff2..b741e2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
-import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -36,6 +37,7 @@ import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -49,6 +51,7 @@ import java.util.UUID;
 @RunWith(JUnit4.class)
 public class TestPipelineTest {
   @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testCreationUsingDefaults() {
@@ -139,6 +142,18 @@ public class TestPipelineTest {
     assertEquals(m2, newOpts.getOnSuccessMatcher());
   }
 
+  @Test
+  public void testRunWithDummyEnvironmentVariableFails() {
+    System.getProperties()
+        .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true));
+    TestPipeline pipeline = TestPipeline.create();
+    pipeline.apply(Create.of(1, 2, 3));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot call #run");
+    pipeline.run();
+  }
+
   /**
    * TestMatcher is a matcher designed for testing matcher serialization/deserialization.
    */