You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/17 21:46:55 UTC
[24/50] [abbrv] beam git commit: Remove Pipeline.getRunner
Remove Pipeline.getRunner
Runners need not be instantiated until after pipeline construction, so
they should not be exposed by the Pipeline class.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d41fe1df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d41fe1df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d41fe1df
Branch: refs/heads/gearpump-runner
Commit: d41fe1df26329479b82cc59d260998f2b88b4799
Parents: 2c2424c
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 2 10:54:29 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 10 09:40:50 2017 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 95 ++++++++++----------
.../direct/TestStreamEvaluatorFactory.java | 22 +++--
.../direct/TestStreamEvaluatorFactoryTest.java | 6 +-
.../BatchStatefulParDoOverridesTest.java | 4 +-
.../DataflowPipelineTranslatorTest.java | 39 ++++----
.../runners/dataflow/DataflowRunnerTest.java | 4 +-
.../testing/TestDataflowRunnerTest.java | 50 +++++------
.../main/java/org/apache/beam/sdk/Pipeline.java | 7 --
8 files changed, 110 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f56d225..ce8dbc0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -70,53 +70,6 @@ import org.joda.time.Instant;
*/
public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
/**
- * The default set of transform overrides to use in the {@link DirectRunner}.
- *
- * <p>The order in which overrides is applied is important, as some overrides are expanded into a
- * composite. If the composite contains {@link PTransform PTransforms} which are also overridden,
- * these PTransforms must occur later in the iteration order. {@link ImmutableMap} has an
- * iteration order based on the order at which elements are added to it.
- */
- @SuppressWarnings("rawtypes")
- private static Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides =
- ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
- .put(
- PTransformMatchers.writeWithRunnerDeterminedSharding(),
- new WriteWithShardingFactory()) /* Uses a view internally. */
- .put(
- PTransformMatchers.classEqualTo(CreatePCollectionView.class),
- new ViewOverrideFactory()) /* Uses pardos and GBKs */
- .put(
- PTransformMatchers.classEqualTo(TestStream.class),
- new DirectTestStreamFactory()) /* primitive */
- /* Single-output ParDos are implemented in terms of Multi-output ParDos. Any override
- that is applied to a multi-output ParDo must first have all matching Single-output ParDos
- converted to match.
- */
- .put(PTransformMatchers.splittableParDoSingle(), new ParDoSingleViaMultiOverrideFactory())
- .put(
- PTransformMatchers.stateOrTimerParDoSingle(),
- new ParDoSingleViaMultiOverrideFactory())
- // SplittableParMultiDo is implemented in terms of nonsplittable single ParDos
- .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
- // state and timer pardos are implemented in terms of nonsplittable single ParDos
- .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
- .put(
- PTransformMatchers.classEqualTo(ParDo.Bound.class),
- new ParDoSingleViaMultiOverrideFactory()) /* returns a BoundMulti */
- .put(
- PTransformMatchers.classEqualTo(BoundMulti.class),
- /* returns one of two primitives; SplittableParDos are replaced above. */
- new ParDoMultiOverrideFactory())
- .put(
- PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
- new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
- .put(
- PTransformMatchers.classEqualTo(GroupByKey.class),
- new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
- .build();
-
- /**
* Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
* executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
* a part of at a later point. This is an uncommitted bundle and can have elements added to it.
@@ -309,7 +262,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
@Override
public DirectPipelineResult run(Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
- defaultTransformOverrides.entrySet()) {
+ defaultTransformOverrides().entrySet()) {
pipeline.replace(override.getKey(), override.getValue());
}
MetricsEnvironment.setMetricsSupported(true);
@@ -361,6 +314,52 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
}
/**
+ * The default set of transform overrides to use in the {@link DirectRunner}.
+ *
+ * <p>The order in which overrides are applied is important, as some overrides are expanded into
+ * a composite. If the composite contains {@link PTransform PTransforms} which are also
+ * overridden, these PTransforms must occur later in the iteration order. {@link ImmutableMap}
+ * has an iteration order based on the order in which elements are added to it.
+ */
+ @SuppressWarnings("rawtypes")
+ private Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides() {
+ return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
+ .put(
+ PTransformMatchers.writeWithRunnerDeterminedSharding(),
+ new WriteWithShardingFactory()) /* Uses a view internally. */
+ .put(
+ PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+ new ViewOverrideFactory()) /* Uses pardos and GBKs */
+ .put(
+ PTransformMatchers.classEqualTo(TestStream.class),
+ new DirectTestStreamFactory(this)) /* primitive */
+ /* Single-output ParDos are implemented in terms of Multi-output ParDos. Any override
+ that is applied to a multi-output ParDo must first have all matching Single-output ParDos
+ converted to match.
+ */
+ .put(PTransformMatchers.splittableParDoSingle(), new ParDoSingleViaMultiOverrideFactory())
+ .put(PTransformMatchers.stateOrTimerParDoSingle(), new ParDoSingleViaMultiOverrideFactory())
+ // SplittableParMultiDo is implemented in terms of nonsplittable single ParDos
+ .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
+ // state and timer pardos are implemented in terms of nonsplittable single ParDos
+ .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
+ .put(
+ PTransformMatchers.classEqualTo(ParDo.Bound.class),
+ new ParDoSingleViaMultiOverrideFactory()) /* returns a BoundMulti */
+ .put(
+ PTransformMatchers.classEqualTo(BoundMulti.class),
+ /* returns one of two primitives; SplittableParDos are replaced above. */
+ new ParDoMultiOverrideFactory())
+ .put(
+ PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
+ new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
+ .put(
+ PTransformMatchers.classEqualTo(GroupByKey.class),
+ new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
+ .build();
+ }
+
+ /**
* The result of running a {@link Pipeline} with the {@link DirectRunner}.
*
* <p>Throws {@link UnsupportedOperationException} for all methods.
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 628aa23..0dd8919 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -18,8 +18,6 @@
package org.apache.beam.runners.direct;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
@@ -35,7 +33,6 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.TestStream.ElementEvent;
import org.apache.beam.sdk.testing.TestStream.Event;
@@ -166,11 +163,16 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
static class DirectTestStreamFactory<T>
implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
+ private final DirectRunner runner;
+
+ DirectTestStreamFactory(DirectRunner runner) {
+ this.runner = runner;
+ }
@Override
public PTransform<PBegin, PCollection<T>> getReplacementTransform(
TestStream<T> transform) {
- return new DirectTestStream<>(transform);
+ return new DirectTestStream<>(runner, transform);
}
@Override
@@ -185,22 +187,18 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
+ private final transient DirectRunner runner;
private final TestStream<T> original;
@VisibleForTesting
- DirectTestStream(TestStream<T> transform) {
+ DirectTestStream(DirectRunner runner, TestStream<T> transform) {
+ this.runner = runner;
this.original = transform;
}
@Override
public PCollection<T> expand(PBegin input) {
- PipelineRunner<?> runner = input.getPipeline().getRunner();
- checkState(
- runner instanceof DirectRunner,
- "%s can only be used when running with the %s",
- getClass().getSimpleName(),
- DirectRunner.class.getSimpleName());
- ((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+ runner.setClockSupplier(new TestClockSupplier());
return PCollection.<T>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
.setCoder(original.getValueCoder());
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 9ed72d5..fc689fe 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -60,10 +60,12 @@ public class TestStreamEvaluatorFactoryTest {
@Rule
public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+ private DirectRunner runner;
@Before
public void setup() {
context = mock(EvaluationContext.class);
+ runner = DirectRunner.fromOptions(TestPipeline.testingPipelineOptions());
factory = new TestStreamEvaluatorFactory(context);
bundleFactory = ImmutableListBundleFactory.create();
}
@@ -80,7 +82,7 @@ public class TestStreamEvaluatorFactoryTest {
.advanceProcessingTime(Duration.standardMinutes(10))
.advanceWatermarkToInfinity();
PCollection<Integer> streamVals =
- p.apply(new DirectTestStream<Integer>(testStream));
+ p.apply(new DirectTestStream<Integer>(runner, testStream));
TestClock clock = new TestClock();
when(context.getClock()).thenReturn(clock);
@@ -180,7 +182,7 @@ public class TestStreamEvaluatorFactoryTest {
@Test
public void overrideFactoryGetInputSucceeds() {
- DirectTestStreamFactory<?> factory = new DirectTestStreamFactory<>();
+ DirectTestStreamFactory<?> factory = new DirectTestStreamFactory<>(runner);
PBegin begin = factory.getInput(Collections.<TaggedPValue>emptyList(), p);
assertThat(begin.getPipeline(), Matchers.<Pipeline>equalTo(p));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index ef3e414..899902a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -71,7 +71,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
DummyStatefulDoFn fn = new DummyStatefulDoFn();
pipeline.apply(Create.of(KV.of(1, 2))).apply(ParDo.of(fn));
- DataflowRunner runner = (DataflowRunner) pipeline.getRunner();
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
}
@@ -89,7 +89,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
.apply(Create.of(KV.of(1, 2)))
.apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(fn));
- DataflowRunner runner = (DataflowRunner) pipeline.getRunner();
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 660e92e..813e76d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -190,7 +190,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
@@ -223,7 +223,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -240,7 +240,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -259,7 +259,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -276,7 +276,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -292,7 +292,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -328,7 +328,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -363,7 +363,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -397,7 +397,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -417,7 +417,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -438,7 +438,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -653,7 +653,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Test
public void testMultiGraphPipelineSerialization() throws Exception {
- Pipeline p = Pipeline.create(buildPipelineOptions());
+ DataflowPipelineOptions options = buildPipelineOptions();
+ Pipeline p = Pipeline.create(options);
PCollection<Integer> input = p.begin()
.apply(Create.of(1, 2, 3));
@@ -666,7 +667,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Check that translation doesn't fail.
JobSpecification jobSpecification = t.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList());
assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
}
@@ -710,7 +711,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Check that translation doesn't fail.
JobSpecification jobSpecification = t.translate(
pipeline,
- (DataflowRunner) pipeline.getRunner(),
+ DataflowRunner.fromOptions(options),
Collections.<DataflowPackage>emptyList());
assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
}
@@ -737,7 +738,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
t.translate(
pipeline,
- (DataflowRunner) pipeline.getRunner(),
+ DataflowRunner.fromOptions(options),
Collections.<DataflowPackage>emptyList());
}
@@ -764,7 +765,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Check that translation does not fail.
t.translate(
pipeline,
- (DataflowRunner) pipeline.getRunner(),
+ DataflowRunner.fromOptions(options),
Collections.<DataflowPackage>emptyList());
}
@@ -785,7 +786,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
translator
.translate(
pipeline,
- (DataflowRunner) pipeline.getRunner(),
+ DataflowRunner.fromOptions(options),
Collections.<DataflowPackage>emptyList())
.getJob();
assertAllStepOutputsHaveUniqueIds(job);
@@ -817,7 +818,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
translator
.translate(
pipeline,
- (DataflowRunner) pipeline.getRunner(),
+ DataflowRunner.fromOptions(options),
Collections.<DataflowPackage>emptyList())
.getJob();
assertAllStepOutputsHaveUniqueIds(job);
@@ -1011,7 +1012,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
translator
.translate(
pipeline,
- (DataflowRunner) pipeline.getRunner(),
+ DataflowRunner.fromOptions(options),
Collections.<DataflowPackage>emptyList())
.getJob();
assertAllStepOutputsHaveUniqueIds(job);
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a788077..a4031d1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -951,7 +951,7 @@ public class DataflowRunnerTest {
thrown.expectMessage(Matchers.containsString("no translator registered"));
DataflowPipelineTranslator.fromOptions(options)
.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList());
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
@@ -989,7 +989,7 @@ public class DataflowRunnerTest {
});
translator.translate(
- p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+ p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList());
assertTrue(transform.translated);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 1e906d2..d3eccbb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -140,7 +140,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */,
true /* tentative */, null /* additionalMetrics */));
assertEquals(mockJob, runner.run(p, mockRunner));
@@ -160,7 +160,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
false /* tentative */, null /* additionalMetrics */));
try {
@@ -202,7 +202,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
true /* tentative */, null /* additionalMetrics */));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
@@ -233,7 +233,7 @@ public class TestDataflowRunnerTest {
when(request.execute())
.thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */,
ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
runner.run(p, mockRunner);
}
@@ -254,7 +254,7 @@ public class TestDataflowRunnerTest {
when(request.execute())
.thenReturn(generateMockStreamingMetricResponse(
ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
runner.run(p, mockRunner);
}
@@ -275,7 +275,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
true /* tentative */, null /* additionalMetrics */));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
@@ -350,7 +350,7 @@ public class TestDataflowRunnerTest {
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
doReturn(State.DONE).when(job).getState();
JobMetrics metrics = buildJobMetrics(
generateMockMetrics(true /* success */, true /* tentative */));
@@ -364,7 +364,7 @@ public class TestDataflowRunnerTest {
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
doReturn(State.DONE).when(job).getState();
JobMetrics metrics = buildJobMetrics(
generateMockMetrics(false /* success */, true /* tentative */));
@@ -392,7 +392,7 @@ public class TestDataflowRunnerTest {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
ImmutableMap.of("no-watermark", new BigDecimal(100))));
doReturn(State.RUNNING).when(job).getState();
@@ -405,7 +405,7 @@ public class TestDataflowRunnerTest {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
doReturn(State.RUNNING).when(job).getState();
@@ -418,7 +418,7 @@ public class TestDataflowRunnerTest {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
doReturn(State.RUNNING).when(job).getState();
@@ -431,7 +431,7 @@ public class TestDataflowRunnerTest {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics
(ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
doReturn(State.RUNNING).when(job).getState();
@@ -444,7 +444,7 @@ public class TestDataflowRunnerTest {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
"two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
@@ -458,7 +458,7 @@ public class TestDataflowRunnerTest {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
"two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
@@ -472,7 +472,7 @@ public class TestDataflowRunnerTest {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
"no-watermark", new BigDecimal(100))));
@@ -487,7 +487,7 @@ public class TestDataflowRunnerTest {
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
doReturn(State.FAILED).when(job).getState();
assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null /* metrics */));
}
@@ -522,7 +522,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
true /* tentative */, null /* additionalMetrics */));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
@@ -543,7 +543,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */,
true /* tentative */, null /* additionalMetrics */));
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
JobMetrics metrics = runner.getJobMetrics(job);
assertEquals(1, metrics.getMetrics().size());
@@ -558,7 +558,7 @@ public class TestDataflowRunnerTest {
p.apply(Create.of(1, 2, 3));
when(request.execute()).thenThrow(new IOException());
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
assertNull(runner.getJobMetrics(job));
}
@@ -576,7 +576,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
p.getOptions().as(TestPipelineOptions.class)
.setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
@@ -600,7 +600,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
p.getOptions().as(TestPipelineOptions.class)
.setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
@@ -627,7 +627,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
@@ -651,7 +651,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
@@ -678,7 +678,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestFailureMatcher());
@@ -709,7 +709,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestFailureMatcher());
http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index f09f2b4..2f368b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -456,13 +456,6 @@ public class Pipeline {
}
/**
- * Returns the configured {@link PipelineRunner}.
- */
- public PipelineRunner<?> getRunner() {
- return runner;
- }
-
- /**
* Returns the configured {@link PipelineOptions}.
*
* @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly