You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/14 17:11:24 UTC

[7/8] incubator-beam git commit: Update the Default Pipeline Runner

Update the Default Pipeline Runner

Select the InProcessRunner if it is on the classpath, and throw an
exception otherwise.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3ffd510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3ffd510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3ffd510

Branch: refs/heads/master
Commit: a3ffd510896626019723294931a4c3763faf43af
Parents: 816a3bf
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 18 16:56:06 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  7 ++
 runners/google-cloud-dataflow-java/pom.xml      | 11 +++
 sdks/java/core/pom.xml                          |  3 +
 .../beam/sdk/options/PipelineOptions.java       | 31 +++++++-
 .../sdk/options/PipelineOptionsFactoryTest.java | 79 +++++++++++++++-----
 .../beam/sdk/options/PipelineOptionsTest.java   |  8 --
 .../options/PipelineOptionsValidatorTest.java   | 15 ++++
 .../sdk/runners/DirectPipelineRunnerTest.java   |  1 +
 .../beam/sdk/testing/TestPipelineTest.java      |  5 +-
 sdks/java/extensions/join-library/pom.xml       |  7 ++
 sdks/java/io/google-cloud-platform/pom.xml      |  7 ++
 sdks/java/io/hdfs/pom.xml                       |  7 ++
 sdks/java/io/kafka/pom.xml                      |  7 ++
 sdks/java/java8tests/pom.xml                    |  7 ++
 .../beam/sdk/transforms/WithKeysJava8Test.java  |  3 +-
 .../main/resources/archetype-resources/pom.xml  |  6 ++
 16 files changed, 170 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 3d81338..5211b80 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -49,6 +49,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <systemPropertyVariables>
+            <beamUseDummyRunner></beamUseDummyRunner>
             <beamTestPipelineOptions>
             </beamTestPipelineOptions>
           </systemPropertyVariables>
@@ -213,6 +214,12 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a6dfae3..6d8e94b 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -84,6 +84,17 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <beamTestPipelineOptions></beamTestPipelineOptions>
+            <beamUseDummyRunner>true</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
         <executions>
           <execution>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 372a913..c559cff 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -129,6 +129,9 @@
           <excludedGroups>
             org.apache.beam.sdk.testing.NeedsRunner
           </excludedGroups>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>true</beamUseDummyRunner>
+          </systemPropertyVariables>
         </configuration>
       </plugin>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index a2f38ed..b1b5280 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -21,11 +21,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
 import com.google.auto.service.AutoService;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -225,7 +225,7 @@ public interface PipelineOptions extends HasDisplayData {
   @Description("The pipeline runner that will be used to execute the pipeline. "
       + "For registered runners, the class name can be specified, otherwise the fully "
       + "qualified name needs to be specified.")
-  @Default.Class(DirectPipelineRunner.class)
+  @Default.InstanceFactory(DirectRunner.class)
   Class<? extends PipelineRunner<?>> getRunner();
   void setRunner(Class<? extends PipelineRunner<?>> kls);
 
@@ -262,4 +262,31 @@ public interface PipelineOptions extends HasDisplayData {
   @Description("A pipeline level default location for storing temporary files.")
   String getTempLocation();
   void setTempLocation(String value);
+
+  /**
+   * A {@link DefaultValueFactory} that obtains the class of the {@code DirectRunner} if it exists
+   * on the classpath, and throws an {@link IllegalArgumentException} otherwise.
+   *
+   * <p>As the {@code DirectRunner} is in an independent module, it cannot be directly referenced
+   * as the {@link Default}. However, it should still be used if available, and a user is required
+   * to explicitly set the {@code --runner} property if they wish to use an alternative runner.
+   */
+  class DirectRunner implements DefaultValueFactory<Class<? extends PipelineRunner>> {
+    @Override
+    public Class<? extends PipelineRunner> create(PipelineOptions options) {
+      try {
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName(
+            "org.apache.beam.runners.direct.InProcessPipelineRunner");
+        return direct;
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException(String.format(
+            "No Runner was specified and the DirectRunner was not found on the classpath.%n"
+                + "Specify a runner by either:%n"
+                + "    Explicitly specifying a runner by providing the 'runner' property%n"
+                + "    Adding the DirectRunner to the classpath%n"
+                + "    Calling 'PipelineOptions.setRunner(PipelineRunner)' directly"), e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 62c6909..8b8337e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -29,11 +29,13 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
 
+import com.google.auto.service.AutoService;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -59,8 +61,9 @@ import java.util.Set;
 /** Tests for {@link PipelineOptionsFactory}. */
 @RunWith(JUnit4.class)
 public class PipelineOptionsFactoryTest {
-  private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS =
-      DirectPipelineRunner.class;
+  private static final String DEFAULT_RUNNER_NAME = "DirectRunner";
+  private static final Class<? extends PipelineRunner> REGISTERED_RUNNER =
+      RegisteredTestRunner.class;
 
   @Rule public ExpectedException expectedException = ExpectedException.none();
   @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
@@ -68,14 +71,13 @@ public class PipelineOptionsFactoryTest {
 
   @Test
   public void testAutomaticRegistrationOfPipelineOptions() {
-    assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(DirectPipelineOptions.class));
+    assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(RegisteredTestOptions.class));
   }
 
   @Test
   public void testAutomaticRegistrationOfRunners() {
-    assertEquals(
-        DEFAULT_RUNNER_CLASS,
-        PipelineOptionsFactory.getRegisteredRunners().get(DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertEquals(REGISTERED_RUNNER,
+        PipelineOptionsFactory.getRegisteredRunners().get(REGISTERED_RUNNER.getSimpleName()));
   }
 
   @Test
@@ -85,7 +87,7 @@ public class PipelineOptionsFactoryTest {
   }
 
   /** A simple test interface. */
-  public static interface TestPipelineOptions extends PipelineOptions {
+  public interface TestPipelineOptions extends PipelineOptions {
     String getTestPipelineOption();
     void setTestPipelineOption(String value);
   }
@@ -810,18 +812,18 @@ public class PipelineOptionsFactoryTest {
 
   @Test
   public void testSettingRunner() {
-    String[] args = new String[] {"--runner=DirectPipelineRunner"};
+    String[] args = new String[] {"--runner=" + RegisteredTestRunner.class.getSimpleName()};
 
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(DirectPipelineRunner.class, options.getRunner());
+    assertEquals(RegisteredTestRunner.class, options.getRunner());
   }
 
   @Test
   public void testSettingRunnerFullName() {
     String[] args =
-        new String[] {String.format("--runner=%s", DirectPipelineRunner.class.getName())};
+        new String[] {String.format("--runner=%s", CrashingRunner.class.getName())};
     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(opts.getRunner(), DirectPipelineRunner.class);
+    assertEquals(opts.getRunner(), CrashingRunner.class);
   }
 
 
@@ -832,7 +834,7 @@ public class PipelineOptionsFactoryTest {
     expectedException.expectMessage(
         "Unknown 'runner' specified 'UnknownRunner', supported " + "pipeline runners");
     Set<String> registeredRunners = PipelineOptionsFactory.getRegisteredRunners().keySet();
-    assertThat(registeredRunners, hasItem(DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName()));
     for (String registeredRunner : registeredRunners) {
       expectedException.expectMessage(registeredRunner);
     }
@@ -923,7 +925,7 @@ public class PipelineOptionsFactoryTest {
   public void testEmptyArgumentIsIgnored() {
     String[] args =
         new String[] {
-          "", "--string=100", "", "", "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+          "", "--string=100", "", "", "--runner=" + REGISTERED_RUNNER.getSimpleName()
         };
     PipelineOptionsFactory.fromArgs(args).as(Objects.class);
   }
@@ -932,7 +934,7 @@ public class PipelineOptionsFactoryTest {
   public void testNullArgumentIsIgnored() {
     String[] args =
         new String[] {
-          "--string=100", null, null, "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+          "--string=100", null, null, "--runner=" + REGISTERED_RUNNER.getSimpleName()
         };
     PipelineOptionsFactory.fromArgs(args).as(Objects.class);
   }
@@ -985,7 +987,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1000,7 +1002,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1015,7 +1017,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1109,7 +1111,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1147,4 +1149,43 @@ public class PipelineOptionsFactoryTest {
     thread.join();
     assertEquals(cl, classLoader[0]);
   }
+
+  private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> {
+    public static PipelineRunner fromOptions(PipelineOptions options) {
+      return new RegisteredTestRunner();
+    }
+
+    public PipelineResult run(Pipeline p) {
+      throw new IllegalArgumentException();
+    }
+  }
+
+
+  /**
+   * A {@link PipelineRunnerRegistrar} to demonstrate default {@link PipelineRunner} registration.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class RegisteredTestRunnerRegistrar implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(RegisteredTestRunner.class);
+    }
+  }
+
+  private interface RegisteredTestOptions extends PipelineOptions {
+    Object getRegisteredExampleFooBar();
+    void setRegisteredExampleFooBar(Object registeredExampleFooBar);
+  }
+
+
+  /**
+   * A {@link PipelineOptionsRegistrar} to demonstrate default {@link PipelineOptions} registration.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class RegisteredTestOptionsRegistrar implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(RegisteredTestOptions.class);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index dfda528..687271c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -17,15 +17,12 @@
  */
 package org.apache.beam.sdk.options;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -86,11 +83,6 @@ public class PipelineOptionsTest {
   }
 
   @Test
-  public void testDefaultRunnerIsSet() {
-    assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
-  }
-
-  @Test
   public void testCloneAs() throws IOException {
     DerivedTestOptions options = PipelineOptionsFactory.create().as(DerivedTestOptions.class);
     options.setBaseValue(Lists.<Boolean>newArrayList());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
index 0250bd1..2b684a8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.options;
 
+import org.apache.beam.sdk.testing.CrashingRunner;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -40,6 +42,7 @@ public class PipelineOptionsValidatorTest {
   @Test
   public void testWhenRequiredOptionIsSet() {
     Required required = PipelineOptionsFactory.as(Required.class);
+    required.setRunner(CrashingRunner.class);
     required.setObject("blah");
     PipelineOptionsValidator.validate(Required.class, required);
   }
@@ -114,6 +117,7 @@ public class PipelineOptionsValidatorTest {
     GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
     groupRequired.setFoo("foo");
     groupRequired.setBar(null);
+    groupRequired.setRunner(CrashingRunner.class);
 
     PipelineOptionsValidator.validate(GroupRequired.class, groupRequired);
 
@@ -126,6 +130,7 @@ public class PipelineOptionsValidatorTest {
   @Test
   public void testWhenNoneOfRequiredGroupIsSetThrowsException() {
     GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
+    groupRequired.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Missing required value for group [ham]");
@@ -155,6 +160,7 @@ public class PipelineOptionsValidatorTest {
   public void testWhenOneOfMultipleRequiredGroupsIsSetIsValid() {
     MultiGroupRequired multiGroupRequired = PipelineOptionsFactory.as(MultiGroupRequired.class);
 
+    multiGroupRequired.setRunner(CrashingRunner.class);
     multiGroupRequired.setFoo("eggs");
 
     PipelineOptionsValidator.validate(MultiGroupRequired.class, multiGroupRequired);
@@ -194,6 +200,7 @@ public class PipelineOptionsValidatorTest {
   public void testWhenOptionIsDefinedInMultipleSuperInterfacesAndIsNotPresentFailsRequirement() {
     RightOptions rightOptions = PipelineOptionsFactory.as(RightOptions.class);
     rightOptions.setBoth("foo");
+    rightOptions.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Missing required value for group");
@@ -212,6 +219,8 @@ public class PipelineOptionsValidatorTest {
     leftOpts.setFoo("Untrue");
     leftOpts.setBoth("Raise the");
 
+    rightOpts.setRunner(CrashingRunner.class);
+    leftOpts.setRunner(CrashingRunner.class);
     PipelineOptionsValidator.validate(JoinedOptions.class, rightOpts);
     PipelineOptionsValidator.validate(JoinedOptions.class, leftOpts);
   }
@@ -226,6 +235,8 @@ public class PipelineOptionsValidatorTest {
     leftOpts.setFoo("Untrue");
     leftOpts.setBoth("Raise the");
 
+    rightOpts.setRunner(CrashingRunner.class);
+    leftOpts.setRunner(CrashingRunner.class);
     PipelineOptionsValidator.validate(RightOptions.class, leftOpts);
     PipelineOptionsValidator.validate(LeftOptions.class, rightOpts);
   }
@@ -234,6 +245,7 @@ public class PipelineOptionsValidatorTest {
   public void testWhenOptionIsDefinedOnMultipleInterfacesOnlyListedOnceWhenNotPresent() {
     JoinedOptions options = PipelineOptionsFactory.as(JoinedOptions.class);
     options.setFoo("Hello");
+    options.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("required value for group [both]");
@@ -273,6 +285,7 @@ public class PipelineOptionsValidatorTest {
   public void testSuperInterfaceRequiredOptionsAlsoRequiredInSubInterface() {
     SubOptions subOpts = PipelineOptionsFactory.as(SubOptions.class);
     subOpts.setFoo("Bar");
+    subOpts.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("otherSuper");
@@ -288,6 +301,7 @@ public class PipelineOptionsValidatorTest {
     SubOptions opts = PipelineOptionsFactory.as(SubOptions.class);
     opts.setFoo("Foo");
     opts.setSuperclassObj("Hello world");
+    opts.setRunner(CrashingRunner.class);
 
     // Valid SubOptions, but invalid SuperOptions
     PipelineOptionsValidator.validate(SubOptions.class, opts);
@@ -304,6 +318,7 @@ public class PipelineOptionsValidatorTest {
     subOpts.setFoo("bar");
     subOpts.setBar("bar");
     subOpts.setSuperclassObj("SuperDuper");
+    subOpts.setRunner(CrashingRunner.class);
 
     PipelineOptionsValidator.validate(SubOptions.class, subOpts);
     PipelineOptionsValidator.validate(SuperOptions.class, subOpts);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
index ae3b4e0..edf6996 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
@@ -68,6 +68,7 @@ public class DirectPipelineRunnerTest implements Serializable {
   @Test
   public void testToString() {
     PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
     DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
     assertEquals("DirectPipelineRunner#" + runner.hashCode(),
         runner.toString());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index b741e2e..043c06c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,13 +61,13 @@ public class TestPipelineTest {
   public void testCreationOfPipelineOptions() throws Exception {
     ObjectMapper mapper = new ObjectMapper();
     String stringOptions = mapper.writeValueAsString(new String[]{
-      "--runner=DirectPipelineRunner",
+      "--runner=org.apache.beam.sdk.testing.CrashingRunner",
       "--project=testProject"
     });
     System.getProperties().put("beamTestPipelineOptions", stringOptions);
     GcpOptions options =
         TestPipeline.testingPipelineOptions().as(GcpOptions.class);
-    assertEquals(DirectPipelineRunner.class, options.getRunner());
+    assertEquals(CrashingRunner.class, options.getRunner());
     assertEquals(options.getProject(), "testProject");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index 828fcce..0fec148 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -63,6 +63,13 @@
 
     <!-- Dependency for tests -->
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 962c7b0..f567261 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -88,6 +88,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index e99dbd7..9c30792 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -66,6 +66,13 @@
 
     <!-- test dependencies -->
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index be19a83..76c0eb6 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -71,6 +71,13 @@
 
     <!-- test dependencies-->
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index 48bf682..8e20228 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -96,6 +96,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index a0d1a63..1ffb147 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -65,7 +64,7 @@ public class WithKeysJava8Test {
 
     values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));
 
-    thrown.expect(PipelineExecutionException.class);
+    thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
     thrown.expectMessage("Cannot provide a coder for type variable K");
     thrown.expectMessage("the actual type is unknown due to erasure.");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index d86b9cc..2b2e24b 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -101,6 +101,12 @@
     <!-- Adds a dependency on a specific version of the Dataflow runnner. -->
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>[0-incubating, 2-incubating)</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
       <version>[0-incubating, 2-incubating)</version>
     </dependency>