Posted to commits@beam.apache.org by tg...@apache.org on 2017/06/05 17:19:43 UTC

[1/2] beam git commit: Remove the requirement to visit PCollectionViews in Dataflow

Repository: beam
Updated Branches:
  refs/heads/master 09d75a0b0 -> 6543e56d2


Remove the requirement to visit PCollectionViews in Dataflow

Remove the unused addStep method in the Dataflow Translator.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e82bc4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e82bc4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e82bc4b

Branch: refs/heads/master
Commit: 7e82bc4b280cd35fca042a50f0055cd68850da68
Parents: 09d75a0
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 31 15:42:03 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jun 5 10:19:21 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 72 +++------------
 .../beam/runners/dataflow/ReadTranslator.java   |  6 +-
 .../runners/dataflow/TransformTranslator.java   | 26 +++---
 .../DataflowPipelineTranslatorTest.java         | 95 --------------------
 4 files changed, 24 insertions(+), 175 deletions(-)
----------------------------------------------------------------------

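In short, this change moves producer bookkeeping out of the pipeline visitor and into the output-registration calls, so PCollectionViews no longer have to be visited at all, and it drops the low-level addStep(PTransform, Step) overload that let a translator inject a predefined Step. An abridged sketch of the two translation contexts as they look after this commit (signatures are taken from the diffs below; these are not complete listings):

    // Abridged sketch only; see the TransformTranslator.java and
    // DataflowPipelineTranslator.java diffs below for the real definitions.
    interface TranslationContext {
      // Steps are now always created through this call; the
      // addStep(PTransform<?, ? extends PValue>, Step) overload is removed.
      StepTranslationContext addStep(PTransform<?, ?> transform, String type);
    }

    interface StepTranslationContext {
      // The input must already be registered as the output of an earlier step.
      void addInput(String name, PInput value);

      // Narrowed from PValue to PCollection; in the Dataflow translator this call
      // now also records the producing transform, replacing the visitValue bookkeeping.
      long addOutput(PCollection<?> value);

      // PCollectionViews are registered by the CollectionToSingleton step that
      // creates them, so the translator never needs to visit them separately.
      long addCollectionToSingletonOutput(
          PCollection<?> inputValue, PCollectionView<?> outputValue);
    }
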

http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index af93ef5..8eaf61b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -438,12 +438,9 @@ public class DataflowPipelineTranslator {
 
     @Override
     public void visitValue(PValue value, TransformHierarchy.Node producer) {
-      producers.put(value, producer.toAppliedPTransform(getPipeline()));
       LOG.debug("Checking translation of {}", value);
-      if (!producer.isCompositeNode()) {
-        // Primitive transforms are the only ones assigned step names.
-        asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
-      }
+      // Primitive transforms are the only ones assigned step names.
+      asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
     }
 
     @Override
@@ -471,48 +468,6 @@ public class DataflowPipelineTranslator {
       return stepContext;
     }
 
-    @Override
-    public Step addStep(PTransform<?, ? extends PValue> transform, Step original) {
-      Step step = original.clone();
-      String stepName = step.getName();
-      if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
-        throw new IllegalArgumentException(transform + " already has a name specified");
-      }
-
-      Map<String, Object> properties = step.getProperties();
-      if (properties != null) {
-        @Nullable List<Map<String, Object>> outputInfoList = null;
-        try {
-          // TODO: This should be done via a Structs accessor.
-          @Nullable List<Map<String, Object>> list =
-              (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
-          outputInfoList = list;
-        } catch (Exception e) {
-          throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
-        }
-        if (outputInfoList != null && outputInfoList.size() > 0) {
-          Map<String, Object> firstOutputPort = outputInfoList.get(0);
-          @Nullable String name;
-          try {
-            name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME);
-          } catch (Exception e) {
-            name = null;
-          }
-          if (name != null) {
-            registerOutputName(getOutput(transform), name);
-          }
-        }
-      }
-
-      List<Step> steps = job.getSteps();
-      if (steps == null) {
-        steps = new LinkedList<>();
-        job.setSteps(steps);
-      }
-      steps.add(step);
-      return step;
-    }
-
     public OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer) {
       String stepName = stepNames.get(producer);
       checkArgument(stepName != null, "%s doesn't have a name specified", producer);
@@ -607,26 +562,19 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public long addOutput(PValue value) {
-      Coder<?> coder;
-      if (value instanceof PCollection) {
-        coder = ((PCollection<?>) value).getCoder();
-        if (value instanceof PCollection) {
-          // Wrap the PCollection element Coder inside a WindowedValueCoder.
-          coder = WindowedValue.getFullCoder(
-              coder,
-              ((PCollection<?>) value).getWindowingStrategy().getWindowFn().windowCoder());
-        }
-      } else {
-        // No output coder to encode.
-        coder = null;
-      }
+    public long addOutput(PCollection<?> value) {
+      translator.producers.put(value, translator.currentTransform);
+      // Wrap the PCollection element Coder inside a WindowedValueCoder.
+      Coder<?> coder =
+          WindowedValue.getFullCoder(
+              value.getCoder(), value.getWindowingStrategy().getWindowFn().windowCoder());
       return addOutput(value, coder);
     }
 
     @Override
     public long addCollectionToSingletonOutput(
-        PValue inputValue, PValue outputValue) {
+        PCollection<?> inputValue, PCollectionView<?> outputValue) {
+      translator.producers.put(outputValue, translator.currentTransform);
       Coder<?> inputValueCoder =
           checkNotNull(translator.outputCoders.get(inputValue));
       // The inputValueCoder for the input PCollection should be some

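The addOutput rewrite above is why the old instanceof checks (and the possible null coder) go away: with the parameter narrowed to PCollection, the element coder is always wrapped in a WindowedValue full coder. A minimal, self-contained sketch of that wrapping, using a hypothetical pipeline and a VarIntCoder-backed collection (only the WindowedValue.getFullCoder call mirrors the translator code above):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.PCollection;

    public class CoderWrappingSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        PCollection<Integer> ints =
            p.apply(Create.of(1, 2, 3).withCoder(VarIntCoder.of()));
        // Same wrapping as the new addOutput(PCollection<?>): element coder plus
        // the window coder of the collection's windowing strategy.
        Coder<?> wrapped =
            WindowedValue.getFullCoder(
                ints.getCoder(), ints.getWindowingStrategy().getWindowFn().windowCoder());
        // For the default global windowing this is a FullWindowedValueCoder over
        // VarIntCoder and GlobalWindow.Coder.
        System.out.println(wrapped);
      }
    }
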
http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index 0b22d7e..693748a 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -29,7 +29,7 @@ import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
@@ -41,7 +41,9 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
   }
 
   public static <T> void translateReadHelper(
-      Source<T> source, PTransform<?, ? extends PValue> transform, TranslationContext context) {
+      Source<T> source,
+      PTransform<?, ? extends PCollection<?>> transform,
+      TranslationContext context) {
     try {
       StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
       stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);

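The narrowed helper signature still fits Read.Bounded, since Read.Bounded<T> is a PTransform<PBegin, PCollection<T>>. A rough, hypothetical sketch of calling the helper under the new bound (the class name and String element type are made up; these types are package-private to the Dataflow runner):

    // Illustrative only.
    class BoundedReadTranslator implements TransformTranslator<Read.Bounded<String>> {
      @Override
      public void translate(Read.Bounded<String> transform, TranslationContext context) {
        // Read.Bounded<String> extends PTransform<PBegin, PCollection<String>>, so it
        // satisfies the new PTransform<?, ? extends PCollection<?>> parameter bound.
        ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
      }
    }
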
http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 23949bd..a7452b2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -17,15 +17,16 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import com.google.api.services.dataflow.model.Step;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -65,14 +66,6 @@ interface TransformTranslator<TransformT extends PTransform> {
      */
     StepTranslationContext addStep(PTransform<?, ?> transform, String type);
 
-    /**
-     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent
-     * with the Step, in terms of input, output and coder types.
-     *
-     * <p>This is a low-level operation, when using this method it is up to the caller to ensure
-     * that names do not collide.
-     */
-    Step addStep(PTransform<?, ? extends PValue> transform, Step step);
     /** Encode a PValue reference as an output reference. */
     OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer);
 
@@ -100,10 +93,11 @@ interface TransformTranslator<TransformT extends PTransform> {
      * Adds an input with the given name to this Dataflow step, coming from the specified input
      * PValue.
      *
-     * <p>The input {@link PValue} must have already been produced by a step earlier in this {@link
-     * Pipeline}. If the input value has not yet been produced yet (either by a call to {@link
-     * StepTranslationContext#addOutput(PValue)} or within a call to {@link
-     * TranslationContext#addStep(PTransform, Step)}), this method will throw an exception.
+     * <p>The input {@link PValue} must have already been produced by a step earlier in this
+     * {@link Pipeline}. If the input value has not yet been produced yet (by a call to either
+     * {@link StepTranslationContext#addOutput(PCollection)} or
+     * {@link StepTranslationContext#addCollectionToSingletonOutput(PCollection, PCollectionView)})
+     * this method will throw an exception.
      */
     void addInput(String name, PInput value);
 
@@ -114,18 +108,18 @@ interface TransformTranslator<TransformT extends PTransform> {
     void addInput(String name, List<? extends Map<String, Object>> elements);
 
     /**
-     * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+     * Adds a primitive output to this Dataflow step, producing the specified output {@code PValue},
      * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
      * PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level
      * unique id.
      */
-    long addOutput(PValue value);
+    long addOutput(PCollection<?> value);
 
     /**
      * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified
      * input {@code PValue} and producing the specified output {@code PValue}. This step requires
      * special treatment for its output encoding. Returns a pipeline level unique id.
      */
-    long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue);
+    long addCollectionToSingletonOutput(PCollection<?> inputValue, PCollectionView<?> outputValue);
   }
 }

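With the predefined-Step overload removed, every step is created through addStep(transform, type) and then described via addInput/addOutput on the returned step context. A hedged sketch of that flow for a hypothetical primitive transform (MyPrimitive, the "MyStepType" name, and the use of context.getInput/getOutput as accessors are illustrative; the step-context calls match the interface above):

    // Illustrative only: MyPrimitive stands in for some
    // PTransform<PCollection<String>, PCollection<String>> primitive.
    class MyPrimitiveTranslator implements TransformTranslator<MyPrimitive> {
      @Override
      public void translate(MyPrimitive transform, TranslationContext context) {
        StepTranslationContext step = context.addStep(transform, "MyStepType");
        // The input must already have been produced (registered by an earlier
        // addOutput or addCollectionToSingletonOutput call), or this throws.
        step.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
        // Registering the output wraps its coder in a WindowedValueCoder and, in the
        // Dataflow translator, records this transform as the output's producer.
        step.addOutput(context.getOutput(transform));
      }
    }
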
http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 87744f0..89dc2d5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import static org.apache.beam.runners.dataflow.util.Structs.addObject;
-import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
@@ -73,7 +71,6 @@ import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.Structs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -531,57 +528,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
   }
 
-  @Test
-  public void testPredefinedAddStep() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-    DataflowPipelineTranslator.registerTransformTranslator(
-        EmbeddedTransform.class, new EmbeddedTranslator());
-
-    // Create a predefined step using another pipeline
-    Step predefinedStep = createPredefinedStep();
-
-    // Create a pipeline that the predefined step will be embedded into
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
-        .apply(ParDo.of(new NoOpFn()))
-        .apply(new EmbeddedTransform(predefinedStep.clone()))
-        .apply(ParDo.of(new NoOpFn()));
-    DataflowRunner runner = DataflowRunner.fromOptions(options);
-    runner.replaceTransforms(pipeline);
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                runner,
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-    assertAllStepOutputsHaveUniqueIds(job);
-
-    List<Step> steps = job.getSteps();
-    assertEquals(4, steps.size());
-
-    // The input to the embedded step should match the output of the step before
-    Map<String, Object> step1Out = getOutputPortReference(steps.get(1));
-    Map<String, Object> step2In = getDictionary(
-        steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT);
-    assertEquals(step1Out, step2In);
-
-    // The output from the embedded step should match the input of the step after
-    Map<String, Object> step2Out = getOutputPortReference(steps.get(2));
-    Map<String, Object> step3In = getDictionary(
-        steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT);
-    assertEquals(step2Out, step3In);
-
-    // The step should not have been modified other than remapping the input
-    Step predefinedStepClone = predefinedStep.clone();
-    Step embeddedStepClone = steps.get(2).clone();
-    predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
-    embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
-    assertEquals(predefinedStepClone, embeddedStepClone);
-  }
-
   /**
    * Construct a OutputReference for the output of the step.
    */
@@ -630,47 +576,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   /**
-   * A placeholder transform that will be used to substitute a predefined Step.
-   */
-  private static class EmbeddedTransform
-      extends PTransform<PCollection<String>, PCollection<String>> {
-    private final Step step;
-
-    public EmbeddedTransform(Step step) {
-      this.step = step;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-      return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(),
-          WindowingStrategy.globalDefault(),
-          input.isBounded());
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
-    }
-  }
-
-  /**
-   * A TransformTranslator that adds the predefined Step using
-   * {@link TranslationContext#addStep} and remaps the input port reference.
-   */
-  private static class EmbeddedTranslator
-      implements TransformTranslator<EmbeddedTransform> {
-    @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
-      PCollection<String> input = context.getInput(transform);
-      addObject(
-          transform.step.getProperties(),
-          PropertyNames.PARALLEL_INPUT,
-          context.asOutputReference(input, context.getProducer(input)));
-      context.addStep(transform, transform.step);
-    }
-  }
-
-  /**
    * A composite transform that returns an output that is unrelated to
    * the input.
    */


[2/2] beam git commit: This closes #3272

Posted by tg...@apache.org.
This closes #3272


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6543e56d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6543e56d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6543e56d

Branch: refs/heads/master
Commit: 6543e56d264f56f8c7cd92afd71c5311a859895c
Parents: 09d75a0 7e82bc4
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 5 10:19:32 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jun 5 10:19:32 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 72 +++------------
 .../beam/runners/dataflow/ReadTranslator.java   |  6 +-
 .../runners/dataflow/TransformTranslator.java   | 26 +++---
 .../DataflowPipelineTranslatorTest.java         | 95 --------------------
 4 files changed, 24 insertions(+), 175 deletions(-)
----------------------------------------------------------------------