You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/07 17:15:06 UTC

[1/2] incubator-beam git commit: Only provide expanded Inputs and Outputs

Repository: incubator-beam
Updated Branches:
  refs/heads/master ae52ec1bc -> 5b31a3699


Only provide expanded Inputs and Outputs

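This removes PInput and POutput from the immediate API surface of
TransformHierarchy.Node, and forces Pipeline Visitors to access only
the expanded versions of a node's inputs and outputs. Runners now obtain
the AppliedPTransform for a node via the now-public
TransformHierarchy.Node#toAppliedPTransform().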

This is part of the move towards the runner-agnostic representation of a
graph.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55d333bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55d333bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55d333bf

Branch: refs/heads/master
Commit: 55d333bff68809ff1a9154491ace02d2d16e3b85
Parents: ae52ec1
Author: Thomas Groh <tg...@google.com>
Authored: Mon Dec 5 14:29:05 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 7 09:14:18 2016 -0800

----------------------------------------------------------------------
 .../apex/translation/TranslationContext.java    |  4 +--
 .../beam/runners/direct/DirectGraphVisitor.java |  9 +++----
 .../direct/KeyedPValueTrackingVisitor.java      |  2 +-
 .../FlinkBatchPipelineTranslator.java           |  4 +--
 .../FlinkStreamingPipelineTranslator.java       |  7 ++----
 .../dataflow/DataflowPipelineTranslator.java    |  3 +--
 .../apache/beam/runners/spark/SparkRunner.java  | 17 +++++++------
 .../beam/sdk/runners/TransformHierarchy.java    | 26 +++++++++++---------
 .../sdk/runners/TransformHierarchyTest.java     | 13 ++++------
 9 files changed, 38 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 259afbd..3bf01a8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.PCollection;
@@ -72,8 +71,7 @@ class TranslationContext {
   }
 
   public void setCurrentTransform(TransformHierarchy.Node treeNode) {
-    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
-        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+    this.currentTransform = treeNode.toAppliedPTransform();
   }
 
   public ApexPipelineOptions getPipelineOptions() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index cd9d120..4f38bce 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -79,13 +79,13 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    toFinalize.removeAll(node.getInput().expand());
+    toFinalize.removeAll(node.getInputs());
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
     stepNames.put(appliedTransform, genStepName());
-    if (node.getInput().expand().isEmpty()) {
+    if (node.getInputs().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
-      for (PValue value : node.getInput().expand()) {
+      for (PValue value : node.getInputs()) {
         primitiveConsumers.put(value, appliedTransform);
       }
     }
@@ -111,8 +111,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
     @SuppressWarnings({"rawtypes", "unchecked"})
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform());
+    AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform();
     return application;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 5dc24c2..4161f9e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -74,7 +74,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
     if (node.isRootNode()) {
       finalized = true;
     } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) {
-      keyedValues.addAll(node.getOutput().expand());
+      keyedValues.addAll(node.getOutputs());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 805c41c..209be69 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.flink.translation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -113,8 +112,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
 
     // create the applied PTransform on the batchContext
-    batchContext.setCurrentTransform(AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+    batchContext.setCurrentTransform(node.toAppliedPTransform());
     typedTranslator.translateNode(typedTransform, batchContext);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index a07dc3d..23f4d34 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.flink.translation;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -109,8 +108,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
 
     // create the applied PTransform on the streamingContext
-    streamingContext.setCurrentTransform(AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+    streamingContext.setCurrentTransform(node.toAppliedPTransform());
     typedTranslator.translateNode(typedTransform, streamingContext);
   }
 
@@ -125,8 +123,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     @SuppressWarnings("unchecked")
     StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
 
-    streamingContext.setCurrentTransform(AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+    streamingContext.setCurrentTransform(node.toAppliedPTransform());
 
     return typedTranslator.canTranslate(typedTransform, streamingContext);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index f43e176..8783056 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -531,8 +531,7 @@ public class DataflowPipelineTranslator {
             "no translator registered for " + transform);
       }
       LOG.debug("Translating {}", transform);
-      currentTransform = AppliedPTransform.of(
-          node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
+      currentTransform = node.toAppliedPTransform();
       translator.translate(transform, this);
       currentTransform = null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index d51ee7d..3d98b87 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.spark;
 
+import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
@@ -278,8 +278,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
 
     private boolean shouldDefer(TransformHierarchy.Node node) {
-      PInput input = node.getInput();
       // if the input is not a PCollection, or it is but with non merging windows, don't defer.
+      if (node.getInputs().size() != 1) {
+        return false;
+      }
+      PValue input = Iterables.getOnlyElement(node.getInputs());
       if (!(input instanceof PCollection)
           || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
         return false;
@@ -319,8 +322,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator =
           translate(node, transform, transformClass);
       LOG.info("Evaluating {}", transform);
-      AppliedPTransform<PInput, POutput, TransformT> appliedTransform =
-          AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
+      AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
       ctxt.setCurrentTransform(appliedTransform);
       evaluator.evaluate(transform, ctxt);
       ctxt.setCurrentTransform(null);
@@ -337,12 +339,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       // usually, the input determines if the PCollection to apply the next transformation to
       // is BOUNDED or UNBOUNDED, meaning RDD/DStream.
       Collection<? extends PValue> pValues;
-      PInput pInput = node.getInput();
-      if (pInput instanceof PBegin) {
+      if (node.getInputs().isEmpty()) {
         // in case of a PBegin, it's the output.
-        pValues = node.getOutput().expand();
+        pValues = node.getOutputs();
       } else {
-        pValues = pInput.expand();
+        pValues = node.getInputs();
       }
       PCollection.IsBounded isNodeBounded = isBoundedCollection(pValues);
       // translate accordingly.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index e9829cc..33d5231 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -83,8 +84,8 @@ public class TransformHierarchy {
    */
   public void finishSpecifyingInput() {
     // Inputs must be completely specified before they are consumed by a transform.
-    current.getInput().finishSpecifying();
-    for (PValue inputValue : current.getInput().expand()) {
+    for (PValue inputValue : current.getInputs()) {
+      inputValue.finishSpecifying();
       checkState(producers.get(inputValue) != null, "Producer unknown for input %s", inputValue);
       inputValue.finishSpecifying();
     }
@@ -101,6 +102,7 @@ public class TransformHierarchy {
    * nodes.
    */
   public void setOutput(POutput output) {
+    output.finishSpecifyingOutput();
     for (PValue value : output.expand()) {
       if (!producers.containsKey(value)) {
         producers.put(value, current);
@@ -253,11 +255,9 @@ public class TransformHierarchy {
       return fullName;
     }
 
-    /**
-     * Returns the transform input, in unexpanded form.
-     */
-    public PInput getInput() {
-      return input;
+    /** Returns the expanded inputs of this transform, or an empty collection if none are set. */
+    public Collection<? extends PValue> getInputs() {
+      return input == null ? Collections.<PValue>emptyList() : input.expand();
     }
 
     /**
@@ -296,13 +296,15 @@ public class TransformHierarchy {
     }
 
     /** Returns the transform output, in unexpanded form. */
-    public POutput getOutput() {
-      return output;
+    public Collection<? extends PValue> getOutputs() {
+      return output == null ? Collections.<PValue>emptyList() : output.expand();
     }
 
-    AppliedPTransform<?, ?, ?> toAppliedPTransform() {
-      return AppliedPTransform.of(
-          getFullName(), getInput(), getOutput(), (PTransform) getTransform());
+    /**
+     * Returns the {@link AppliedPTransform} representing this {@link Node}.
+     */
+    public AppliedPTransform<?, ?, ?> toAppliedPTransform() {
+      return AppliedPTransform.of(getFullName(), input, output, (PTransform) getTransform());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index f4488f4..ea43188 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.runners;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import java.util.HashSet;
@@ -38,8 +37,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -172,24 +169,24 @@ public class TransformHierarchyTest {
 
     TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
     assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
-    assertThat(compositeNode.getInput(), Matchers.<PInput>equalTo(begin));
+    assertThat(compositeNode.getInputs(), Matchers.emptyIterable());
     assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create));
     // Not yet set
-    assertThat(compositeNode.getOutput(), nullValue());
+    assertThat(compositeNode.getOutputs(), Matchers.emptyIterable());
     assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true));
 
     TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read);
     assertThat(hierarchy.getCurrent(), equalTo(primitiveNode));
     hierarchy.setOutput(created);
     hierarchy.popNode();
-    assertThat(primitiveNode.getOutput(), Matchers.<POutput>equalTo(created));
-    assertThat(primitiveNode.getInput(), Matchers.<PInput>equalTo(begin));
+    assertThat(primitiveNode.getOutputs(), Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(primitiveNode.getInputs(), Matchers.<PValue>emptyIterable());
     assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read));
     assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode));
 
     hierarchy.setOutput(created);
     // The composite is listed as outputting a PValue created by the contained primitive
-    assertThat(compositeNode.getOutput(), Matchers.<POutput>equalTo(created));
+    assertThat(compositeNode.getOutputs(), Matchers.<PValue>containsInAnyOrder(created));
     // The producer of that PValue is still the primitive in which it is first output
     assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
     hierarchy.popNode();


[2/2] incubator-beam git commit: This closes #1511

Posted by tg...@apache.org.
This closes #1511


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5b31a369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5b31a369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5b31a369

Branch: refs/heads/master
Commit: 5b31a369962907e257de8019fbf6cde4c615b1c0
Parents: ae52ec1 55d333b
Author: Thomas Groh <tg...@google.com>
Authored: Wed Dec 7 09:14:38 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 7 09:14:38 2016 -0800

----------------------------------------------------------------------
 .../apex/translation/TranslationContext.java    |  4 +--
 .../beam/runners/direct/DirectGraphVisitor.java |  9 +++----
 .../direct/KeyedPValueTrackingVisitor.java      |  2 +-
 .../FlinkBatchPipelineTranslator.java           |  4 +--
 .../FlinkStreamingPipelineTranslator.java       |  7 ++----
 .../dataflow/DataflowPipelineTranslator.java    |  3 +--
 .../apache/beam/runners/spark/SparkRunner.java  | 17 +++++++------
 .../beam/sdk/runners/TransformHierarchy.java    | 26 +++++++++++---------
 .../sdk/runners/TransformHierarchyTest.java     | 13 ++++------
 9 files changed, 38 insertions(+), 47 deletions(-)
----------------------------------------------------------------------