You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/15 19:21:15 UTC
[1/2] incubator-beam git commit: Remove the DataflowPipeline Class
Repository: incubator-beam
Updated Branches:
refs/heads/master 1eec6863f -> bcefff6a3
Remove the DataflowPipeline Class
Pipelines that run with the DataflowPipelineRunner should be created
using appropriately constructed PipelineOptions (i.e., options with the
runner set to DataflowPipelineRunner.class).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1eee4bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1eee4bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1eee4bc
Branch: refs/heads/master
Commit: b1eee4bcca632eacf4a6dd724ed3eeb27ace0d77
Parents: 1eec686
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 15 09:23:08 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 10:07:57 2016 -0700
----------------------------------------------------------------------
.../sdk/options/DataflowPipelineOptions.java | 14 +-
.../beam/sdk/runners/DataflowPipeline.java | 60 -------
.../sdk/runners/DataflowPipelineRegistrar.java | 4 +-
.../sdk/runners/DataflowPipelineRunnerTest.java | 37 +++--
.../beam/sdk/runners/DataflowPipelineTest.java | 45 ------
.../runners/DataflowPipelineTranslatorTest.java | 159 +++++++++++++------
6 files changed, 136 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
index 4eae85a..50fc956 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.options;
-import org.apache.beam.sdk.runners.DataflowPipeline;
+import org.apache.beam.sdk.runners.DataflowPipelineRunner;
import com.google.common.base.MoreObjects;
@@ -27,14 +27,14 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
- * Options that can be used to configure the {@link DataflowPipeline}.
+ * Options that can be used to configure the {@link DataflowPipelineRunner}.
*/
@Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
- PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
- DataflowPipelineWorkerPoolOptions, BigQueryOptions,
- GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
- DataflowProfilingOptions, PubsubOptions {
+public interface DataflowPipelineOptions
+ extends PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+ DataflowPipelineWorkerPoolOptions, BigQueryOptions, GcsOptions, StreamingOptions,
+ CloudDebuggerOptions, DataflowWorkerLoggingOptions, DataflowProfilingOptions,
+ PubsubOptions {
@Description("Project id. Required when running a Dataflow in the cloud. "
+ "See https://cloud.google.com/storage/docs/projects for further details.")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
deleted file mode 100644
index 4d91a38..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
- * {@link DataflowPipelineJob} when it is
- * {@link org.apache.beam.sdk.Pipeline#run()}.
- *
- * <p>This is not intended for use by users of Cloud Dataflow.
- * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
- * {@link Pipeline}.
- */
-public class DataflowPipeline extends Pipeline {
-
- /**
- * Creates and returns a new {@link DataflowPipeline} instance for tests.
- */
- public static DataflowPipeline create(DataflowPipelineOptions options) {
- return new DataflowPipeline(options);
- }
-
- private DataflowPipeline(DataflowPipelineOptions options) {
- super(DataflowPipelineRunner.fromOptions(options), options);
- }
-
- @Override
- public DataflowPipelineJob run() {
- return (DataflowPipelineJob) super.run();
- }
-
- @Override
- public DataflowPipelineRunner getRunner() {
- return (DataflowPipelineRunner) super.getRunner();
- }
-
- @Override
- public String toString() {
- return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
index 9333d7d..b0f72ed 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
@@ -26,8 +26,8 @@ import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DataflowPipeline}.
+ * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for the
+ * {@link DataflowPipelineRunner}.
*/
public class DataflowPipelineRegistrar {
private DataflowPipelineRegistrar() { }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
index 303acda..8b024fb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
@@ -125,7 +125,7 @@ import java.util.List;
import java.util.Map;
/**
- * Tests for DataflowPipelineRunner.
+ * Tests for the {@link DataflowPipelineRunner}.
*/
@RunWith(JUnit4.class)
public class DataflowPipelineRunnerTest {
@@ -143,9 +143,10 @@ public class DataflowPipelineRunnerTest {
assertNull(job.getCurrentState());
}
- private DataflowPipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+ private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
options.setStableUniqueNames(CheckEnabled.ERROR);
- DataflowPipeline p = DataflowPipeline.create(options);
+ options.setRunner(DataflowPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
.apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
@@ -212,6 +213,7 @@ public class DataflowPipelineRunnerTest {
private DataflowPipelineOptions buildPipelineOptions(
ArgumentCaptor<Job> jobCaptor) throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
options.setProject(PROJECT_ID);
options.setTempLocation("gs://somebucket/some/path");
// Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
@@ -227,8 +229,8 @@ public class DataflowPipelineRunnerTest {
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
- DataflowPipeline p = buildDataflowPipeline(options);
- DataflowPipelineJob job = p.run();
+ Pipeline p = buildDataflowPipeline(options);
+ DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
assertValidJob(jobCaptor.getValue());
}
@@ -246,7 +248,7 @@ public class DataflowPipelineRunnerTest {
resultJob.setClientRequestId("different_request_id");
when(mockRequest.execute()).thenReturn(resultJob);
- DataflowPipeline p = buildDataflowPipeline(options);
+ Pipeline p = buildDataflowPipeline(options);
try {
p.run();
fail("Expected DataflowJobAlreadyExistsException");
@@ -265,8 +267,8 @@ public class DataflowPipelineRunnerTest {
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
options.setUpdate(true);
options.setJobName("oldJobName");
- DataflowPipeline p = buildDataflowPipeline(options);
- DataflowPipelineJob job = p.run();
+ Pipeline p = buildDataflowPipeline(options);
+ DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
assertValidJob(jobCaptor.getValue());
}
@@ -279,7 +281,7 @@ public class DataflowPipelineRunnerTest {
DataflowPipelineOptions options = buildPipelineOptions();
options.setUpdate(true);
options.setJobName("badJobName");
- DataflowPipeline p = buildDataflowPipeline(options);
+ Pipeline p = buildDataflowPipeline(options);
p.run();
}
@@ -298,7 +300,7 @@ public class DataflowPipelineRunnerTest {
resultJob.setClientRequestId("different_request_id");
when(mockRequest.execute()).thenReturn(resultJob);
- DataflowPipeline p = buildDataflowPipeline(options);
+ Pipeline p = buildDataflowPipeline(options);
thrown.expect(DataflowJobAlreadyUpdatedException.class);
thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {
@@ -348,9 +350,9 @@ public class DataflowPipelineRunnerTest {
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
- DataflowPipeline p = buildDataflowPipeline(options);
+ Pipeline p = buildDataflowPipeline(options);
- DataflowPipelineJob job = p.run();
+ DataflowPipelineJob job = (DataflowPipelineJob) p.run();
assertEquals("newid", job.getJobId());
Job workflowJob = jobCaptor.getValue();
@@ -750,7 +752,7 @@ public class DataflowPipelineRunnerTest {
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
- DataflowPipeline p = DataflowPipeline.create(options);
+ Pipeline p = Pipeline.create(options);
p.apply(Create.of(Arrays.asList(1, 2, 3)))
.apply(new TestTransform());
@@ -758,7 +760,8 @@ public class DataflowPipelineRunnerTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage(Matchers.containsString("no translator registered"));
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
assertValidJob(jobCaptor.getValue());
}
@@ -766,7 +769,7 @@ public class DataflowPipelineRunnerTest {
public void testTransformTranslator() throws IOException {
// Test that we can provide a custom translation
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipeline p = DataflowPipeline.create(options);
+ Pipeline p = Pipeline.create(options);
TestTransform transform = new TestTransform();
p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of()))
@@ -793,7 +796,7 @@ public class DataflowPipelineRunnerTest {
});
translator.translate(
- p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
assertTrue(transform.translated);
}
@@ -828,7 +831,7 @@ public class DataflowPipelineRunnerTest {
@Test
public void testApplyIsScopedToExactClass() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipeline p = DataflowPipeline.create(options);
+ Pipeline p = Pipeline.create(options);
Create.TimestampedValues<String> transform =
Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now())));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
deleted file mode 100644
index 947b599..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataflowPipeline}. */
-@RunWith(JUnit4.class)
-public class DataflowPipelineTest {
- @Test
- public void testToString() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setJobName("TestJobName");
- options.setProject("project-id");
- options.setTempLocation("gs://test/temp/location");
- options.setGcpCredential(new TestCredential());
- options.setPathValidatorClass(NoopPathValidator.class);
- assertEquals("DataflowPipeline#TestJobName",
- DataflowPipeline.create(options).toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1eee4bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
index 1429e5a..7a3caa6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -123,8 +123,9 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
}
- private DataflowPipeline buildPipeline(DataflowPipelineOptions options) {
- DataflowPipeline p = DataflowPipeline.create(options);
+ private Pipeline buildPipeline(DataflowPipelineOptions options) {
+ options.setRunner(DataflowPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
.apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
@@ -163,6 +164,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
options.setGcpCredential(new TestCredential());
options.setJobName("some-job-name");
options.setProject("some-project");
@@ -178,11 +180,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setRunner(DataflowPipelineRunner.class);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
// Note that the contents of this materialized map may be changed by the act of reading an
@@ -212,11 +215,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setNetwork(testNetwork);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -228,11 +232,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
public void testNetworkConfigMissing() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -246,11 +251,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setSubnetwork(testSubnetwork);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -262,11 +268,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
public void testSubnetworkConfigMissing() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -277,11 +284,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
public void testScalingAlgorithmMissing() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -312,11 +320,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setAutoscalingAlgorithm(noScaling);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -346,11 +355,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
options.setMaxNumWorkers(42);
options.setAutoscalingAlgorithm(noScaling);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -379,11 +389,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setZone(testZone);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -398,11 +409,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setWorkerMachineType(testMachineType);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -418,11 +430,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setDiskSizeGb(diskSizeGb);
- DataflowPipeline p = buildPipeline(options);
+ Pipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
- .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+ .translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();
assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -442,13 +455,18 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Step predefinedStep = createPredefinedStep();
// Create a pipeline that the predefined step will be embedded into
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
.apply(ParDo.of(new NoOpFn()))
.apply(new EmbeddedTransform(predefinedStep.clone()))
.apply(ParDo.of(new NoOpFn()));
- Job job = translator.translate(
- pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
List<Step> steps = job.getSteps();
assertEquals(4, steps.size());
@@ -491,13 +509,18 @@ public class DataflowPipelineTranslatorTest implements Serializable {
private static Step createPredefinedStep() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
String stepName = "DoFn1";
pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
.apply(ParDo.of(new NoOpFn()).named(stepName))
.apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
- Job job = translator.translate(
- pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
assertEquals(13, job.getSteps().size());
Step step = job.getSteps().get(1);
@@ -617,7 +640,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Test
public void testMultiGraphPipelineSerialization() throws IOException {
- DataflowPipeline p = DataflowPipeline.create(buildPipelineOptions());
+ Pipeline p = Pipeline.create(buildPipelineOptions());
PCollection<Integer> input = p.begin()
.apply(Create.of(1, 2, 3));
@@ -629,12 +652,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
PipelineOptionsFactory.as(DataflowPipelineOptions.class));
// Check that translation doesn't fail.
- t.translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+ t.translate(
+ p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
}
@Test
public void testPartiallyBoundFailure() throws IOException {
- Pipeline p = DataflowPipeline.create(buildPipelineOptions());
+ Pipeline p = Pipeline.create(buildPipelineOptions());
PCollection<Integer> input = p.begin()
.apply(Create.of(1, 2, 3));
@@ -651,7 +675,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Test
public void testGoodWildcards() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
applyRead(pipeline, "gs://bucket/foo");
@@ -670,7 +694,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
// Check that translation doesn't fail.
- t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+ t.translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList());
}
private void applyRead(Pipeline pipeline, String path) {
@@ -684,7 +711,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Test
public void testBadWildcardRecursive() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
@@ -693,7 +720,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
- t.translate(pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList());
+ t.translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList());
}
@Test
@@ -706,11 +736,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
options.setExperiments(ImmutableList.of("disable_ism_side_input"));
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1))
.apply(View.<Integer>asSingleton());
- Job job = translator.translate(
- pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
List<Step> steps = job.getSteps();
assertEquals(2, steps.size());
@@ -733,11 +768,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
options.setExperiments(ImmutableList.of("disable_ism_side_input"));
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1, 2, 3))
.apply(View.<Integer>asIterable());
- Job job = translator.translate(
- pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
List<Step> steps = job.getSteps();
assertEquals(2, steps.size());
@@ -758,11 +798,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1))
.apply(View.<Integer>asSingleton());
- Job job = translator.translate(
- pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
List<Step> steps = job.getSteps();
assertEquals(5, steps.size());
@@ -786,11 +831,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1, 2, 3))
.apply(View.<Integer>asIterable());
- Job job = translator.translate(
- pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
List<Step> steps = job.getSteps();
assertEquals(3, steps.size());
@@ -810,7 +860,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
public void testStepDisplayData() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipeline pipeline = DataflowPipeline.create(options);
+ Pipeline pipeline = Pipeline.create(options);
DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
@Override
@@ -845,8 +895,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
.apply(ParDo.of(fn1))
.apply(ParDo.of(fn2));
- Job job = translator.translate(
- pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ (DataflowPipelineRunner) pipeline.getRunner(),
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
List<Step> steps = job.getSteps();
assertEquals(3, steps.size());
[2/2] incubator-beam git commit: This closes #186
Posted by lc...@apache.org.
This closes #186
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bcefff6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bcefff6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bcefff6a
Branch: refs/heads/master
Commit: bcefff6a3bedb0f114d614c27da12a4cc2b34f48
Parents: 1eec686 b1eee4b
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 15 10:08:18 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 15 10:08:18 2016 -0700
----------------------------------------------------------------------
.../sdk/options/DataflowPipelineOptions.java | 14 +-
.../beam/sdk/runners/DataflowPipeline.java | 60 -------
.../sdk/runners/DataflowPipelineRegistrar.java | 4 +-
.../sdk/runners/DataflowPipelineRunnerTest.java | 37 +++--
.../beam/sdk/runners/DataflowPipelineTest.java | 45 ------
.../runners/DataflowPipelineTranslatorTest.java | 159 +++++++++++++------
6 files changed, 136 insertions(+), 183 deletions(-)
----------------------------------------------------------------------