You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:12 UTC
[19/50] [abbrv] incubator-beam git commit: Rename
InProcessPipelineRunner to DirectRunner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index ba9815b..1fec9d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.util.WindowedValue;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index f2d577e..b12a34c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 81d2520..dfc1753 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index fb637b4..4dd1475 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index fceb20c..5030730 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index ffaf3fa..f4260f5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -36,7 +36,7 @@ import java.util.ArrayList;
import java.util.List;
/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
* {@link CreatePCollectionView} primitive {@link PTransform}.
*
* <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
@@ -49,7 +49,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public <T> TransformEvaluator<T> forApplication(
AppliedPTransform<?, ?, ?> application,
- InProcessPipelineRunner.CommittedBundle<?> inputBundle,
+ DirectRunner.CommittedBundle<?> inputBundle,
InProcessEvaluationContext evaluationContext) {
@SuppressWarnings({"cast", "unchecked", "rawtypes"})
TransformEvaluator<T> evaluator = createEvaluator(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 628f94d..89866cc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -36,7 +36,7 @@ import java.util.Collection;
import javax.annotation.Nullable;
/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
* {@link Bound Window.Bound} primitive {@link PTransform}.
*/
class WindowEvaluatorFactory implements TransformEvaluatorFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
index c0c1361..d94113a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
@@ -114,7 +114,7 @@ public class AvroIOShardedWriteFactoryTest {
private Pipeline getPipeline() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
- options.setRunner(InProcessPipelineRunner.class);
+ options.setRunner(DirectRunner.class);
return TestPipeline.fromOptions(options);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index bcdc089..e26f860 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -25,8 +25,8 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 0d1b464..4969a30 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -60,24 +60,24 @@ public class CommittedResultTest implements Serializable {
CommittedResult result =
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
bundleFactory.createRootBundle(created).commit(Instant.now()),
- Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+ Collections.<DirectRunner.CommittedBundle<?>>emptyList());
assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
}
@Test
public void getUncommittedElementsEqualInput() {
- InProcessPipelineRunner.CommittedBundle<Integer> bundle =
+ DirectRunner.CommittedBundle<Integer> bundle =
bundleFactory.createRootBundle(created)
.add(WindowedValue.valueInGlobalWindow(2))
.commit(Instant.now());
CommittedResult result =
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
bundle,
- Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+ Collections.<DirectRunner.CommittedBundle<?>>emptyList());
assertThat(result.getUnprocessedInputs(),
- Matchers.<InProcessPipelineRunner.CommittedBundle<?>>equalTo(bundle));
+ Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle));
}
@Test
@@ -85,14 +85,14 @@ public class CommittedResultTest implements Serializable {
CommittedResult result =
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
null,
- Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+ Collections.<DirectRunner.CommittedBundle<?>>emptyList());
assertThat(result.getUnprocessedInputs(), nullValue());
}
@Test
public void getOutputsEqualInput() {
- List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
+ List<? extends DirectRunner.CommittedBundle<?>> outputs =
ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
WindowingStrategy.globalDefault(),
PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
new file mode 100644
index 0000000..cd44b7e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.runners.direct.DirectRegistrar.InProcessRunner;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ServiceLoader;
+
+/** Tests for {@link InProcessRunner}. */
+@RunWith(JUnit4.class)
+public class DirectRegistrarTest {
+ @Test
+ public void testCorrectOptionsAreReturned() {
+ assertEquals(
+ ImmutableList.of(DirectOptions.class),
+ new DirectRegistrar.InProcessOptions().getPipelineOptions());
+ }
+
+ @Test
+ public void testCorrectRunnersAreReturned() {
+ assertEquals(
+ ImmutableList.of(DirectRunner.class),
+ new DirectRegistrar.InProcessRunner().getPipelineRunners());
+ }
+
+ @Test
+ public void testServiceLoaderForOptions() {
+ for (PipelineOptionsRegistrar registrar :
+ Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+ if (registrar instanceof DirectRegistrar.InProcessOptions) {
+ return;
+ }
+ }
+ fail("Expected to find " + DirectRegistrar.InProcessOptions.class);
+ }
+
+ @Test
+ public void testServiceLoaderForRunner() {
+ for (PipelineRunnerRegistrar registrar :
+ Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
+ if (registrar instanceof DirectRegistrar.InProcessRunner) {
+ return;
+ }
+ }
+ fail("Expected to find " + DirectRegistrar.InProcessRunner.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
new file mode 100644
index 0000000..1de38df
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for basic {@link DirectRunner} functionality.
+ */
+@RunWith(JUnit4.class)
+public class DirectRunnerTest implements Serializable {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  /** Builds a fresh {@link Pipeline} whose options select the {@link DirectRunner}. */
+  private Pipeline getPipeline() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectRunner.class);
+    return Pipeline.create(options);
+  }
+
+  /** Freshly created default options must name {@link DirectRunner} as their runner. */
+  @Test
+  public void defaultRunnerLoaded() {
+    Class<? extends PipelineRunner> defaultRunner = PipelineOptionsFactory.create().getRunner();
+    assertThat(
+        DirectRunner.class,
+        Matchers.<Class<? extends PipelineRunner>>equalTo(defaultRunner));
+  }
+
+  /**
+   * Runs a small word count through the {@link DirectRunner}, asserts the formatted per-word
+   * totals, and waits for the pipeline result to complete.
+   */
+  @Test
+  public void wordCountShouldSucceed() throws Throwable {
+    SimpleFunction<String, String> identity = new SimpleFunction<String, String>() {
+      @Override
+      public String apply(String word) {
+        return word;
+      }
+    };
+    SimpleFunction<KV<String, Long>, String> formatCount =
+        new SimpleFunction<KV<String, Long>, String>() {
+          @Override
+          public String apply(KV<String, Long> wordCount) {
+            return String.format("%s: %s", wordCount.getKey(), wordCount.getValue());
+          }
+        };
+
+    Pipeline p = getPipeline();
+    PCollection<KV<String, Long>> counts =
+        p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+            .apply(MapElements.via(identity))
+            .apply(Count.<String>perElement());
+    PCollection<String> countStrs = counts.apply(MapElements.via(formatCount));
+
+    PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+    DirectPipelineResult result = (DirectPipelineResult) p.run();
+    result.awaitCompletion();
+  }
+
+  /**
+   * Counts integers that have been encoded to {@code byte[]} elements, decodes the keys back to
+   * integers and asserts the per-value counts.
+   *
+   * <p>The pipeline has to be executed for the {@link PAssert} to be checked; building the
+   * pipeline alone only registers the assertion, so the test would otherwise pass vacuously.
+   */
+  @Test(timeout = 5000L)
+  public void byteArrayCountShouldSucceed() {
+    Pipeline p = getPipeline();
+
+    SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() {
+      @Override
+      public byte[] apply(Integer input) {
+        try {
+          return CoderUtils.encodeToByteArray(VarIntCoder.of(), input);
+        } catch (CoderException e) {
+          fail("Unexpected Coder Exception " + e);
+          // fail() always throws; this only satisfies the compiler's return analysis.
+          throw new AssertionError("Unreachable");
+        }
+      }
+    };
+    TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
+    };
+    PCollection<byte[]> foos =
+        p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
+    PCollection<byte[]> msync =
+        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
+    PCollection<byte[]> bytes =
+        PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
+    PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());
+    PCollection<KV<Integer, Long>> countsByValue =
+        counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() {
+          @Override
+          public KV<Integer, Long> apply(KV<byte[], Long> input) {
+            try {
+              return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()),
+                  input.getValue());
+            } catch (CoderException e) {
+              fail("Unexpected Coder Exception " + e);
+              // fail() always throws; this only satisfies the compiler's return analysis.
+              throw new AssertionError("Unreachable");
+            }
+          }
+        }));
+
+    // The value 1 appears three times in the first input and once in the second.
+    Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L)
+        .put(2, 2L)
+        .put(3, 1L)
+        .put(-2, 1L)
+        .put(-8, 1L)
+        .put(-16, 1L)
+        .build();
+    PAssert.thatMap(countsByValue).isEqualTo(expected);
+
+    // Execute the pipeline so that the assertion above is actually evaluated.
+    p.run();
+  }
+
+ /**
+ * Verifies that running a pipeline fails when a transform's {@link DoFn} throws from
+ * {@code populateDisplayData}: the expected failure message contains the {@link DoFn}'s class
+ * name and its cause carries the original exception message.
+ */
+ @Test
+ public void transformDisplayDataExceptionShouldFail() {
+ DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ throw new RuntimeException("oh noes!");
+ }
+ };
+
+ Pipeline p = getPipeline();
+ p
+ .apply(Create.of(1, 2, 3))
+ .apply(ParDo.of(brokenDoFn));
+
+ // Expectations are registered only after the pipeline is built, immediately before run().
+ thrown.expectMessage(brokenDoFn.getClass().getName());
+ thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!")));
+ p.run();
+ }
+
+ /**
+ * Verifies that running a pipeline fails when an option value throws from {@code toString()}:
+ * the expected failure message contains the {@link PipelineOptions} class name and its cause
+ * carries the original exception message.
+ */
+ @Test
+ public void pipelineOptionsDisplayDataExceptionShouldFail() {
+ // An option value that exposes a JSON value via @JsonValue but throws from toString().
+ Object brokenValueType = new Object() {
+ @JsonValue
+ public int getValue () {
+ return 42;
+ }
+
+ @Override
+ public String toString() {
+ throw new RuntimeException("oh noes!!");
+ }
+ };
+
+ Pipeline p = getPipeline();
+ p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
+
+ p.apply(Create.of(1, 2, 3));
+
+ // Expectations are registered only after the pipeline is built, immediately before run().
+ thrown.expectMessage(PipelineOptions.class.getName());
+ thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
+ p.run();
+ }
+
+ /**
+ * {@link PipelineOptions} to inject bad object implementations.
+ *
+ * <p>Holds a single arbitrary {@link Object} value; {@code
+ * pipelineOptionsDisplayDataExceptionShouldFail} stores an object whose {@code toString()}
+ * throws in it.
+ */
+ public interface ObjectPipelineOptions extends PipelineOptions {
+ /** Returns the stored option value. */
+ Object getValue();
+ /** Stores the option value. */
+ void setValue(Object value);
+ }
+
+
+  /**
+   * Tests that the {@link DirectRunner} rejects a {@link DoFn} that outputs a value with a good
+   * equals(), mutates that value, and then outputs it again.
+   */
+  @Test
+  public void testMutatingOutputThenOutputDoFnError() throws Exception {
+    DoFn<Integer, List<Integer>> mutateThenOutput = new DoFn<Integer, List<Integer>>() {
+      @Override
+      public void processElement(ProcessContext c) {
+        List<Integer> emitted = Arrays.asList(1, 2, 3, 4);
+        c.output(emitted);
+        emitted.set(0, 37);
+        c.output(emitted);
+      }
+    };
+
+    Pipeline pipeline = getPipeline();
+    pipeline.apply(Create.of(42)).apply(ParDo.of(mutateThenOutput));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+ /**
+ * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+ * {@link DirectRunner} even when the mutation is the last thing {@code processElement} does,
+ * i.e. the mutated value is not output a second time.
+ */
+ @Test
+ public void testMutatingOutputThenTerminateDoFnError() throws Exception {
+ Pipeline pipeline = getPipeline();
+
+ pipeline
+ .apply(Create.of(42))
+ .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+ @Override public void processElement(ProcessContext c) {
+ List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
+ c.output(outputList);
+ // Mutate after the only output call; no further output follows.
+ outputList.set(0, 37);
+ }
+ }));
+
+ thrown.expect(IllegalMutationException.class);
+ thrown.expectMessage("output");
+ thrown.expectMessage("must not be mutated");
+ pipeline.run();
+ }
+
+  /**
+   * Tests that the {@link DirectRunner} still rejects a {@link DoFn} that mutates an output
+   * between two output calls when the output type ({@code byte[]}) has a bad equals().
+   */
+  @Test
+  public void testMutatingOutputCoderDoFnError() throws Exception {
+    DoFn<Integer, byte[]> mutateThenOutput = new DoFn<Integer, byte[]>() {
+      @Override
+      public void processElement(ProcessContext c) {
+        byte[] emitted = new byte[]{0x1, 0x2, 0x3};
+        c.output(emitted);
+        emitted[0] = 0xa;
+        c.output(emitted);
+      }
+    };
+
+    Pipeline pipeline = getPipeline();
+    pipeline.apply(Create.of(42)).apply(ParDo.of(mutateThenOutput));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that the {@link DirectRunner} rejects a {@link DoFn} that mutates its input element
+   * when the element type has a good equals().
+   */
+  @Test
+  public void testMutatingInputDoFnError() throws Exception {
+    DoFn<List<Integer>, Integer> mutateInput = new DoFn<List<Integer>, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) {
+        List<Integer> received = c.element();
+        received.set(0, 37);
+        c.output(12);
+      }
+    };
+
+    Pipeline pipeline = getPipeline();
+    pipeline
+        .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
+            .withCoder(ListCoder.of(VarIntCoder.of())))
+        .apply(ParDo.of(mutateInput));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("Input");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that the {@link DirectRunner} still rejects a {@link DoFn} that mutates its input
+   * element when the element type ({@code byte[]}) has a bad equals().
+   */
+  @Test
+  public void testMutatingInputCoderDoFnError() throws Exception {
+    DoFn<byte[], Integer> mutateInput = new DoFn<byte[], Integer>() {
+      @Override
+      public void processElement(ProcessContext c) {
+        byte[] received = c.element();
+        received[0] = 0xa;
+        c.output(13);
+      }
+    };
+
+    Pipeline pipeline = getPipeline();
+    pipeline
+        .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
+        .apply(ParDo.of(mutateInput));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("Input");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
index 9a358dd..e129489 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.isA;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 66a5106..5efb090 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index a4f900c..b589db0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -22,8 +22,8 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 20670ca..c4da86c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 6cef60d..ead9c9e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Count;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index af08d02..21c941a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -29,8 +29,8 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
index abe2a19..3a3ac8c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
@@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
index 18db400..b1cbeb1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -28,9 +28,9 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -99,8 +99,8 @@ public class InProcessEvaluationContextTest {
@Before
public void setup() {
- InProcessPipelineRunner runner =
- InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
+ DirectRunner runner =
+ DirectRunner.fromOptions(PipelineOptionsFactory.create());
p = TestPipeline.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
index 28a3cf6..e8d4711 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
@@ -22,8 +22,8 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
deleted file mode 100644
index 54094c4..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.runners.direct.InProcessRegistrar.InProcessRunner;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.ServiceLoader;
-
-/** Tests for {@link InProcessRunner}. */
-@RunWith(JUnit4.class)
-public class InProcessPipelineRegistrarTest {
- @Test
- public void testCorrectOptionsAreReturned() {
- assertEquals(
- ImmutableList.of(InProcessPipelineOptions.class),
- new InProcessRegistrar.InProcessOptions().getPipelineOptions());
- }
-
- @Test
- public void testCorrectRunnersAreReturned() {
- assertEquals(
- ImmutableList.of(InProcessPipelineRunner.class),
- new InProcessRegistrar.InProcessRunner().getPipelineRunners());
- }
-
- @Test
- public void testServiceLoaderForOptions() {
- for (PipelineOptionsRegistrar registrar :
- Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
- if (registrar instanceof InProcessRegistrar.InProcessOptions) {
- return;
- }
- }
- fail("Expected to find " + InProcessRegistrar.InProcessOptions.class);
- }
-
- @Test
- public void testServiceLoaderForRunner() {
- for (PipelineRunnerRegistrar registrar :
- Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
- if (registrar instanceof InProcessRegistrar.InProcessRunner) {
- return;
- }
- }
- fail("Expected to find " + InProcessRegistrar.InProcessRunner.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
deleted file mode 100644
index ab26c15..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.collect.ImmutableMap;
-
-import com.fasterxml.jackson.annotation.JsonValue;
-
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests for basic {@link InProcessPipelineRunner} functionality.
- */
-@RunWith(JUnit4.class)
-public class InProcessPipelineRunnerTest implements Serializable {
- @Rule public transient ExpectedException thrown = ExpectedException.none();
-
- private Pipeline getPipeline() {
- PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(InProcessPipelineRunner.class);
-
- Pipeline p = Pipeline.create(opts);
- return p;
- }
-
- @Test
- public void defaultRunnerLoaded() {
- assertThat(InProcessPipelineRunner.class,
- Matchers.<Class<? extends PipelineRunner>>equalTo(PipelineOptionsFactory.create()
- .getRunner()));
- }
-
- @Test
- public void wordCountShouldSucceed() throws Throwable {
- Pipeline p = getPipeline();
-
- PCollection<KV<String, Long>> counts =
- p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
- .apply(MapElements.via(new SimpleFunction<String, String>() {
- @Override
- public String apply(String input) {
- return input;
- }
- }))
- .apply(Count.<String>perElement());
- PCollection<String> countStrs =
- counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
- @Override
- public String apply(KV<String, Long> input) {
- String str = String.format("%s: %s", input.getKey(), input.getValue());
- return str;
- }
- }));
-
- PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
-
- InProcessPipelineResult result = ((InProcessPipelineResult) p.run());
- result.awaitCompletion();
- }
-
- @Test(timeout = 5000L)
- public void byteArrayCountShouldSucceed() {
- Pipeline p = getPipeline();
-
- SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() {
- @Override
- public byte[] apply(Integer input) {
- try {
- return CoderUtils.encodeToByteArray(VarIntCoder.of(), input);
- } catch (CoderException e) {
- fail("Unexpected Coder Exception " + e);
- throw new AssertionError("Unreachable");
- }
- }
- };
- TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
- };
- PCollection<byte[]> foos =
- p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
- PCollection<byte[]> msync =
- p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
- PCollection<byte[]> bytes =
- PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
- PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());
- PCollection<KV<Integer, Long>> countsBackToString =
- counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() {
- @Override
- public KV<Integer, Long> apply(KV<byte[], Long> input) {
- try {
- return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()),
- input.getValue());
- } catch (CoderException e) {
- fail("Unexpected Coder Exception " + e);
- throw new AssertionError("Unreachable");
- }
- }
- }));
-
- Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L)
- .put(2, 2L)
- .put(3, 1L)
- .put(-2, 1L)
- .put(-8, 1L)
- .put(-16, 1L)
- .build();
- PAssert.thatMap(countsBackToString).isEqualTo(expected);
- }
-
- @Test
- public void transformDisplayDataExceptionShouldFail() {
- DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- throw new RuntimeException("oh noes!");
- }
- };
-
- Pipeline p = getPipeline();
- p
- .apply(Create.of(1, 2, 3))
- .apply(ParDo.of(brokenDoFn));
-
- thrown.expectMessage(brokenDoFn.getClass().getName());
- thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!")));
- p.run();
- }
-
- @Test
- public void pipelineOptionsDisplayDataExceptionShouldFail() {
- Object brokenValueType = new Object() {
- @JsonValue
- public int getValue () {
- return 42;
- }
-
- @Override
- public String toString() {
- throw new RuntimeException("oh noes!!");
- }
- };
-
- Pipeline p = getPipeline();
- p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
-
- p.apply(Create.of(1, 2, 3));
-
- thrown.expectMessage(PipelineOptions.class.getName());
- thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
- p.run();
- }
-
- /** {@link PipelineOptions} to inject bad object implementations. */
- public interface ObjectPipelineOptions extends PipelineOptions {
- Object getValue();
- void setValue(Object value);
- }
-
-
- /**
- * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
- * {@link InProcessPipelineRunner}.
- */
- @Test
- public void testMutatingOutputThenOutputDoFnError() throws Exception {
- Pipeline pipeline = getPipeline();
-
- pipeline
- .apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
- @Override public void processElement(ProcessContext c) {
- List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
- c.output(outputList);
- outputList.set(0, 37);
- c.output(outputList);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("output");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
- * {@link InProcessPipelineRunner}.
- */
- @Test
- public void testMutatingOutputThenTerminateDoFnError() throws Exception {
- Pipeline pipeline = getPipeline();
-
- pipeline
- .apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
- @Override public void processElement(ProcessContext c) {
- List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
- c.output(outputList);
- outputList.set(0, 37);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("output");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
- * in the {@link InProcessPipelineRunner}.
- */
- @Test
- public void testMutatingOutputCoderDoFnError() throws Exception {
- Pipeline pipeline = getPipeline();
-
- pipeline
- .apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, byte[]>() {
- @Override public void processElement(ProcessContext c) {
- byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
- c.output(outputArray);
- outputArray[0] = 0xa;
- c.output(outputArray);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("output");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
- * {@link InProcessPipelineRunner}.
- */
- @Test
- public void testMutatingInputDoFnError() throws Exception {
- Pipeline pipeline = getPipeline();
-
- pipeline
- .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
- .withCoder(ListCoder.of(VarIntCoder.of())))
- .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
- @Override public void processElement(ProcessContext c) {
- List<Integer> inputList = c.element();
- inputList.set(0, 37);
- c.output(12);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("Input");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
- * in the {@link InProcessPipelineRunner}.
- */
- @Test
- public void testMutatingInputCoderDoFnError() throws Exception {
- Pipeline pipeline = getPipeline();
-
- pipeline
- .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
- .apply(ParDo.of(new DoFn<byte[], Integer>() {
- @Override public void processElement(ProcessContext c) {
- byte[] inputArray = c.element();
- inputArray[0] = 0xa;
- c.output(13);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("Input");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
index 0f7afa1..b78eb40 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
@@ -25,8 +25,8 @@ import static org.mockito.Mockito.when;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
@@ -139,7 +139,7 @@ public class ParDoInProcessEvaluatorTest {
private ParDoInProcessEvaluator<Integer> createEvaluator(
PCollectionView<Integer> singletonView,
RecorderFn fn,
- InProcessPipelineRunner.CommittedBundle<Integer> inputBundle,
+ DirectRunner.CommittedBundle<Integer> inputBundle,
PCollection<Integer> output) {
when(
evaluationContext.createSideInputReader(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index a6f31c0..e61881e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index a1480e5..8b8d44f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
index fe9866c..5ede931 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
@@ -114,7 +114,7 @@ public class TextIOShardedWriteFactoryTest {
private Pipeline getPipeline() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
- options.setRunner(InProcessPipelineRunner.class);
+ options.setRunner(DirectRunner.class);
return TestPipeline.fromOptions(options);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 0345662..a5e6cee 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -27,7 +27,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 05656eb..be5c489 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -25,8 +25,8 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 859418b..714e9c9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index a2f971a..8a3591b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index 01f3070..5fdfb49 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.translation;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
-import org.apache.beam.runners.direct.InProcessPipelineRunner;
+import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
@@ -58,7 +58,7 @@ public class TransformTranslatorTest {
*/
@Test
public void testTextIOReadAndWriteTransforms() throws IOException {
- String directOut = runPipeline(InProcessPipelineRunner.class);
+ String directOut = runPipeline(DirectRunner.class);
String sparkOut = runPipeline(SparkPipelineRunner.class);
List<String> directOutput =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index b1b5280..456b6ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -277,7 +277,7 @@ public interface PipelineOptions extends HasDisplayData {
try {
@SuppressWarnings({"unchecked", "rawtypes"})
Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName(
- "org.apache.beam.runners.direct.InProcessPipelineRunner");
+ "org.apache.beam.runners.direct.DirectRunner");
return direct;
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(String.format(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/testing/travis/test_wordcount.sh
----------------------------------------------------------------------
diff --git a/testing/travis/test_wordcount.sh b/testing/travis/test_wordcount.sh
index b00b0d6..e059a35 100755
--- a/testing/travis/test_wordcount.sh
+++ b/testing/travis/test_wordcount.sh
@@ -70,7 +70,7 @@ function run_via_mvn {
local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2
local cmd='mvn exec:java -f pom.xml -pl examples/java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--runner=InProcessPipelineRunner --inputFile='"$input"' --output='"$outfile_prefix"'"'
+ -Dexec.args="--runner=DirectRunner --inputFile='"$input"' --output='"$outfile_prefix"'"'
echo "$name: Running $cmd" >&2
sh -c "$cmd"
check_result_hash "$name" "$outfile_prefix" "$expected_hash"
@@ -84,7 +84,7 @@ function run_bundled {
local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2
local cmd='java -cp '"$JAR_FILE"' \
org.apache.beam.examples.WordCount \
- --runner=InProcessPipelineRunner \
+ --runner=DirectRunner \
--inputFile='"'$input'"' \
--output='"$outfile_prefix"
echo "$name: Running $cmd" >&2