You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/01 20:56:49 UTC

[1/2] incubator-beam git commit: Move TransformHierarchy Maintenance into it

Repository: incubator-beam
Updated Branches:
  refs/heads/master 0c875ba70 -> 48130f718


Move TransformHierarchy Maintenance into it

This reduces the complexity of Pipeline.applyInternal by limiting its
responsibilities to passing a node into the TransformHierarchy,
enforcing name uniqueness, and causing the runner to expand the
PTransform. The remaining logic is moved to the appropriate sites.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab1f1ad0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab1f1ad0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab1f1ad0

Branch: refs/heads/master
Commit: ab1f1ad012bc559cdb099319a516e4437eed2825
Parents: 0c875ba
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 29 14:29:47 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Dec 1 12:55:25 2016 -0800

----------------------------------------------------------------------
 .../direct/KeyedPValueTrackingVisitor.java      |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++---------
 .../beam/sdk/runners/TransformHierarchy.java    | 126 ++++++++-----
 .../beam/sdk/runners/TransformTreeNode.java     | 165 +++++++++--------
 .../sdk/runners/TransformHierarchyTest.java     | 180 ++++++++++++++-----
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 7 files changed, 340 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 7c4376a..47b0857 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -74,7 +74,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
     if (node.isRootNode()) {
       finalized = true;
     } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) {
-      keyedValues.addAll(node.getExpandedOutputs());
+      keyedValues.addAll(node.getOutput().expand());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c925454..95c7132 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -669,7 +669,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     PCollection<Integer> input = p.begin()
         .apply(Create.of(1, 2, 3));
 
-    thrown.expect(IllegalStateException.class);
+    thrown.expect(IllegalArgumentException.class);
     input.apply(new PartiallyBoundOutputCreator());
 
     Assert.fail("Failure expected from use of partially bound output");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 9edf496..c8a4439 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -17,10 +17,11 @@
  */
 package org.apache.beam.sdk;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -282,14 +282,12 @@ public class Pipeline {
    * <p>Typically invoked by {@link PipelineRunner} subclasses.
    */
   public void traverseTopologically(PipelineVisitor visitor) {
-    Set<PValue> visitedValues = new HashSet<>();
-    // Visit all the transforms, which should implicitly visit all the values.
-    transforms.visit(visitor, visitedValues);
-    if (!visitedValues.containsAll(values)) {
-      throw new RuntimeException(
-          "internal error: should have visited all the values "
-          + "after visiting all the transforms");
-    }
+    Set<PValue> visitedValues =
+        // Visit all the transforms, which should implicitly visit all the values.
+        transforms.visit(visitor);
+    checkState(
+        visitedValues.containsAll(values),
+        "internal error: should have visited all the values after visiting all the transforms");
   }
 
   /**
@@ -351,53 +349,43 @@ public class Pipeline {
    *
    * @see Pipeline#apply
    */
-  private <InputT extends PInput, OutputT extends POutput>
-  OutputT applyInternal(String name, InputT input,
-      PTransform<? super InputT, OutputT> transform) {
-    input.finishSpecifying();
+  private <InputT extends PInput, OutputT extends POutput> OutputT applyInternal(
+      String name, InputT input, PTransform<? super InputT, OutputT> transform) {
+    String namePrefix = transforms.getCurrent().getFullName();
+    String uniqueName = uniquifyInternal(namePrefix, name);
 
-    TransformTreeNode parent = transforms.getCurrent();
-    String namePrefix = parent.getFullName();
-    String fullName = uniquifyInternal(namePrefix, name);
-
-    boolean nameIsUnique = fullName.equals(buildName(namePrefix, name));
+    boolean nameIsUnique = uniqueName.equals(buildName(namePrefix, name));
 
     if (!nameIsUnique) {
       switch (getOptions().getStableUniqueNames()) {
         case OFF:
           break;
         case WARNING:
-          LOG.warn("Transform {} does not have a stable unique name. "
-              + "This will prevent updating of pipelines.", fullName);
+          LOG.warn(
+              "Transform {} does not have a stable unique name. "
+                  + "This will prevent updating of pipelines.",
+              uniqueName);
           break;
         case ERROR:
           throw new IllegalStateException(
-              "Transform " + fullName + " does not have a stable unique name. "
-              + "This will prevent updating of pipelines.");
+              "Transform "
+                  + uniqueName
+                  + " does not have a stable unique name. "
+                  + "This will prevent updating of pipelines.");
         default:
           throw new IllegalArgumentException(
               "Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames());
       }
     }
 
-    TransformTreeNode child =
-        new TransformTreeNode(parent, transform, fullName, input);
-    parent.addComposite(child);
-
-    transforms.addInput(child, input);
-
     LOG.debug("Adding {} to {}", transform, this);
+    transforms.pushNode(uniqueName, input, transform);
     try {
-      transforms.pushNode(child);
+      transforms.finishSpecifyingInput();
       transform.validate(input);
       OutputT output = runner.apply(transform, input);
-      transforms.setOutput(child, output);
+      transforms.setOutput(output);
 
-      AppliedPTransform<?, ?, ?> applied = AppliedPTransform.of(
-          child.getFullName(), input, output, transform);
-      // recordAsOutput is a NOOP if already called;
-      output.recordAsOutput(applied);
-      verifyOutputState(output, child);
       return output;
     } finally {
       transforms.popNode();
@@ -405,63 +393,6 @@ public class Pipeline {
   }
 
   /**
-   * Returns all producing transforms for the {@link PValue PValues} contained
-   * in {@code output}.
-   */
-  private List<AppliedPTransform<?, ?, ?>> getProducingTransforms(POutput output) {
-    List<AppliedPTransform<?, ?, ?>> producingTransforms = new ArrayList<>();
-    for (PValue value : output.expand()) {
-      AppliedPTransform<?, ?, ?> transform = value.getProducingTransformInternal();
-      if (transform != null) {
-        producingTransforms.add(transform);
-      }
-    }
-    return producingTransforms;
-  }
-
-  /**
-   * Verifies that the output of a {@link PTransform} is correctly configured in its
-   * {@link TransformTreeNode} in the {@link Pipeline} graph.
-   *
-   * <p>A non-composite {@link PTransform} must have all
-   * of its outputs registered as produced by that {@link PTransform}.
-   *
-   * <p>A composite {@link PTransform} must have all of its outputs
-   * registered as produced by the contained primitive {@link PTransform PTransforms}.
-   * They have each had the above check performed already, when
-   * they were applied, so the only possible failure state is
-   * that the composite {@link PTransform} has returned a primitive output.
-   */
-  private void verifyOutputState(POutput output, TransformTreeNode node) {
-    if (!node.isCompositeNode()) {
-      PTransform<?, ?> thisTransform = node.getTransform();
-      List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output);
-      for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) {
-        // Using != because object identity indicates that the transforms
-        // are the same node in the pipeline
-        if (thisTransform != producingTransform.getTransform()) {
-          throw new IllegalArgumentException("Output of non-composite transform "
-              + thisTransform + " is registered as being produced by"
-              + " a different transform: " + producingTransform);
-        }
-      }
-    } else {
-      PTransform<?, ?> thisTransform = node.getTransform();
-      List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output);
-      for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) {
-        // Using == because object identity indicates that the transforms
-        // are the same node in the pipeline
-        if (thisTransform == producingTransform.getTransform()) {
-          throw new IllegalStateException("Output of composite transform "
-              + thisTransform + " is registered as being produced by it,"
-              + " but the output of every composite transform should be"
-              + " produced by a primitive transform contained therein.");
-        }
-      }
-    }
-  }
-
-  /**
    * Returns the configured {@link PipelineRunner}.
    */
   public PipelineRunner<?> getRunner() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 0a4bb08..d3fd497 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -17,14 +17,17 @@
  */
 package org.apache.beam.sdk.runners;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.Deque;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -34,70 +37,109 @@ import org.apache.beam.sdk.values.PValue;
  * associated {@link PValue}s.
  */
 public class TransformHierarchy {
-  private final Deque<TransformTreeNode> transformStack = new LinkedList<>();
-  private final Map<PInput, TransformTreeNode> producingTransformNode = new HashMap<>();
+  private final TransformTreeNode root;
+  private final Map<POutput, TransformTreeNode> producers;
+  // Maintain a stack based on the enclosing nodes
+  private TransformTreeNode current;
 
-  /**
-   * Create a {@code TransformHierarchy} containing a root node.
-   */
   public TransformHierarchy() {
-    // First element in the stack is the root node, holding all child nodes.
-    transformStack.add(new TransformTreeNode(null, null, "", null));
+    root = TransformTreeNode.root(this);
+    current = root;
+    producers = new HashMap<>();
   }
 
   /**
-   * Returns the last TransformTreeNode on the stack.
+   * Adds the named {@link PTransform} consuming the provided {@link PInput} as a node in this
+   * {@link TransformHierarchy} as a child of the current node, and sets it to be the current node.
+   *
+   * <p>This call should be finished by expanding and recursively calling {@link #pushNode(String,
+   * PInput, PTransform)}, calling {@link #finishSpecifyingInput()}, setting the output with {@link
+   * #setOutput(POutput)}, and ending with a call to {@link #popNode()}.
+   *
+   * @return the added node
    */
-  public TransformTreeNode getCurrent() {
-    return transformStack.peek();
+  public TransformTreeNode pushNode(String name, PInput input, PTransform<?, ?> transform) {
+    checkNotNull(
+        transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        input, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    current = TransformTreeNode.subtransform(current, transform, name, input);
+    return current;
   }
 
   /**
-   * Add a TransformTreeNode to the stack.
+   * Finish specifying all of the input {@link PValue PValues} of the current {@link
+   * TransformTreeNode}. Ensures that all of the inputs to the current node have been fully
+   * specified, and have been produced by a node in this graph.
    */
-  public void pushNode(TransformTreeNode current) {
-    transformStack.push(current);
+  public void finishSpecifyingInput() {
+    // Inputs must be completely specified before they are consumed by a transform.
+    current.getInput().finishSpecifying();
+    for (PValue inputValue : current.getInput().expand()) {
+      checkState(producers.get(inputValue) != null, "Producer unknown for input %s", inputValue);
+      inputValue.finishSpecifying();
+    }
   }
 
   /**
-   * Removes the last TransformTreeNode from the stack.
+   * Set the output of the current {@link TransformTreeNode}. If the output is new (setOutput has
+   * not previously been called with it as the parameter), the current node is set as the producer
+   * of that {@link POutput}.
+   *
+   * <p>Also validates the output - specifically, a Primitive {@link PTransform} produces all of
+   * its outputs, and a Composite {@link PTransform} produces none of its outputs. Verifies that the
+   * expanded output does not contain {@link PValue PValues} produced by both this node and other
+   * nodes.
    */
-  public void popNode() {
-    transformStack.pop();
-    checkState(!transformStack.isEmpty());
+  public void setOutput(POutput output) {
+    for (PValue value : output.expand()) {
+      if (!producers.containsKey(value)) {
+        producers.put(value, current);
+      }
+    }
+    current.setOutput(output);
+    // TODO: Replace with a "generateDefaultNames" method.
+    output.recordAsOutput(current.toAppliedPTransform());
   }
 
   /**
-   * Adds an input to the given node.
-   *
-   * <p>This forces the producing node to be finished.
+   * Pops the current node off the top of the stack, finishing it. Outputs of the node are finished
+   * once they are consumed as input.
    */
-  public void addInput(TransformTreeNode node, PInput input) {
-    for (PValue i : input.expand()) {
-      TransformTreeNode producer = producingTransformNode.get(i);
-      checkState(producer != null, "Producer unknown for input: %s", i);
+  public void popNode() {
+    current.finishSpecifying();
+    current = current.getEnclosingNode();
+    checkState(current != null, "Can't pop the root node of a TransformHierarchy");
+  }
 
-      producer.finishSpecifying();
-      node.addInputProducer(i, producer);
-    }
+  TransformTreeNode getProducer(PValue produced) {
+    return producers.get(produced);
   }
 
   /**
-   * Sets the output of a transform node.
+   * Returns all producing transforms for the {@link PValue PValues} contained
+   * in {@code output}.
    */
-  public void setOutput(TransformTreeNode producer, POutput output) {
-    producer.setOutput(output);
-
-    for (PValue o : output.expand()) {
-      producingTransformNode.put(o, producer);
+  List<TransformTreeNode> getProducingTransforms(POutput output) {
+    List<TransformTreeNode> producingTransforms = new ArrayList<>();
+    for (PValue value : output.expand()) {
+      TransformTreeNode producer = getProducer(value);
+      if (producer != null) {
+        producingTransforms.add(producer);
+      }
     }
+    return producingTransforms;
   }
 
-  /**
-   * Visits all nodes in the transform hierarchy, in transitive order.
-   */
-  public void visit(Pipeline.PipelineVisitor visitor,
-                    Set<PValue> visitedNodes) {
-    transformStack.peekFirst().visit(visitor, visitedNodes);
+  public Set<PValue> visit(PipelineVisitor visitor) {
+    Set<PValue> visitedValues = new HashSet<>();
+    root.visit(visitor, visitedValues);
+    return visitedValues;
+  }
+
+  public TransformTreeNode getCurrent() {
+    return current;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
index d16b828..ea94bd9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
@@ -17,18 +17,19 @@
  */
 package org.apache.beam.sdk.runners;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.values.PValue;
  * for initialization and ordered visitation.
  */
 public class TransformTreeNode {
+  private final TransformHierarchy hierarchy;
   private final TransformTreeNode enclosingNode;
 
   // The PTransform for this node, which may be a composite PTransform.
@@ -51,10 +53,6 @@ public class TransformTreeNode {
   // Nodes for sub-transforms of a composite transform.
   private final Collection<TransformTreeNode> parts = new ArrayList<>();
 
-  // Inputs to the transform, in expanded form and mapped to the producer
-  // of the input.
-  private final Map<PValue, TransformTreeNode> inputs = new HashMap<>();
-
   // Input to the transform, in unexpanded form.
   private final PInput input;
 
@@ -62,28 +60,57 @@ public class TransformTreeNode {
   // Output of the transform, in unexpanded form.
   private POutput output;
 
-  private boolean finishedSpecifying = false;
+  @VisibleForTesting
+  boolean finishedSpecifying = false;
+
+  /**
+   * Create a root {@link TransformTreeNode}. This transform is the root of the provided {@link
+   * TransformHierarchy} - it has no enclosing node, no {@link PTransform}, no {@link PInput input},
+   * no {@link POutput output}, and an empty name. It contains all {@link PTransform transforms}
+   * within a {@link Pipeline} as component transforms.
+   */
+  public static TransformTreeNode root(TransformHierarchy hierarchy) {
+    return new TransformTreeNode(hierarchy, null, null, "", null);
+  }
+
+  /**
+   * Create a subtransform of the provided {@link TransformTreeNode node}. The enclosing node is a
+   * composite that contains this transform.
+   *
+   * <p>The returned node is a component node of the enclosing node.
+   */
+  public static TransformTreeNode subtransform(
+      TransformTreeNode enclosing, PTransform<?, ?> transform, String fullName, PInput input) {
+    checkNotNull(enclosing);
+    checkNotNull(transform);
+    checkNotNull(fullName);
+    checkNotNull(input);
+    TransformTreeNode node =
+        new TransformTreeNode(enclosing.hierarchy, enclosing, transform, fullName, input);
+    enclosing.addComposite(node);
+    return node;
+  }
 
   /**
    * Creates a new TransformTreeNode with the given parent and transform.
    *
-   * <p>EnclosingNode and transform may both be null for
-   * a root-level node, which holds all other nodes.
+   * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
+   * nodes.
    *
    * @param enclosingNode the composite node containing this node
    * @param transform the PTransform tracked by this node
    * @param fullName the fully qualified name of the transform
    * @param input the unexpanded input to the transform
    */
-  public TransformTreeNode(@Nullable TransformTreeNode enclosingNode,
-                           @Nullable PTransform<?, ?> transform,
-                           String fullName,
-                           @Nullable PInput input) {
+  private TransformTreeNode(
+      TransformHierarchy hierarchy,
+      @Nullable TransformTreeNode enclosingNode,
+      @Nullable PTransform<?, ?> transform,
+      String fullName,
+      @Nullable PInput input) {
+    this.hierarchy = hierarchy;
     this.enclosingNode = enclosingNode;
     this.transform = transform;
-    checkArgument((enclosingNode == null && transform == null)
-        || (enclosingNode != null && transform != null),
-        "EnclosingNode and transform must both be specified, or both be null");
     this.fullName = fullName;
     this.input = input;
   }
@@ -113,21 +140,23 @@ public class TransformTreeNode {
   }
 
   /**
-   * Returns true if this node represents a composite transform that does not perform
-   * processing of its own, but merely encapsulates a sub-pipeline (which may be empty).
+   * Returns true if this node represents a composite transform that does not perform processing of
+   * its own, but merely encapsulates a sub-pipeline (which may be empty).
    *
-   * <p>Note that a node may be composite with no sub-transforms if it  returns its input directly
+   * <p>Note that a node may be composite with no sub-transforms if it returns its input directly
    * extracts a component of a tuple, or other operations that occur at pipeline assembly time.
    */
   public boolean isCompositeNode() {
-    return !parts.isEmpty() || returnsOthersOutput() || isRootNode();
+    return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
   }
 
   private boolean returnsOthersOutput() {
     PTransform<?, ?> transform = getTransform();
-    for (PValue output : getExpandedOutputs()) {
-      if (!output.getProducingTransformInternal().getTransform().equals(transform)) {
-        return true;
+    if (output != null) {
+      for (PValue outputValue : output.expand()) {
+        if (!hierarchy.getProducer(outputValue).getTransform().equals(transform)) {
+          return true;
+        }
       }
     }
     return false;
@@ -142,14 +171,6 @@ public class TransformTreeNode {
   }
 
   /**
-   * Adds an input to the transform node.
-   */
-  public void addInputProducer(PValue expandedInput, TransformTreeNode producer) {
-    checkState(!finishedSpecifying);
-    inputs.put(expandedInput, producer);
-  }
-
-  /**
    * Returns the transform input, in unexpanded form.
    */
   public PInput getInput() {
@@ -157,20 +178,37 @@ public class TransformTreeNode {
   }
 
   /**
-   * Returns a mapping of inputs to the producing nodes for all inputs to
-   * the transform.
-   */
-  public Map<PValue, TransformTreeNode> getInputs() {
-    return Collections.unmodifiableMap(inputs);
-  }
-
-  /**
    * Adds an output to the transform node.
    */
   public void setOutput(POutput output) {
     checkState(!finishedSpecifying);
-    checkState(this.output == null);
+    checkState(this.output == null, "Tried to specify more than one output for %s", getFullName());
+    checkNotNull(output, "Tried to set the output of %s to null", getFullName());
     this.output = output;
+
+    // Validate that a primitive transform produces only primitive output, and a composite transform
+    // does not produce primitive output.
+    Set<TransformTreeNode> outputProducers = new HashSet<>();
+    for (PValue outputValue : output.expand()) {
+      outputProducers.add(hierarchy.getProducer(outputValue));
+    }
+    if (outputProducers.contains(this) && outputProducers.size() != 1) {
+      Set<String> otherProducerNames = new HashSet<>();
+      for (TransformTreeNode outputProducer : outputProducers) {
+        if (outputProducer != this) {
+          otherProducerNames.add(outputProducer.getFullName());
+        }
+      }
+      throw new IllegalArgumentException(
+          String.format(
+              "Output of transform [%s] contains a %s produced by it as well as other Transforms. "
+                  + "A primitive transform must produce all of its outputs, and outputs of a "
+                  + "composite transform must be produced by a component transform or be part of"
+                  + "the input."
+                  + "%n    Other Outputs: %s"
+                  + "%n    Other Producers: %s",
+              getFullName(), POutput.class.getSimpleName(), output.expand(), otherProducerNames));
+    }
   }
 
   /**
@@ -180,17 +218,10 @@ public class TransformTreeNode {
     return output;
   }
 
-  /**
-   * Returns the transform outputs, in expanded form.
-   */
-  public Collection<? extends PValue> getExpandedOutputs() {
-    if (output != null) {
-      return output.expand();
-    } else {
-      return Collections.emptyList();
-    }
+  AppliedPTransform<?, ?, ?> toAppliedPTransform() {
+    return AppliedPTransform.of(
+        getFullName(), getInput(), getOutput(), (PTransform) getTransform());
   }
-
   /**
    * Visit the transform node.
    *
@@ -204,10 +235,12 @@ public class TransformTreeNode {
       finishSpecifying();
     }
 
-    // Visit inputs.
-    for (Map.Entry<PValue, TransformTreeNode> entry : inputs.entrySet()) {
-      if (visitedValues.add(entry.getKey())) {
-        visitor.visitValue(entry.getKey(), entry.getValue());
+    if (!isRootNode()) {
+      // Visit inputs.
+      for (PValue inputValue : input.expand()) {
+        if (visitedValues.add(inputValue)) {
+          visitor.visitValue(inputValue, hierarchy.getProducer(inputValue));
+        }
       }
     }
 
@@ -224,10 +257,12 @@ public class TransformTreeNode {
       visitor.visitPrimitiveTransform(this);
     }
 
-    // Visit outputs.
-    for (PValue pValue : getExpandedOutputs()) {
-      if (visitedValues.add(pValue)) {
-        visitor.visitValue(pValue, this);
+    if (!isRootNode()) {
+      // Visit outputs.
+      for (PValue pValue : output.expand()) {
+        if (visitedValues.add(pValue)) {
+          visitor.visitValue(pValue, this);
+        }
       }
     }
   }
@@ -243,15 +278,5 @@ public class TransformTreeNode {
       return;
     }
     finishedSpecifying = true;
-
-    for (TransformTreeNode input : inputs.values()) {
-      if (input != null) {
-        input.finishSpecifying();
-      }
-    }
-
-    if (output != null) {
-      output.finishSpecifyingOutput();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index c28f23e..3bf6d64 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.runners;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import java.util.HashSet;
@@ -30,9 +31,15 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -63,7 +70,7 @@ public class TransformHierarchyTest {
   }
 
   @Test
-  public void popWithoutPushThrows() {
+  public void pushWithoutPushFails() {
     thrown.expect(IllegalStateException.class);
     hierarchy.popNode();
   }
@@ -71,72 +78,153 @@ public class TransformHierarchyTest {
   @Test
   public void pushThenPopSucceeds() {
     TransformTreeNode root = hierarchy.getCurrent();
-    TransformTreeNode node =
-        new TransformTreeNode(hierarchy.getCurrent(), Create.of(1), "Create", PBegin.in(pipeline));
-    hierarchy.pushNode(node);
+    TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
     assertThat(hierarchy.getCurrent(), equalTo(node));
     hierarchy.popNode();
+    assertThat(node.finishedSpecifying, is(true));
     assertThat(hierarchy.getCurrent(), equalTo(root));
   }
 
   @Test
+  public void emptyCompositeSucceeds() {
+    PCollection<Long> created =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
+    hierarchy.setOutput(created);
+    hierarchy.popNode();
+    PCollectionList<Long> pcList = PCollectionList.of(created);
+
+    TransformTreeNode emptyTransform =
+        hierarchy.pushNode(
+            "Extract",
+            pcList,
+            new PTransform<PCollectionList<Long>, PCollection<Long>>() {
+              @Override
+              public PCollection<Long> apply(PCollectionList<Long> input) {
+                return input.get(0);
+              }
+            });
+    hierarchy.setOutput(created);
+    hierarchy.popNode();
+    assertThat(hierarchy.getProducer(created), equalTo(node));
+    assertThat(
+        "A Transform that produces non-primtive output should be composite",
+        emptyTransform.isCompositeNode(),
+        is(true));
+  }
+
+  @Test
+  public void producingOwnAndOthersOutputsFails() {
+    PCollection<Long> created =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
+    hierarchy.setOutput(created);
+    hierarchy.popNode();
+    PCollectionList<Long> pcList = PCollectionList.of(created);
+
+    final PCollectionList<Long> appended =
+        pcList.and(
+            PCollection.<Long>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED));
+    hierarchy.pushNode(
+        "AddPc",
+        pcList,
+        new PTransform<PCollectionList<Long>, PCollectionList<Long>>() {
+          @Override
+          public PCollectionList<Long> apply(PCollectionList<Long> input) {
+            return appended;
+          }
+        });
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("produced by it as well as other Transforms");
+    thrown.expectMessage("primitive transform must produce all of its outputs");
+    thrown.expectMessage("composite transform must be produced by a component transform");
+    thrown.expectMessage("AddPc");
+    thrown.expectMessage("Create");
+    thrown.expectMessage(appended.expand().toString());
+    hierarchy.setOutput(appended);
+  }
+
+  @Test
   public void visitVisitsAllPushed() {
     TransformTreeNode root = hierarchy.getCurrent();
-    Create.Values<Integer> create = Create.of(1);
-    PCollection<Integer> created = pipeline.apply(create);
     PBegin begin = PBegin.in(pipeline);
 
-    TransformTreeNode compositeNode =
-        new TransformTreeNode(root, create, "Create", begin);
-    root.addComposite(compositeNode);
-    TransformTreeNode primitiveNode =
-        new TransformTreeNode(
-            compositeNode, Read.from(CountingSource.upTo(1L)), "Create/Read", begin);
-    compositeNode.addComposite(primitiveNode);
-
-    TransformTreeNode otherPrimitive =
-        new TransformTreeNode(
-            root, MapElements.via(new SimpleFunction<Integer, Integer>() {
-          @Override
-          public Integer apply(Integer input) {
-            return input;
-          }
-        }), "ParDo", created);
-    root.addComposite(otherPrimitive);
-    otherPrimitive.addInputProducer(created, primitiveNode);
+    Create.Values<Long> create = Create.of(1L);
+    Read.Bounded<Long> read = Read.from(CountingSource.upTo(1L));
 
-    hierarchy.pushNode(compositeNode);
-    hierarchy.pushNode(primitiveNode);
+    PCollection<Long> created =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+    MapElements<Long, Long> map = MapElements.via(new SimpleFunction<Long, Long>() {
+      @Override
+      public Long apply(Long input) {
+        return input;
+      }
+    });
+
+    PCollection<Long> mapped =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+    TransformTreeNode compositeNode = hierarchy.pushNode("Create", begin, create);
+    assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
+    assertThat(compositeNode.getInput(), Matchers.<PInput>equalTo(begin));
+    assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create));
+    // Not yet set
+    assertThat(compositeNode.getOutput(), nullValue());
+    assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true));
+
+    TransformTreeNode primitiveNode = hierarchy.pushNode("Create/Read", begin, read);
+    assertThat(hierarchy.getCurrent(), equalTo(primitiveNode));
+    hierarchy.setOutput(created);
     hierarchy.popNode();
+    assertThat(primitiveNode.getOutput(), Matchers.<POutput>equalTo(created));
+    assertThat(primitiveNode.getInput(), Matchers.<PInput>equalTo(begin));
+    assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read));
+    assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode));
+
+    hierarchy.setOutput(created);
+    // The composite is listed as outputting a PValue created by the contained primitive
+    assertThat(compositeNode.getOutput(), Matchers.<POutput>equalTo(created));
+    // The producer of that PValue is still the primitive in which it is first output
+    assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
     hierarchy.popNode();
-    hierarchy.pushNode(otherPrimitive);
+
+    TransformTreeNode otherPrimitive = hierarchy.pushNode("ParDo", created, map);
+    hierarchy.setOutput(mapped);
     hierarchy.popNode();
 
     final Set<TransformTreeNode> visitedCompositeNodes = new HashSet<>();
     final Set<TransformTreeNode> visitedPrimitiveNodes = new HashSet<>();
     final Set<PValue> visitedValuesInVisitor = new HashSet<>();
 
-    Set<PValue> visitedValues = new HashSet<>();
-    hierarchy.visit(new PipelineVisitor.Defaults() {
-      @Override
-      public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
-        visitedCompositeNodes.add(node);
-        return CompositeBehavior.ENTER_TRANSFORM;
-      }
-
-      @Override
-      public void visitPrimitiveTransform(TransformTreeNode node) {
-        visitedPrimitiveNodes.add(node);
-      }
-
-      @Override
-      public void visitValue(PValue value, TransformTreeNode producer) {
-        visitedValuesInVisitor.add(value);
-      }
-    }, visitedValues);
+    Set<PValue> visitedValues =
+        hierarchy.visit(
+            new PipelineVisitor.Defaults() {
+              @Override
+              public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+                visitedCompositeNodes.add(node);
+                return CompositeBehavior.ENTER_TRANSFORM;
+              }
+
+              @Override
+              public void visitPrimitiveTransform(TransformTreeNode node) {
+                visitedPrimitiveNodes.add(node);
+              }
+
+              @Override
+              public void visitValue(PValue value, TransformTreeNode producer) {
+                visitedValuesInVisitor.add(value);
+              }
+            });
 
     assertThat(visitedCompositeNodes, containsInAnyOrder(root, compositeNode));
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(primitiveNode, otherPrimitive));
-    assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created, mapped));
+    assertThat(visitedValuesInVisitor, equalTo(visitedValues));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index def3a02..b95fa70 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.util.Arrays;
@@ -169,14 +168,13 @@ public class TransformTreeTest {
     assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testOutputChecking() throws Exception {
     Pipeline p = TestPipeline.create();
 
     p.apply(new InvalidCompositeTransform());
 
     p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {});
-    fail("traversal should have failed with an IllegalStateException");
   }
 
   @Test



[2/2] incubator-beam git commit: This closes #1469

Posted by tg...@apache.org.
This closes #1469


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/48130f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/48130f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/48130f71

Branch: refs/heads/master
Commit: 48130f718d019d6928c464e6f7ad90cd510b62d2
Parents: 0c875ba ab1f1ad
Author: Thomas Groh <tg...@google.com>
Authored: Thu Dec 1 12:55:26 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Dec 1 12:55:26 2016 -0800

----------------------------------------------------------------------
 .../direct/KeyedPValueTrackingVisitor.java      |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++---------
 .../beam/sdk/runners/TransformHierarchy.java    | 126 ++++++++-----
 .../beam/sdk/runners/TransformTreeNode.java     | 165 +++++++++--------
 .../sdk/runners/TransformHierarchyTest.java     | 180 ++++++++++++++-----
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 7 files changed, 340 insertions(+), 256 deletions(-)
----------------------------------------------------------------------