You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:12 UTC

[19/50] [abbrv] incubator-beam git commit: Rename InProcessPipelineRunner to DirectRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index ba9815b..1fec9d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index f2d577e..b12a34c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 81d2520..dfc1753 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
 import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index fb637b4..4dd1475 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index fceb20c..5030730 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index ffaf3fa..f4260f5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -36,7 +36,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
  * {@link CreatePCollectionView} primitive {@link PTransform}.
  *
  * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
@@ -49,7 +49,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   @Override
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      InProcessPipelineRunner.CommittedBundle<?> inputBundle,
+      DirectRunner.CommittedBundle<?> inputBundle,
       InProcessEvaluationContext evaluationContext) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator = createEvaluator(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 628f94d..89866cc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -36,7 +36,7 @@ import java.util.Collection;
 import javax.annotation.Nullable;
 
 /**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
  * {@link Bound Window.Bound} primitive {@link PTransform}.
  */
 class WindowEvaluatorFactory implements TransformEvaluatorFactory {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
index c0c1361..d94113a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
@@ -114,7 +114,7 @@ public class AvroIOShardedWriteFactoryTest {
 
   private Pipeline getPipeline() {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setRunner(InProcessPipelineRunner.class);
+    options.setRunner(DirectRunner.class);
     return TestPipeline.fromOptions(options);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index bcdc089..e26f860 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -25,8 +25,8 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 0d1b464..4969a30 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -60,24 +60,24 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
             bundleFactory.createRootBundle(created).commit(Instant.now()),
-            Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+            Collections.<DirectRunner.CommittedBundle<?>>emptyList());
 
     assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
   }
 
   @Test
   public void getUncommittedElementsEqualInput() {
-    InProcessPipelineRunner.CommittedBundle<Integer> bundle =
+    DirectRunner.CommittedBundle<Integer> bundle =
         bundleFactory.createRootBundle(created)
             .add(WindowedValue.valueInGlobalWindow(2))
             .commit(Instant.now());
     CommittedResult result =
         CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
             bundle,
-            Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+            Collections.<DirectRunner.CommittedBundle<?>>emptyList());
 
     assertThat(result.getUnprocessedInputs(),
-        Matchers.<InProcessPipelineRunner.CommittedBundle<?>>equalTo(bundle));
+        Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle));
   }
 
   @Test
@@ -85,14 +85,14 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
             null,
-            Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+            Collections.<DirectRunner.CommittedBundle<?>>emptyList());
 
     assertThat(result.getUnprocessedInputs(), nullValue());
   }
 
   @Test
   public void getOutputsEqualInput() {
-    List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
+    List<? extends DirectRunner.CommittedBundle<?>> outputs =
         ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
             WindowingStrategy.globalDefault(),
             PCollection.IsBounded.BOUNDED)).commit(Instant.now()),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
new file mode 100644
index 0000000..cd44b7e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.runners.direct.DirectRegistrar.InProcessRunner;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ServiceLoader;
+
+/** Tests for the {@link InProcessRunner} and options registrars in {@link DirectRegistrar}. */
+@RunWith(JUnit4.class)
+public class DirectRegistrarTest {
+  @Test
+  public void testCorrectOptionsAreReturned() {
+    assertEquals(
+        ImmutableList.of(DirectOptions.class),
+        new DirectRegistrar.InProcessOptions().getPipelineOptions());
+  }
+
+  @Test
+  public void testCorrectRunnersAreReturned() {
+    assertEquals(
+        ImmutableList.of(DirectRunner.class),
+        new DirectRegistrar.InProcessRunner().getPipelineRunners());
+  }
+
+  @Test
+  public void testServiceLoaderForOptions() {
+    for (PipelineOptionsRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+      if (registrar instanceof DirectRegistrar.InProcessOptions) {
+        return;
+      }
+    }
+    fail("Expected to find " + DirectRegistrar.InProcessOptions.class);
+  }
+
+  @Test
+  public void testServiceLoaderForRunner() {
+    for (PipelineRunnerRegistrar registrar :
+        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
+      if (registrar instanceof DirectRegistrar.InProcessRunner) {
+        return;
+      }
+    }
+    fail("Expected to find " + DirectRegistrar.InProcessRunner.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
new file mode 100644
index 0000000..1de38df
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for basic {@link DirectRunner} functionality.
+ */
+@RunWith(JUnit4.class)
+public class DirectRunnerTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  private Pipeline getPipeline() {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(DirectRunner.class);
+
+    Pipeline p = Pipeline.create(opts);
+    return p;
+  }
+
+  @Test
+  public void defaultRunnerLoaded() {
+    assertThat(DirectRunner.class,
+        Matchers.<Class<? extends PipelineRunner>>equalTo(PipelineOptionsFactory.create()
+            .getRunner()));
+  }
+
+  @Test
+  public void wordCountShouldSucceed() throws Throwable {
+    Pipeline p = getPipeline();
+
+    PCollection<KV<String, Long>> counts =
+        p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+            .apply(MapElements.via(new SimpleFunction<String, String>() {
+              @Override
+              public String apply(String input) {
+                return input;
+              }
+            }))
+            .apply(Count.<String>perElement());
+    PCollection<String> countStrs =
+        counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+          @Override
+          public String apply(KV<String, Long> input) {
+            String str = String.format("%s: %s", input.getKey(), input.getValue());
+            return str;
+          }
+        }));
+
+    PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+    DirectPipelineResult result = ((DirectPipelineResult) p.run());
+    result.awaitCompletion();
+  }
+
+  @Test(timeout = 5000L)
+  public void byteArrayCountShouldSucceed() {
+    Pipeline p = getPipeline();
+
+    SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() {
+      @Override
+      public byte[] apply(Integer input) {
+        try {
+          return CoderUtils.encodeToByteArray(VarIntCoder.of(), input);
+        } catch (CoderException e) {
+          fail("Unexpected Coder Exception " + e);
+          throw new AssertionError("Unreachable");
+        }
+      }
+    };
+    TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
+    };
+    PCollection<byte[]> foos =
+        p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
+    PCollection<byte[]> msync =
+        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
+    PCollection<byte[]> bytes =
+        PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
+    PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());
+    PCollection<KV<Integer, Long>> countsBackToString =
+        counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() {
+          @Override
+          public KV<Integer, Long> apply(KV<byte[], Long> input) {
+            try {
+              return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()),
+                  input.getValue());
+            } catch (CoderException e) {
+              fail("Unexpected Coder Exception " + e);
+              throw new AssertionError("Unreachable");
+        }
+      }
+    }));
+
+    Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L)
+        .put(2, 2L)
+        .put(3, 1L)
+        .put(-2, 1L)
+        .put(-8, 1L)
+        .put(-16, 1L)
+        .build();
+    PAssert.thatMap(countsBackToString).isEqualTo(expected);
+  }
+
+  @Test
+  public void transformDisplayDataExceptionShouldFail() {
+    DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {}
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        throw new RuntimeException("oh noes!");
+      }
+    };
+
+    Pipeline p = getPipeline();
+    p
+        .apply(Create.of(1, 2, 3))
+        .apply(ParDo.of(brokenDoFn));
+
+    thrown.expectMessage(brokenDoFn.getClass().getName());
+    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!")));
+    p.run();
+  }
+
+  @Test
+  public void pipelineOptionsDisplayDataExceptionShouldFail() {
+    Object brokenValueType = new Object() {
+      @JsonValue
+      public int getValue () {
+        return 42;
+      }
+
+      @Override
+      public String toString() {
+        throw new RuntimeException("oh noes!!");
+      }
+    };
+
+    Pipeline p = getPipeline();
+    p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
+
+    p.apply(Create.of(1, 2, 3));
+
+    thrown.expectMessage(PipelineOptions.class.getName());
+    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
+    p.run();
+  }
+
+  /** {@link PipelineOptions} to inject bad object implementations. */
+  public interface ObjectPipelineOptions extends PipelineOptions {
+    Object getValue();
+    void setValue(Object value);
+  }
+
+
+  /**
+   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+   * {@link DirectRunner}.
+   */
+  @Test
+  public void testMutatingOutputThenOutputDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(42))
+        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+          @Override public void processElement(ProcessContext c) {
+            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
+            c.output(outputList);
+            outputList.set(0, 37);
+            c.output(outputList);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates an output with a good equals() and then finishes
+   * without emitting it again still fails in the {@link DirectRunner}.
+   */
+  @Test
+  public void testMutatingOutputThenTerminateDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(42))
+        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+          @Override public void processElement(ProcessContext c) {
+            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
+            c.output(outputList);
+            outputList.set(0, 37);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
+   * in the {@link DirectRunner}.
+   */
+  @Test
+  public void testMutatingOutputCoderDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(42))
+        .apply(ParDo.of(new DoFn<Integer, byte[]>() {
+          @Override public void processElement(ProcessContext c) {
+            byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
+            c.output(outputArray);
+            outputArray[0] = 0xa;
+            c.output(outputArray);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
+   * {@link DirectRunner}.
+   */
+  @Test
+  public void testMutatingInputDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
+            .withCoder(ListCoder.of(VarIntCoder.of())))
+        .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
+          @Override public void processElement(ProcessContext c) {
+            List<Integer> inputList = c.element();
+            inputList.set(0, 37);
+            c.output(12);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("Input");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
+   * in the {@link DirectRunner}.
+   */
+  @Test
+  public void testMutatingInputCoderDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
+        .apply(ParDo.of(new DoFn<byte[], Integer>() {
+          @Override public void processElement(ProcessContext c) {
+            byte[] inputArray = c.element();
+            inputArray[0] = 0xa;
+            c.output(13);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("Input");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
index 9a358dd..e129489 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.isA;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 66a5106..5efb090 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index a4f900c..b589db0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -22,8 +22,8 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 20670ca..c4da86c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.direct;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 6cef60d..ead9c9e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Count;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index af08d02..21c941a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -29,8 +29,8 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
index abe2a19..3a3ac8c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
@@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
index 18db400..b1cbeb1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -28,9 +28,9 @@ import static org.junit.Assert.assertThat;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -99,8 +99,8 @@ public class InProcessEvaluationContextTest {
 
   @Before
   public void setup() {
-    InProcessPipelineRunner runner =
-        InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
+    DirectRunner runner =
+        DirectRunner.fromOptions(PipelineOptionsFactory.create());
 
     p = TestPipeline.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
index 28a3cf6..e8d4711 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
@@ -22,8 +22,8 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
deleted file mode 100644
index 54094c4..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.runners.direct.InProcessRegistrar.InProcessRunner;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.ServiceLoader;
-
-/** Tests for {@link InProcessRunner}. */
-@RunWith(JUnit4.class)
-public class InProcessPipelineRegistrarTest {
-  @Test
-  public void testCorrectOptionsAreReturned() {
-    assertEquals(
-        ImmutableList.of(InProcessPipelineOptions.class),
-        new InProcessRegistrar.InProcessOptions().getPipelineOptions());
-  }
-
-  @Test
-  public void testCorrectRunnersAreReturned() {
-    assertEquals(
-        ImmutableList.of(InProcessPipelineRunner.class),
-        new InProcessRegistrar.InProcessRunner().getPipelineRunners());
-  }
-
-  @Test
-  public void testServiceLoaderForOptions() {
-    for (PipelineOptionsRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
-      if (registrar instanceof InProcessRegistrar.InProcessOptions) {
-        return;
-      }
-    }
-    fail("Expected to find " + InProcessRegistrar.InProcessOptions.class);
-  }
-
-  @Test
-  public void testServiceLoaderForRunner() {
-    for (PipelineRunnerRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
-      if (registrar instanceof InProcessRegistrar.InProcessRunner) {
-        return;
-      }
-    }
-    fail("Expected to find " + InProcessRegistrar.InProcessRunner.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
deleted file mode 100644
index ab26c15..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.collect.ImmutableMap;
-
-import com.fasterxml.jackson.annotation.JsonValue;
-
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests for basic {@link InProcessPipelineRunner} functionality.
- */
-@RunWith(JUnit4.class)
-public class InProcessPipelineRunnerTest implements Serializable {
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-
-  private Pipeline getPipeline() {
-    PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(InProcessPipelineRunner.class);
-
-    Pipeline p = Pipeline.create(opts);
-    return p;
-  }
-
-  @Test
-  public void defaultRunnerLoaded() {
-    assertThat(InProcessPipelineRunner.class,
-        Matchers.<Class<? extends PipelineRunner>>equalTo(PipelineOptionsFactory.create()
-            .getRunner()));
-  }
-
-  @Test
-  public void wordCountShouldSucceed() throws Throwable {
-    Pipeline p = getPipeline();
-
-    PCollection<KV<String, Long>> counts =
-        p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
-            .apply(MapElements.via(new SimpleFunction<String, String>() {
-              @Override
-              public String apply(String input) {
-                return input;
-              }
-            }))
-            .apply(Count.<String>perElement());
-    PCollection<String> countStrs =
-        counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
-          @Override
-          public String apply(KV<String, Long> input) {
-            String str = String.format("%s: %s", input.getKey(), input.getValue());
-            return str;
-          }
-        }));
-
-    PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
-
-    InProcessPipelineResult result = ((InProcessPipelineResult) p.run());
-    result.awaitCompletion();
-  }
-
-  @Test(timeout = 5000L)
-  public void byteArrayCountShouldSucceed() {
-    Pipeline p = getPipeline();
-
-    SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() {
-      @Override
-      public byte[] apply(Integer input) {
-        try {
-          return CoderUtils.encodeToByteArray(VarIntCoder.of(), input);
-        } catch (CoderException e) {
-          fail("Unexpected Coder Exception " + e);
-          throw new AssertionError("Unreachable");
-        }
-      }
-    };
-    TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
-    };
-    PCollection<byte[]> foos =
-        p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
-    PCollection<byte[]> msync =
-        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
-    PCollection<byte[]> bytes =
-        PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
-    PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());
-    PCollection<KV<Integer, Long>> countsBackToString =
-        counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() {
-          @Override
-          public KV<Integer, Long> apply(KV<byte[], Long> input) {
-            try {
-              return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()),
-                  input.getValue());
-            } catch (CoderException e) {
-              fail("Unexpected Coder Exception " + e);
-              throw new AssertionError("Unreachable");
-        }
-      }
-    }));
-
-    Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L)
-        .put(2, 2L)
-        .put(3, 1L)
-        .put(-2, 1L)
-        .put(-8, 1L)
-        .put(-16, 1L)
-        .build();
-    PAssert.thatMap(countsBackToString).isEqualTo(expected);
-  }
-
-  @Test
-  public void transformDisplayDataExceptionShouldFail() {
-    DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {}
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        throw new RuntimeException("oh noes!");
-      }
-    };
-
-    Pipeline p = getPipeline();
-    p
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(brokenDoFn));
-
-    thrown.expectMessage(brokenDoFn.getClass().getName());
-    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!")));
-    p.run();
-  }
-
-  @Test
-  public void pipelineOptionsDisplayDataExceptionShouldFail() {
-    Object brokenValueType = new Object() {
-      @JsonValue
-      public int getValue () {
-        return 42;
-      }
-
-      @Override
-      public String toString() {
-        throw new RuntimeException("oh noes!!");
-      }
-    };
-
-    Pipeline p = getPipeline();
-    p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
-
-    p.apply(Create.of(1, 2, 3));
-
-    thrown.expectMessage(PipelineOptions.class.getName());
-    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
-    p.run();
-  }
-
-  /** {@link PipelineOptions} to inject bad object implementations. */
-  public interface ObjectPipelineOptions extends PipelineOptions {
-    Object getValue();
-    void setValue(Object value);
-  }
-
-
-  /**
-   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
-   * {@link InProcessPipelineRunner}.
-   */
-  @Test
-  public void testMutatingOutputThenOutputDoFnError() throws Exception {
-    Pipeline pipeline = getPipeline();
-
-    pipeline
-        .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
-          @Override public void processElement(ProcessContext c) {
-            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
-            c.output(outputList);
-            outputList.set(0, 37);
-            c.output(outputList);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("output");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
-   * {@link InProcessPipelineRunner}.
-   */
-  @Test
-  public void testMutatingOutputThenTerminateDoFnError() throws Exception {
-    Pipeline pipeline = getPipeline();
-
-    pipeline
-        .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
-          @Override public void processElement(ProcessContext c) {
-            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
-            c.output(outputList);
-            outputList.set(0, 37);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("output");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
-   * in the {@link InProcessPipelineRunner}.
-   */
-  @Test
-  public void testMutatingOutputCoderDoFnError() throws Exception {
-    Pipeline pipeline = getPipeline();
-
-    pipeline
-        .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, byte[]>() {
-          @Override public void processElement(ProcessContext c) {
-            byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
-            c.output(outputArray);
-            outputArray[0] = 0xa;
-            c.output(outputArray);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("output");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
-   * {@link InProcessPipelineRunner}.
-   */
-  @Test
-  public void testMutatingInputDoFnError() throws Exception {
-    Pipeline pipeline = getPipeline();
-
-    pipeline
-        .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
-            .withCoder(ListCoder.of(VarIntCoder.of())))
-        .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
-          @Override public void processElement(ProcessContext c) {
-            List<Integer> inputList = c.element();
-            inputList.set(0, 37);
-            c.output(12);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("Input");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
-   * in the {@link InProcessPipelineRunner}.
-   */
-  @Test
-  public void testMutatingInputCoderDoFnError() throws Exception {
-    Pipeline pipeline = getPipeline();
-
-    pipeline
-        .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
-        .apply(ParDo.of(new DoFn<byte[], Integer>() {
-          @Override public void processElement(ProcessContext c) {
-            byte[] inputArray = c.element();
-            inputArray[0] = 0xa;
-            c.output(13);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("Input");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
index 0f7afa1..b78eb40 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
@@ -25,8 +25,8 @@ import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -139,7 +139,7 @@ public class ParDoInProcessEvaluatorTest {
   private ParDoInProcessEvaluator<Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
-      InProcessPipelineRunner.CommittedBundle<Integer> inputBundle,
+      DirectRunner.CommittedBundle<Integer> inputBundle,
       PCollection<Integer> output) {
     when(
             evaluationContext.createSideInputReader(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index a6f31c0..e61881e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index a1480e5..8b8d44f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
index fe9866c..5ede931 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
@@ -114,7 +114,7 @@ public class TextIOShardedWriteFactoryTest {
 
   private Pipeline getPipeline() {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setRunner(InProcessPipelineRunner.class);
+    options.setRunner(DirectRunner.class);
     return TestPipeline.fromOptions(options);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 0345662..a5e6cee 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -27,7 +27,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 05656eb..be5c489 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -25,8 +25,8 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 859418b..714e9c9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index a2f971a..8a3591b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index 01f3070..5fdfb49 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.translation;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner;
+import org.apache.beam.runners.direct.DirectRunner;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -58,7 +58,7 @@ public class TransformTranslatorTest {
    */
   @Test
   public void testTextIOReadAndWriteTransforms() throws IOException {
-    String directOut = runPipeline(InProcessPipelineRunner.class);
+    String directOut = runPipeline(DirectRunner.class);
     String sparkOut = runPipeline(SparkPipelineRunner.class);
 
     List<String> directOutput =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index b1b5280..456b6ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -277,7 +277,7 @@ public interface PipelineOptions extends HasDisplayData {
       try {
         @SuppressWarnings({"unchecked", "rawtypes"})
         Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName(
-            "org.apache.beam.runners.direct.InProcessPipelineRunner");
+            "org.apache.beam.runners.direct.DirectRunner");
         return direct;
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException(String.format(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/testing/travis/test_wordcount.sh
----------------------------------------------------------------------
diff --git a/testing/travis/test_wordcount.sh b/testing/travis/test_wordcount.sh
index b00b0d6..e059a35 100755
--- a/testing/travis/test_wordcount.sh
+++ b/testing/travis/test_wordcount.sh
@@ -70,7 +70,7 @@ function run_via_mvn {
   local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2
   local cmd='mvn exec:java -f pom.xml -pl examples/java \
     -Dexec.mainClass=org.apache.beam.examples.WordCount \
-    -Dexec.args="--runner=InProcessPipelineRunner --inputFile='"$input"' --output='"$outfile_prefix"'"'
+    -Dexec.args="--runner=DirectRunner --inputFile='"$input"' --output='"$outfile_prefix"'"'
   echo "$name: Running $cmd" >&2
   sh -c "$cmd"
   check_result_hash "$name" "$outfile_prefix" "$expected_hash"
@@ -84,7 +84,7 @@ function run_bundled {
   local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2
   local cmd='java -cp '"$JAR_FILE"' \
     org.apache.beam.examples.WordCount \
-    --runner=InProcessPipelineRunner \
+    --runner=DirectRunner \
     --inputFile='"'$input'"' \
     --output='"$outfile_prefix"
   echo "$name: Running $cmd" >&2