You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:25 UTC
[32/50] [abbrv] incubator-beam git commit: Finish removing
DirectPipelineRunner references
Finish removing DirectPipelineRunner references
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/340d0984
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/340d0984
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/340d0984
Branch: refs/heads/python-sdk
Commit: 340d09845959340f73577512437ebe0939bdeff9
Parents: 09bf9b3
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 13:22:26 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:30 2016 -0700
----------------------------------------------------------------------
.../examples/common/DataflowExampleUtils.java | 4 +-
.../examples/cookbook/DatastoreWordCount.java | 4 +-
.../translation/TransformTranslatorTest.java | 2 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 2 +-
.../beam/sdk/options/DirectPipelineOptions.java | 74 --------------------
.../beam/sdk/options/PipelineOptions.java | 7 +-
.../beam/sdk/util/BigQueryTableRowIterator.java | 2 +-
.../apache/beam/sdk/util/DoFnRunnerBase.java | 2 +-
.../beam/sdk/runners/PipelineRunnerTest.java | 24 ++++---
.../main/java/common/DataflowExampleUtils.java | 2 +-
.../src/main/java/StarterPipeline.java | 2 +-
.../src/main/java/it/pkg/StarterPipeline.java | 2 +-
13 files changed, 28 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
index 5b1af6d..46b8af3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
@@ -315,7 +315,7 @@ public class DataflowExampleUtils {
}
/**
- * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with
+ * Do some runner setup: check that the DirectRunner is not used in conjunction with
* streaming, and if streaming is specified, use the DataflowPipelineRunner.
*/
public void setupRunner() {
@@ -413,7 +413,7 @@ public class DataflowExampleUtils {
}
} else {
// Do nothing if the given PipelineResult doesn't support waitToFinish(),
- // such as EvaluationResults returned by DirectPipelineRunner.
+ // such as EvaluationResults returned by DirectRunner.
tearDown();
printPendingMessages();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index bfaecdf..2d1f88c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -193,7 +193,7 @@ public class DatastoreWordCount {
/**
* An example that creates a pipeline to populate DatastoreIO from a
- * text input. Forces use of DirectPipelineRunner for local execution mode.
+ * text input. Forces use of DirectRunner for local execution mode.
*/
public static void writeDataToDatastore(Options options) {
Pipeline p = Pipeline.create(options);
@@ -247,7 +247,7 @@ public class DatastoreWordCount {
/**
* An example to demo how to use {@link DatastoreIO}. The runner here is
- * customizable, which means users could pass either {@code DirectPipelineRunner}
+ * customizable, which means users could pass either {@code DirectRunner}
* or {@code DataflowPipelineRunner} in the pipeline options.
*/
public static void main(String args[]) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index 5fdfb49..b593316 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -53,7 +53,7 @@ public class TransformTranslatorTest {
/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
- * in DirectPipelineRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
+ * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
* transforms. Finally it makes sure that the results are the same for both runs.
*/
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 2a5698c..c6de8b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -364,7 +364,7 @@ public class PubsubIO {
* the stream.
*
* <p>When running with a {@link PipelineRunner} that only supports bounded
- * {@link PCollection PCollections} (such as {@link DirectPipelineRunner}),
+ * {@link PCollection PCollections} (such as {@link DirectRunner}),
* only a bounded portion of the input Pub/Sub stream can be processed. As such, either
* {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index bbef072..dc50a8c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -111,7 +111,7 @@ import javax.annotation.Nullable;
* }</pre>
*
* <h3>Permissions</h3>
- * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files
+ * <p>When run using the {@link DirectRunner}, your pipeline can read and write text files
* on your local drive and remote text files on Google Cloud Storage that you have access to using
* your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only
* read and write files from GCS. For more information about permissions, see the Cloud Dataflow
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
deleted file mode 100644
index c2095e3..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.options;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Options that can be used to configure the {@link DirectPipelineRunner}.
- */
-public interface DirectPipelineOptions
- extends ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, PipelineOptions,
- StreamingOptions {
-
- /**
- * The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}.
- * If not explicitly specified, a random seed will be generated.
- */
- @JsonIgnore
- @Description("The random seed to use for pseudorandom behaviors in the DirectPipelineRunner."
- + " If not explicitly specified, a random seed will be generated.")
- Long getDirectPipelineRunnerRandomSeed();
- void setDirectPipelineRunnerRandomSeed(Long value);
-
- /**
- * Controls whether the runner should ensure that all of the elements of
- * the pipeline, such as DoFns, can be serialized.
- */
- @JsonIgnore
- @Description("Controls whether the runner should ensure that all of the elements of the "
- + "pipeline, such as DoFns, can be serialized.")
- @Default.Boolean(true)
- boolean isTestSerializability();
- void setTestSerializability(boolean testSerializability);
-
- /**
- * Controls whether the runner should ensure that all of the elements of
- * every {@link PCollection} can be encoded using the appropriate
- * {@link Coder}.
- */
- @JsonIgnore
- @Description("Controls whether the runner should ensure that all of the elements of every "
- + "PCollection can be encoded using the appropriate Coder.")
- @Default.Boolean(true)
- boolean isTestEncodability();
- void setTestEncodability(boolean testEncodability);
-
- /**
- * Controls whether the runner should randomize the order of each
- * {@link PCollection}.
- */
- @JsonIgnore
- @Description("Controls whether the runner should randomize the order of each PCollection.")
- @Default.Boolean(true)
- boolean isTestUnorderedness();
- void setTestUnorderedness(boolean testUnorderedness);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 456b6ae..e89e5ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -70,10 +70,9 @@ import javax.annotation.concurrent.ThreadSafe;
* p.run();
* }
*
- * // To create options for the DirectPipeline:
- * DirectPipelineOptions directPipelineOptions =
- * PipelineOptionsFactory.as(DirectPipelineOptions.class);
- * directPipelineOptions.setStreaming(true);
+ * // To create options for the DirectRunner:
+ * DirectOptions directRunnerOptions =
+ * PipelineOptionsFactory.as(DirectOptions.class);
*
* // To cast from one type to another using the as(Class) method:
* DataflowPipelineOptions dataflowPipelineOptions =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
index ca1ac69..ad41a3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
@@ -322,7 +322,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
if (convertedValue == null) {
// BigQuery does not include null values when the export operation (to JSON) is used.
- // To match that behavior, BigQueryTableRowiterator, and the DirectPipelineRunner,
+ // To match that behavior, BigQueryTableRowiterator, and the DirectRunner,
// intentionally omits columns with null values.
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 58b10a7..1ebe72b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -91,7 +91,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
/**
* An implementation of {@code OutputManager} using simple lists, for testing and in-memory
- * contexts such as the {@link DirectPipelineRunner}.
+ * contexts such as the {@link DirectRunner}.
*/
public static class ListOutputManager implements OutputManager {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index 5d2e69d..fb8bb72 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -20,7 +20,9 @@ package org.apache.beam.sdk.runners;
import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.util.GcsUtil;
@@ -50,12 +52,12 @@ public class PipelineRunnerTest {
@Test
public void testLongName() {
// Check we can create a pipeline runner using the full class name.
- DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
- options.setAppName("test");
- options.setProject("test");
- options.setGcsUtil(mockGcsUtil);
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.as(ApplicationNameOptions.class).setAppName("test");
+ options.as(GcpOptions.class).setProject("test");
+ options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
options.setRunner(CrashingRunner.class);
- options.setGcpCredential(new TestCredential());
+ options.as(GcpOptions.class).setGcpCredential(new TestCredential());
PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
assertTrue(runner instanceof CrashingRunner);
}
@@ -63,12 +65,12 @@ public class PipelineRunnerTest {
@Test
public void testShortName() {
// Check we can create a pipeline runner using the short class name.
- DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
- options.setAppName("test");
- options.setProject("test");
- options.setGcsUtil(mockGcsUtil);
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.as(ApplicationNameOptions.class).setAppName("test");
+ options.as(GcpOptions.class).setProject("test");
+ options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
options.setRunner(CrashingRunner.class);
- options.setGcpCredential(new TestCredential());
+ options.as(GcpOptions.class).setGcpCredential(new TestCredential());
PipelineRunner<?> runner = PipelineRunner.fromOptions(options);
assertTrue(runner instanceof CrashingRunner);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
index 76df4d4..6ec4540 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
@@ -315,7 +315,7 @@ public class DataflowExampleUtils {
}
} else {
// Do nothing if the given PipelineResult doesn't support waitToFinish(),
- // such as EvaluationResults returned by DirectPipelineRunner.
+ // such as EvaluationResults returned by DirectRunner.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
index 2146b77..027431f 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
* <p>The example takes two strings, converts them to their upper-case
* representation and logs them.
*
- * <p>To run this starter example locally using DirectPipelineRunner, just
+ * <p>To run this starter example locally using DirectRunner, just
* execute it without any additional parameters from your favorite development
* environment.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/340d0984/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
index 6cd27e7..bb86b0d 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
* <p>The example takes two strings, converts them to their upper-case
* representation and logs them.
*
- * <p>To run this starter example locally using DirectPipelineRunner, just
+ * <p>To run this starter example locally using DirectRunner, just
* execute it without any additional parameters from your favorite development
* environment.
*