You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/23 22:46:37 UTC
[1/2] beam git commit: This closes #2276
Repository: beam
Updated Branches:
refs/heads/master 890bc1a23 -> 87c8ef049
This closes #2276
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87c8ef04
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87c8ef04
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87c8ef04
Branch: refs/heads/master
Commit: 87c8ef049c6a6cde23bb6ed0948cbe8196e000cf
Parents: 890bc1a 28a840d
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 23 15:46:23 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Mar 23 15:46:23 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 37 ++++++++++++++------
.../runners/dataflow/DataflowRunnerTest.java | 32 -----------------
2 files changed, 27 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Remove need for Streaming Flag in Dataflow
Posted by tg...@apache.org.
Remove need for Streaming Flag in Dataflow
Autodetect whether streaming is required, and if so, run in streaming mode.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/28a840d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/28a840d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/28a840d9
Branch: refs/heads/master
Commit: 28a840d9537bc4be8adeac3ab1096b93b4f463aa
Parents: 890bc1a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 20 14:40:05 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Mar 23 15:46:23 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 37 ++++++++++++++------
.../runners/dataflow/DataflowRunnerTest.java | 32 -----------------
2 files changed, 27 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/28a840d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 82e9db0..e2ab5c2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -96,6 +96,7 @@ import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
@@ -160,9 +161,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/** Translator for this DataflowRunner, based on options. */
private final DataflowPipelineTranslator translator;
- /** Custom transforms implementations. */
- private final ImmutableMap<PTransformMatcher, PTransformOverrideFactory> overrides;
-
/** A set of user defined functions to invoke at different points in execution. */
private DataflowRunnerHooks hooks;
@@ -289,13 +287,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.translator = DataflowPipelineTranslator.fromOptions(options);
this.pcollectionsRequiringIndexedFormat = new HashSet<>();
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
+ }
+ private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides(boolean streaming) {
ImmutableMap.Builder<PTransformMatcher, PTransformOverrideFactory> ptoverrides =
ImmutableMap.builder();
// Create is implemented in terms of a Read, so it must precede the override to Read in
// streaming
ptoverrides.put(PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance());
- if (options.isStreaming()) {
+ if (streaming) {
// In streaming mode must use either the custom Pubsub unbounded source/sink or
// defer to Windmill's built-in implementation.
for (Class<? extends DoFn> unsupported :
@@ -334,10 +334,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.classEqualTo(unsupported),
UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false)));
}
- ptoverrides.put(
- PTransformMatchers.classEqualTo(Read.Unbounded.class),
- UnsupportedOverrideFactory.withMessage(
- "The DataflowRunner in batch mode does not support Read.Unbounded"));
ptoverrides
// State and timer pardos are implemented by expansion to GBK-then-ParDo
.put(PTransformMatchers.stateOrTimerParDoMulti(),
@@ -372,7 +368,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.classEqualTo(Combine.GroupedValues.class),
new PrimitiveCombineGroupedValuesOverrideFactory())
.put(PTransformMatchers.classEqualTo(ParDo.Bound.class), new PrimitiveParDoSingleFactory());
- overrides = ptoverrides.build();
+ return ptoverrides.build();
}
private String getUnsupportedMessage(Class<?> unsupported, boolean streaming) {
@@ -485,6 +481,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+ if (containsUnboundedPCollection(pipeline)) {
+ options.setStreaming(true);
+ }
replaceTransforms(pipeline);
LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications "
@@ -674,11 +673,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting
void replaceTransforms(Pipeline pipeline) {
- for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
+ boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
+ for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
+ getOverrides(streaming).entrySet()) {
pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
}
}
+ private boolean containsUnboundedPCollection(Pipeline p) {
+ class BoundednessVisitor extends PipelineVisitor.Defaults {
+ IsBounded boundedness = IsBounded.BOUNDED;
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ if (value instanceof PCollection) {
+ boundedness = boundedness.and(((PCollection) value).isBounded());
+ }
+ }
+ }
+ BoundednessVisitor visitor = new BoundednessVisitor();
+ p.traverseTopologically(visitor);
+ return visitor.boundedness == IsBounded.UNBOUNDED;
+ };
+
/**
* Returns the DataflowPipelineTranslator associated with this object.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/28a840d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 1509210..0735b5c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -63,14 +63,12 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -82,7 +80,6 @@ import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Description;
@@ -1049,35 +1046,6 @@ public class DataflowRunnerTest {
DataflowRunner.fromOptions(options).toString());
}
- private static PipelineOptions makeOptions(boolean streaming) {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowRunner.class);
- options.setStreaming(streaming);
- options.setJobName("TestJobName");
- options.setProject("test-project");
- options.setTempLocation("gs://test/temp/location");
- options.setGcpCredential(new TestCredential());
- options.setPathValidatorClass(NoopPathValidator.class);
- return options;
- }
-
- private void testUnsupportedSource(PTransform<PBegin, ?> source, String name, boolean streaming)
- throws Exception {
- String mode = streaming ? "streaming" : "batch";
- thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage(
- "The DataflowRunner in " + mode + " mode does not support " + name);
-
- Pipeline p = Pipeline.create(makeOptions(streaming));
- p.apply(source);
- p.run();
- }
-
- @Test
- public void testReadUnboundedUnsupportedInBatch() throws Exception {
- testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
- }
-
/**
* Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
when the runner is successfully run.