Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/23 22:46:38 UTC

[2/2] beam git commit: Remove need for Streaming Flag in Dataflow

Remove need for Streaming Flag in Dataflow

Autodetect whether streaming execution is required, and if so, run in streaming mode.
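
In practice, this means a pipeline containing any unbounded PCollection now
runs in streaming mode without the flag. A minimal sketch of the new behavior
(the project id, temp location, and unbounded CountingInput source are
illustrative assumptions, not part of this commit):

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setProject("my-project");               // hypothetical project id
    options.setTempLocation("gs://my-bucket/tmp");  // hypothetical GCS path
    // Note: no options.setStreaming(true) here.

    Pipeline p = Pipeline.create(options);
    // Any unbounded source yields an unbounded PCollection; the runner
    // now detects this and enables streaming mode itself.
    p.apply(CountingInput.unbounded());
    p.run();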


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/28a840d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/28a840d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/28a840d9

Branch: refs/heads/master
Commit: 28a840d9537bc4be8adeac3ab1096b93b4f463aa
Parents: 890bc1a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 20 14:40:05 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Mar 23 15:46:23 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 37 ++++++++++++++------
 .../runners/dataflow/DataflowRunnerTest.java    | 32 -----------------
 2 files changed, 27 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/28a840d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 82e9db0..e2ab5c2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -96,6 +96,7 @@ import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
@@ -160,9 +161,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   /** Translator for this DataflowRunner, based on options. */
   private final DataflowPipelineTranslator translator;
 
-  /** Custom transforms implementations. */
-  private final ImmutableMap<PTransformMatcher, PTransformOverrideFactory> overrides;
-
   /** A set of user defined functions to invoke at different points in execution. */
   private DataflowRunnerHooks hooks;
 
@@ -289,13 +287,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     this.translator = DataflowPipelineTranslator.fromOptions(options);
     this.pcollectionsRequiringIndexedFormat = new HashSet<>();
     this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
+  }
 
+  private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides(boolean streaming) {
     ImmutableMap.Builder<PTransformMatcher, PTransformOverrideFactory> ptoverrides =
         ImmutableMap.builder();
     // Create is implemented in terms of a Read, so it must precede the override to Read in
     // streaming
     ptoverrides.put(PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance());
-    if (options.isStreaming()) {
+    if (streaming) {
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
       // defer to Windmill's built-in implementation.
       for (Class<? extends DoFn> unsupported :
@@ -334,10 +334,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             PTransformMatchers.classEqualTo(unsupported),
             UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false)));
       }
-      ptoverrides.put(
-          PTransformMatchers.classEqualTo(Read.Unbounded.class),
-          UnsupportedOverrideFactory.withMessage(
-              "The DataflowRunner in batch mode does not support Read.Unbounded"));
       ptoverrides
           // State and timer pardos are implemented by expansion to GBK-then-ParDo
           .put(PTransformMatchers.stateOrTimerParDoMulti(),
@@ -372,7 +368,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             PTransformMatchers.classEqualTo(Combine.GroupedValues.class),
             new PrimitiveCombineGroupedValuesOverrideFactory())
         .put(PTransformMatchers.classEqualTo(ParDo.Bound.class), new PrimitiveParDoSingleFactory());
-    overrides = ptoverrides.build();
+    return ptoverrides.build();
   }
 
   private String getUnsupportedMessage(Class<?> unsupported, boolean streaming) {
@@ -485,6 +481,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @Override
   public DataflowPipelineJob run(Pipeline pipeline) {
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+    if (containsUnboundedPCollection(pipeline)) {
+      options.setStreaming(true);
+    }
     replaceTransforms(pipeline);
 
     LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications "
@@ -674,11 +673,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   @VisibleForTesting
   void replaceTransforms(Pipeline pipeline) {
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
+    boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
+    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
+        getOverrides(streaming).entrySet()) {
       pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
     }
   }
 
+  private boolean containsUnboundedPCollection(Pipeline p) {
+    class BoundednessVisitor extends PipelineVisitor.Defaults {
+      IsBounded boundedness = IsBounded.BOUNDED;
+
+      @Override
+      public void visitValue(PValue value, Node producer) {
+        if (value instanceof PCollection) {
+          boundedness = boundedness.and(((PCollection) value).isBounded());
+        }
+      }
+    }
+    BoundednessVisitor visitor = new BoundednessVisitor();
+    p.traverseTopologically(visitor);
+    return visitor.boundedness == IsBounded.UNBOUNDED;
+  }
+
   /**
    * Returns the DataflowPipelineTranslator associated with this object.
    */
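
An aside on the detection logic above: PCollection.IsBounded.and(...) resolves
to UNBOUNDED if either operand is UNBOUNDED, so a single unbounded PCollection
anywhere in the pipeline marks the whole pipeline as streaming. A
self-contained sketch of the same visitor pattern (a standalone illustration,
not part of this commit):

    import org.apache.beam.sdk.Pipeline.PipelineVisitor;
    import org.apache.beam.sdk.runners.TransformHierarchy.Node;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollection.IsBounded;
    import org.apache.beam.sdk.values.PValue;

    /** Latches to UNBOUNDED once any visited PCollection is unbounded. */
    class BoundednessCheck extends PipelineVisitor.Defaults {
      private IsBounded boundedness = IsBounded.BOUNDED;

      @Override
      public void visitValue(PValue value, Node producer) {
        if (value instanceof PCollection) {
          // BOUNDED.and(UNBOUNDED) == UNBOUNDED, so the field never flips
          // back to BOUNDED once an unbounded value has been seen.
          boundedness = boundedness.and(((PCollection<?>) value).isBounded());
        }
      }

      boolean isUnbounded() {
        return boundedness == IsBounded.UNBOUNDED;
      }
    }

To use it, instantiate the visitor, call pipeline.traverseTopologically(visitor),
and then query visitor.isUnbounded().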

http://git-wip-us.apache.org/repos/asf/beam/blob/28a840d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 1509210..0735b5c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -63,14 +63,12 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -82,7 +80,6 @@ import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Description;
@@ -1049,35 +1046,6 @@ public class DataflowRunnerTest {
         DataflowRunner.fromOptions(options).toString());
   }
 
-  private static PipelineOptions makeOptions(boolean streaming) {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowRunner.class);
-    options.setStreaming(streaming);
-    options.setJobName("TestJobName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    return options;
-  }
-
-  private void testUnsupportedSource(PTransform<PBegin, ?> source, String name, boolean streaming)
-      throws Exception {
-    String mode = streaming ? "streaming" : "batch";
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage(
-        "The DataflowRunner in " + mode + " mode does not support " + name);
-
-    Pipeline p = Pipeline.create(makeOptions(streaming));
-    p.apply(source);
-    p.run();
-  }
-
-  @Test
-  public void testReadUnboundedUnsupportedInBatch() throws Exception {
-    testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
-  }
-
   /**
    * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
   * when the runner is successfully run.