Posted to commits@beam.apache.org by lc...@apache.org on 2017/12/18 21:16:35 UTC

[beam] branch master updated: [BEAM-2929] Remove Dataflow expansions for PCollectionView that have been migrated into the Dataflow service for the portability framework.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d9ffa5  [BEAM-2929] Remove Dataflow expansions for PCollectionView that have been migrated into the Dataflow service for the portability framework.
9d9ffa5 is described below

commit 9d9ffa5f1a3a9f280dfafae15944764a568515ef
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Dec 12 16:27:58 2017 -0800

    [BEAM-2929] Remove Dataflow expansions for PCollectionView that have been migrated into the Dataflow service for the portability framework.
---
 .../dataflow/DataflowPipelineTranslator.java       | 37 +++++++++++++++++++---
 .../beam/runners/dataflow/DataflowRunner.java      | 17 ++++++----
 2 files changed, 44 insertions(+), 10 deletions(-)
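
Note on the gating flag: the new translation path is only taken when the "beam_fn_api" experiment is present on the pipeline options, as checked by DataflowRunner.hasExperiment in the diff below. The following is a minimal, illustrative sketch (not part of this commit; the class name is made up) of setting that experiment when constructing a Dataflow pipeline, using the standard DataflowPipelineOptions setters:

    import java.util.Arrays;
    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class EnableBeamFnApiSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        // "beam_fn_api" is the experiment string checked by DataflowRunner.hasExperiment(...) in
        // this commit; with it set, the runner skips the client-side PCollectionView expansions
        // and the translator emits CollectionToSingleton steps for View.CreatePCollectionView.
        options.setExperiments(Arrays.asList("beam_fn_api"));

        Pipeline pipeline = Pipeline.create(options);
        // ... apply transforms here; side-input views follow the new translation path ...
        pipeline.run();
      }
    }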

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 4f9b939..5c26e0d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment;
 import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
 import static org.apache.beam.runners.dataflow.util.Structs.addDictionary;
 import static org.apache.beam.runners.dataflow.util.Structs.addList;
@@ -84,6 +85,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -349,7 +351,7 @@ public class DataflowPipelineTranslator {
         job.setLabels(options.getLabels());
       }
       if (options.isStreaming()
-          && !DataflowRunner.hasExperiment(options, "enable_windmill_service")) {
+          && !hasExperiment(options, "enable_windmill_service")) {
         // Use separate data disk for streaming.
         Disk disk = new Disk();
         disk.setDiskType(options.getWorkerDiskType());
@@ -447,13 +449,22 @@ public class DataflowPipelineTranslator {
     public void visitValue(PValue value, TransformHierarchy.Node producer) {
       LOG.debug("Checking translation of {}", value);
       // Primitive transforms are the only ones assigned step names.
-      if (producer.getTransform() instanceof CreateDataflowView) {
-        // CreateDataflowView produces a dummy output (as it must be a primitive transform) but
-        // in the Dataflow Job graph produces only the view and not the output PCollection.
+      if (producer.getTransform() instanceof CreateDataflowView
+          && !hasExperiment(options, "beam_fn_api")) {
+        // CreateDataflowView produces a dummy output (as it must be a primitive transform)
+        // but in the Dataflow Job graph produces only the view and not the output PCollection.
         asOutputReference(
             ((CreateDataflowView) producer.getTransform()).getView(),
             producer.toAppliedPTransform(getPipeline()));
         return;
+      } else if (producer.getTransform() instanceof View.CreatePCollectionView
+          && hasExperiment(options, "beam_fn_api")) {
+        // View.CreatePCollectionView produces a dummy output (as it must be a primitive transform)
+        // but in the Dataflow Job graph produces only the view and not the output PCollection.
+        asOutputReference(
+            ((View.CreatePCollectionView) producer.getTransform()).getView(),
+            producer.toAppliedPTransform(getPipeline()));
+        return;
       }
       asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
     }
@@ -680,6 +691,24 @@ public class DataflowPipelineTranslator {
 
   static {
     registerTransformTranslator(
+        View.CreatePCollectionView.class,
+        new TransformTranslator<View.CreatePCollectionView>() {
+          @Override
+          public void translate(View.CreatePCollectionView transform, TranslationContext context) {
+            translateTyped(transform, context);
+          }
+
+          private <ElemT, ViewT> void translateTyped(
+              View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+            StepTranslationContext stepContext =
+                context.addStep(transform, "CollectionToSingleton");
+            PCollection<ElemT> input = context.getInput(transform);
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
+            stepContext.addCollectionToSingletonOutput(input, transform.getView());
+          }
+        });
+
+    registerTransformTranslator(
         CreateDataflowView.class,
         new TransformTranslator<CreateDataflowView>() {
           @Override
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ddad43f..942d36b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -382,11 +382,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(Read.Unbounded.class),
-                  new StreamingUnboundedReadOverrideFactory()))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
-                  new StreamingCreatePCollectionViewFactory()));
+                  new StreamingUnboundedReadOverrideFactory()));
+      if (!hasExperiment(options, "beam_fn_api")) {
+        overridesBuilder.add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+                new StreamingCreatePCollectionViewFactory()));
+      }
     } else {
       overridesBuilder
           // State and timer pardos are implemented by expansion to GBK-then-ParDo
@@ -397,7 +399,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           .add(
               PTransformOverride.of(
                   PTransformMatchers.stateOrTimerParDoSingle(),
-                  BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)))
+                  BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)));
+      if (!hasExperiment(options, "beam_fn_api")) {
+        overridesBuilder
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(View.AsMap.class),
@@ -422,6 +426,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                   PTransformMatchers.classEqualTo(View.AsIterable.class),
                   new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsIterable.class, this)));
+      }
     }
     // Expands into Reshuffle and single-output ParDo, so has to be before the overrides below.
     if (hasExperiment(options, "beam_fn_api")) {
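
For reference, the overrides being gated above (View.AsMap, View.AsIterable, and the streaming View.CreatePCollectionView factory) are produced by ordinary side-input usage. The sketch below is illustrative only and not taken from this commit (class name and element values are made up); it shows the kind of pipeline whose view-creation step is translated directly to a CollectionToSingleton step once the beam_fn_api experiment is enabled:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    public class SideInputSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // View.asSingleton() ultimately creates a View.CreatePCollectionView primitive; with
        // --experiments=beam_fn_api that primitive is handled by the CollectionToSingleton
        // translator registered in this commit instead of the client-side Dataflow expansions.
        final PCollectionView<Integer> threshold =
            p.apply("Threshold", Create.of(10)).apply(View.<Integer>asSingleton());

        PCollection<Integer> values = p.apply("Values", Create.of(1, 5, 20, 42));
        values.apply(
            "FilterBySideInput",
            ParDo.of(
                    new DoFn<Integer, Integer>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        // Read the side input; keep elements at or above the threshold.
                        if (c.element() >= c.sideInput(threshold)) {
                          c.output(c.element());
                        }
                      }
                    })
                .withSideInputs(threshold));

        p.run();
      }
    }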

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.