Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/04 17:09:04 UTC

[2/2] beam git commit: Use Batch Replacement API in the Dataflow Runner

Use Batch Replacement API in the Dataflow Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7efee543
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7efee543
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7efee543

Branch: refs/heads/master
Commit: 7efee543abc6ec9c74578f454c4963912356ab1d
Parents: db3144b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 23 09:38:53 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 4 10:08:53 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    |   5 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 169 +++++++++++--------
 .../main/java/org/apache/beam/sdk/Pipeline.java |   3 +-
 3 files changed, 102 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
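
Condensed sketch (not part of the commit) of the pattern this change introduces: the
runner now builds an ordered List<PTransformOverride> and hands it to the pipeline in
a single call, instead of iterating a Map<PTransformMatcher, PTransformOverrideFactory>
and invoking pipeline.replace(...) once per entry. The two example overrides are lifted
from the diff below; the snippet is illustrative and not a compilable unit on its own.

    // Build the ordered list of overrides (order matters; see comments in the diff).
    List<PTransformOverride> overrides =
        ImmutableList.<PTransformOverride>builder()
            .add(
                PTransformOverride.of(
                    PTransformMatchers.flattenWithDuplicateInputs(),
                    DeduplicatedFlattenFactory.create()))
            .add(
                PTransformOverride.of(
                    PTransformMatchers.emptyFlatten(),
                    EmptyFlattenAsCreateFactory.instance()))
            .build();

    // Apply all replacements in one pass instead of one pipeline.replace(...) call
    // per matcher/factory pair.
    pipeline.replaceAll(overrides);
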


http://git-wip-us.apache.org/repos/asf/beam/blob/7efee543/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 9b80756..ab9df70 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -408,7 +408,10 @@ public class DataflowPipelineTranslator {
       PTransform<?, ?> transform = node.getTransform();
       TransformTranslator translator = getTransformTranslator(transform.getClass());
       checkState(
-          translator != null, "no translator registered for primitive transform %s", transform);
+          translator != null,
+          "no translator registered for primitive transform %s at node %s",
+          transform,
+          node.getFullName());
       LOG.debug("Translating {}", transform);
       currentTransform = node.toAppliedPTransform();
       translator.translate(transform, this);

http://git-wip-us.apache.org/repos/asf/beam/blob/7efee543/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6eec8f8..a2b715f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.base.Utf8;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -64,6 +65,7 @@ import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
+import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -91,7 +93,6 @@ import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -289,96 +290,123 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
   }
 
-  private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides(boolean streaming) {
-    ImmutableMap.Builder<PTransformMatcher, PTransformOverrideFactory> ptoverrides =
-        ImmutableMap.builder();
+  private List<PTransformOverride> getOverrides(boolean streaming) {
+    ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder();
     // Create is implemented in terms of a Read, so it must precede the override to Read in
     // streaming
-    ptoverrides
-        .put(PTransformMatchers.flattenWithDuplicateInputs(), DeduplicatedFlattenFactory.create())
-        .put(PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance());
+    overridesBuilder
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.flattenWithDuplicateInputs(),
+                DeduplicatedFlattenFactory.create()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance()));
     if (streaming) {
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
       // defer to Windmill's built-in implementation.
       for (Class<? extends DoFn> unsupported :
           ImmutableSet.of(PubsubBoundedReader.class, PubsubBoundedWriter.class)) {
-        ptoverrides.put(
-            PTransformMatchers.parDoWithFnType(unsupported),
-            UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true)));
+        overridesBuilder.add(
+            PTransformOverride.of(
+                PTransformMatchers.parDoWithFnType(unsupported),
+                UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true))));
       }
       if (!hasExperiment(options, "enable_custom_pubsub_source")) {
-        ptoverrides.put(
-            PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
-            new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this));
+        overridesBuilder.add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
+                new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)));
       }
       if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
-        ptoverrides.put(
-            PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
-            new StreamingPubsubIOWriteOverrideFactory(this));
+        overridesBuilder.add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
+                new StreamingPubsubIOWriteOverrideFactory(this)));
       }
-      ptoverrides
-          .put(
+      overridesBuilder
+          .add(
               // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
               // must precede it
-              PTransformMatchers.classEqualTo(Read.Bounded.class),
-              new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(Read.Unbounded.class),
-              new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
-              new StreamingCreatePCollectionViewFactory());
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(Read.Bounded.class),
+                  new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(Read.Unbounded.class),
+                  new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+                  new StreamingCreatePCollectionViewFactory()));
     } else {
       // In batch mode must use the custom Pubsub bounded source/sink.
       for (Class<? extends PTransform> unsupported :
           ImmutableSet.of(PubsubUnboundedSink.class, PubsubUnboundedSource.class)) {
-        ptoverrides.put(
-            PTransformMatchers.classEqualTo(unsupported),
-            UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false)));
+        overridesBuilder.add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(unsupported),
+                UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false))));
       }
-      ptoverrides
+      overridesBuilder
           // State and timer pardos are implemented by expansion to GBK-then-ParDo
-          .put(
-              PTransformMatchers.stateOrTimerParDoMulti(),
-              BatchStatefulParDoOverrides.multiOutputOverrideFactory())
-          .put(
-              PTransformMatchers.stateOrTimerParDoSingle(),
-              BatchStatefulParDoOverrides.singleOutputOverrideFactory())
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.stateOrTimerParDoMulti(),
+                  BatchStatefulParDoOverrides.multiOutputOverrideFactory()))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.stateOrTimerParDoSingle(),
+                  BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
 
           // Write uses views internally
-          .put(PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this))
-          .put(
-              PTransformMatchers.classEqualTo(View.AsMap.class),
-              new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsMap.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(View.AsMultimap.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  BatchViewOverrides.BatchViewAsMultimap.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-              new BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory(this))
-          .put(
-              PTransformMatchers.classEqualTo(View.AsSingleton.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  BatchViewOverrides.BatchViewAsSingleton.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(View.AsList.class),
-              new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsList.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(View.AsIterable.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  BatchViewOverrides.BatchViewAsIterable.class, this));
-    }
-    ptoverrides
-        .put(PTransformMatchers.classEqualTo(Reshuffle.class), new ReshuffleOverrideFactory())
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                  new BatchCombineGloballyAsSingletonViewFactory(this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.AsMap.class),
+                  new ReflectiveOneToOneOverrideFactory(
+                      BatchViewOverrides.BatchViewAsMap.class, this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                  new ReflectiveOneToOneOverrideFactory(
+                      BatchViewOverrides.BatchViewAsMultimap.class, this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                  new ReflectiveOneToOneOverrideFactory(
+                      BatchViewOverrides.BatchViewAsSingleton.class, this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.AsList.class),
+                  new ReflectiveOneToOneOverrideFactory(
+                      BatchViewOverrides.BatchViewAsList.class, this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.AsIterable.class),
+                  new ReflectiveOneToOneOverrideFactory(
+                      BatchViewOverrides.BatchViewAsIterable.class, this)));
+    }
+    overridesBuilder
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Reshuffle.class), new ReshuffleOverrideFactory()))
         // Order is important. Streaming views almost all use Combine internally.
-        .put(
-            PTransformMatchers.classEqualTo(Combine.GroupedValues.class),
-            new PrimitiveCombineGroupedValuesOverrideFactory())
-        .put(
-            PTransformMatchers.classEqualTo(ParDo.SingleOutput.class),
-            new PrimitiveParDoSingleFactory());
-    return ptoverrides.build();
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Combine.GroupedValues.class),
+                new PrimitiveCombineGroupedValuesOverrideFactory()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(ParDo.SingleOutput.class),
+                new PrimitiveParDoSingleFactory()));
+    return overridesBuilder.build();
   }
 
   private String getUnsupportedMessage(Class<?> unsupported, boolean streaming) {
@@ -682,10 +710,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @VisibleForTesting
   void replaceTransforms(Pipeline pipeline) {
     boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        getOverrides(streaming).entrySet()) {
-      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
-    }
+    pipeline.replaceAll(getOverrides(streaming));
   }
 
   private boolean containsUnboundedPCollection(Pipeline p) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7efee543/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index f980a0b..fa8277f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -228,8 +228,7 @@ public class Pipeline {
         });
   }
 
-  public void replace(
-      final PTransformOverride override) {
+  public void replace(final PTransformOverride override) {
     final Collection<Node> matches = new ArrayList<>();
     transforms.visit(
         new PipelineVisitor.Defaults() {