Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:56 UTC

[41/50] [abbrv] beam git commit: Visit a Transform Hierarchy in Topological Order

Visit a Transform Hierarchy in Topological Order

This reverts commit 6ad6433ec0c02aec8656e9e3b27f6e0f974f8ece.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/261e7df2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/261e7df2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/261e7df2

Branch: refs/heads/gearpump-runner
Commit: 261e7df2b860fe82d9f401e2621b020fe2020fea
Parents: d2c4093
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 6 16:15:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 9 14:44:42 2017 -0700

----------------------------------------------------------------------
 .../spark/translation/StorageLevelTest.java     |   4 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  79 +++++++-
 .../sdk/runners/TransformHierarchyTest.java     | 197 +++++++++++++++++++
 3 files changed, 274 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 8f2e681..8bd6dae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -58,12 +58,12 @@ public class StorageLevelTest {
 
   @Test
   public void test() throws Exception {
-    PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
+    PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo"));
 
     // By default, the Spark runner doesn't cache the RDD if it is accessed only once.
     // So, to "force" caching of the RDD, we have to use the RDD at least twice.
     // That's why we apply the Count fn on the PCollection.
-    pCollection.apply(Count.<String>globally());
+    pCollection.apply("CountAll", Count.<String>globally());
 
     PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
 

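The only change to this test is that each application now carries an explicit name. A minimal sketch of that pattern on a hypothetical standalone pipeline (the pipeline, names, and values below are illustrative, not part of this commit):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class NamedApplyExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // When no name is supplied, Beam derives one from the transform itself;
    // explicit names keep repeated applications of the same transform distinct.
    PCollection<String> items = pipeline.apply("CreateItems", Create.of("foo", "bar"));
    items.apply("CountItems", Count.<String>globally());
    items.apply("CountItemsAgain", Count.<String>globally());
  }
}
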
http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index ee1ce7b..5e048eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -208,7 +208,7 @@ public class TransformHierarchy {
   public Set<PValue> visit(PipelineVisitor visitor) {
     finishSpecifying();
     Set<PValue> visitedValues = new HashSet<>();
-    root.visit(visitor, visitedValues);
+    root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
     return visitedValues;
   }
 
@@ -503,10 +503,60 @@ public class TransformHierarchy {
     /**
      * Visit the transform node.
      *
+     * <p>The visit proceeds in the following order:
+     *
+     * <ul>
+     *   <li>Visit all input {@link PValue PValues} returned by the flattened expansion of {@link
+     *       Node#getInputs()}.
+     *   <li>If the node is a composite:
+     *       <ul>
+     *         <li>Enter the node via {@link PipelineVisitor#enterCompositeTransform(Node)}.
+     *         <li>If the result of {@link PipelineVisitor#enterCompositeTransform(Node)} was {@link
+     *             CompositeBehavior#ENTER_TRANSFORM}, visit each child node of this {@link Node}.
+     *         <li>Leave the node via {@link PipelineVisitor#leaveCompositeTransform(Node)}.
+     *       </ul>
+     *   <li>If the node is a primitive, visit it via {@link
+     *       PipelineVisitor#visitPrimitiveTransform(Node)}.
+     *   <li>Visit each {@link PValue} that was output by this node.
+     * </ul>
+     *
+     * <p>Additionally, the following ordering restrictions are observed:
+     *
+     * <ul>
+     *   <li>A {@link Node} will be visited after its enclosing node has been entered and before its
+     *       enclosing node has been left.
+     *   <li>A {@link Node} will not be visited if any enclosing {@link Node} has returned {@link
+     *       CompositeBehavior#DO_NOT_ENTER_TRANSFORM} from the call to {@link
+     *       PipelineVisitor#enterCompositeTransform(Node)}.
+     *   <li>A {@link PValue} will only be visited after the {@link Node} that originally produced
+     *       it has been visited.
+     * </ul>
+     *
      * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
      * composite transforms), then the output values.
      */
-    private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
+    private void visit(
+        PipelineVisitor visitor,
+        Set<PValue> visitedValues,
+        Set<Node> visitedNodes,
+        Set<Node> skippedComposites) {
+      if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
+        // Recursively enter all enclosing nodes, as appropriate.
+        getEnclosingNode().visit(visitor, visitedValues, visitedNodes, skippedComposites);
+      }
+      // These checks occur after visiting the enclosing node to ensure that if this node has been
+      // visited while visiting the enclosing node the node is not revisited, or, if an enclosing
+      // Node is skipped, this node is also skipped.
+      if (!visitedNodes.add(this)) {
+        LOG.debug("Not revisiting previously visited node {}", this);
+        return;
+      } else if (childNodeOf(skippedComposites)) {
+        // This node is a child of a node that has been passed over via CompositeBehavior, and
+        // should also be skipped. All child nodes of a skipped composite should always be skipped.
+        LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this);
+        return;
+      }
+
       if (!finishedSpecifying) {
         finishSpecifying();
       }
@@ -514,22 +564,31 @@ public class TransformHierarchy {
       if (!isRootNode()) {
         // Visit inputs.
         for (PValue inputValue : inputs.values()) {
+          Node valueProducer = getProducer(inputValue);
+          if (!visitedNodes.contains(valueProducer)) {
+            valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites);
+          }
           if (visitedValues.add(inputValue)) {
-            visitor.visitValue(inputValue, getProducer(inputValue));
+            LOG.debug("Visiting input value {}", inputValue);
+            visitor.visitValue(inputValue, valueProducer);
           }
         }
       }
 
       if (isCompositeNode()) {
+        LOG.debug("Visiting composite node {}", this);
         PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
 
         if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
           for (Node child : parts) {
-            child.visit(visitor, visitedValues);
+            child.visit(visitor, visitedValues, visitedNodes, skippedComposites);
           }
+        } else {
+          skippedComposites.add(this);
         }
         visitor.leaveCompositeTransform(this);
       } else {
+        LOG.debug("Visiting primitive node {}", this);
         visitor.visitPrimitiveTransform(this);
       }
 
@@ -538,12 +597,24 @@ public class TransformHierarchy {
         // Visit outputs.
         for (PValue pValue : outputs.values()) {
           if (visitedValues.add(pValue)) {
+            LOG.debug("Visiting output value {}", pValue);
             visitor.visitValue(pValue, this);
           }
         }
       }
     }
 
+    private boolean childNodeOf(Set<Node> nodes) {
+      if (isRootNode()) {
+        return false;
+      }
+      Node parent = this.getEnclosingNode();
+      while (!parent.isRootNode() && !nodes.contains(parent)) {
+        parent = parent.getEnclosingNode();
+      }
+      return nodes.contains(parent);
+    }
+
     /**
      * Finish specifying a transform.
      *

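The Javadoc added above states the ordering contract in prose. The sketch below models the same traversal on a deliberately simplified node structure, to make the role of the visitedNodes and skippedComposites sets easier to follow. SimpleNode and Visitor are hypothetical stand-ins, not the Beam classes; the real Node also carries inputs, outputs, and the applied transform, and the real visitor additionally visits PValues and calls finishSpecifying().

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the traversal added in this commit.
final class TopologicalVisitSketch {

  static final class SimpleNode {
    final String name;
    final SimpleNode enclosing;                                // null for the root
    final List<SimpleNode> parts = new ArrayList<>();          // children, if composite
    final List<SimpleNode> inputProducers = new ArrayList<>(); // producers of this node's inputs

    SimpleNode(String name, SimpleNode enclosing) {
      this.name = name;
      this.enclosing = enclosing;
      if (enclosing != null) {
        enclosing.parts.add(this);
      }
    }

    boolean isComposite() {
      return !parts.isEmpty();
    }
  }

  interface Visitor {
    /** Returns true to descend into a composite's children, false to skip them. */
    boolean enterComposite(SimpleNode node);

    void leaveComposite(SimpleNode node);

    void visitPrimitive(SimpleNode node);
  }

  static void visit(SimpleNode node, Visitor visitor) {
    visit(node, visitor, new HashSet<SimpleNode>(), new HashSet<SimpleNode>());
  }

  private static void visit(
      SimpleNode node, Visitor visitor, Set<SimpleNode> visited, Set<SimpleNode> skipped) {
    // Enter enclosing nodes first, mirroring getEnclosingNode().visit(...).
    if (node.enclosing != null && !visited.contains(node.enclosing)) {
      visit(node.enclosing, visitor, visited, skipped);
    }
    if (!visited.add(node)) {
      return; // already reached while visiting an enclosing or downstream node
    }
    if (enclosedBySkipped(node, skipped)) {
      return; // an enclosing composite was not entered, so skip this node too
    }
    // Visit the producers of this node's inputs before the node itself; this is
    // what makes the traversal topological rather than purely structural.
    for (SimpleNode producer : node.inputProducers) {
      if (!visited.contains(producer)) {
        visit(producer, visitor, visited, skipped);
      }
    }
    if (node.isComposite()) {
      if (visitor.enterComposite(node)) {
        for (SimpleNode child : node.parts) {
          visit(child, visitor, visited, skipped);
        }
      } else {
        skipped.add(node);
      }
      visitor.leaveComposite(node);
    } else {
      visitor.visitPrimitive(node);
    }
  }

  private static boolean enclosedBySkipped(SimpleNode node, Set<SimpleNode> skipped) {
    for (SimpleNode parent = node.enclosing; parent != null; parent = parent.enclosing) {
      if (skipped.contains(parent)) {
        return true;
      }
    }
    return false;
  }
}
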
http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 1197d1b..93650dd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -32,6 +33,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -492,4 +495,198 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
     assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
   }
+
+  @Test
+  public void visitIsTopologicallyOrdered() {
+    PCollection<String> one =
+        PCollection.<String>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+            .setCoder(StringUtf8Coder.of());
+    final PCollection<Integer> two =
+        PCollection.<Integer>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(VarIntCoder.of());
+    final PDone done = PDone.in(pipeline);
+    final TupleTag<String> oneTag = new TupleTag<String>() {};
+    final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
+    final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
+
+    PTransform<PCollection<String>, PDone> multiConsumer =
+        new PTransform<PCollection<String>, PDone>() {
+          @Override
+          public PDone expand(PCollection<String> input) {
+            return done;
+          }
+
+          @Override
+          public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+            return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
+          }
+        };
+    hierarchy.pushNode("consumes_both", one, multiConsumer);
+    hierarchy.setOutput(done);
+    hierarchy.popNode();
+
+    final PTransform<PBegin, PCollectionTuple> producer =
+        new PTransform<PBegin, PCollectionTuple>() {
+          @Override
+          public PCollectionTuple expand(PBegin input) {
+            return oneAndTwo;
+          }
+        };
+    hierarchy.pushNode(
+        "encloses_producer",
+        PBegin.in(pipeline),
+        new PTransform<PBegin, PCollectionTuple>() {
+          @Override
+          public PCollectionTuple expand(PBegin input) {
+            return input.apply(producer);
+          }
+        });
+    hierarchy.pushNode(
+        "creates_one_and_two",
+        PBegin.in(pipeline), producer);
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+
+    hierarchy.pushNode("second_copy_of_consumes_both", one, multiConsumer);
+    hierarchy.setOutput(done);
+    hierarchy.popNode();
+
+    final Set<Node> visitedNodes = new HashSet<>();
+    final Set<Node> exitedNodes = new HashSet<>();
+    final Set<PValue> visitedValues = new HashSet<>();
+    hierarchy.visit(
+        new PipelineVisitor.Defaults() {
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            for (PValue input : node.getInputs().values()) {
+              assertThat(visitedValues, hasItem(input));
+            }
+            assertThat(
+                "Nodes should not be visited more than once", visitedNodes, not(hasItem(node)));
+            if (!node.isRootNode()) {
+              assertThat(
+                  "Nodes should always be visited after their enclosing nodes",
+                  visitedNodes,
+                  hasItem(node.getEnclosingNode()));
+            }
+            visitedNodes.add(node);
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            assertThat(visitedNodes, hasItem(node));
+            if (!node.isRootNode()) {
+              assertThat(
+                  "Nodes should always be left before their enclosing nodes are left",
+                  exitedNodes,
+                  not(hasItem(node.getEnclosingNode())));
+            }
+            assertThat(exitedNodes, not(hasItem(node)));
+            exitedNodes.add(node);
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            assertThat(visitedNodes, hasItem(node.getEnclosingNode()));
+            assertThat(exitedNodes, not(hasItem(node.getEnclosingNode())));
+            assertThat(
+                "Nodes should not be visited more than once", visitedNodes, not(hasItem(node)));
+            for (PValue input : node.getInputs().values()) {
+              assertThat(visitedValues, hasItem(input));
+            }
+            visitedNodes.add(node);
+          }
+
+          @Override
+          public void visitValue(PValue value, Node producer) {
+            assertThat(visitedNodes, hasItem(producer));
+            assertThat(visitedValues, not(hasItem(value)));
+            visitedValues.add(value);
+          }
+        });
+    assertThat("Should have visited all the nodes", visitedNodes.size(), equalTo(5));
+    assertThat("Should have left all of the visited composites", exitedNodes.size(), equalTo(2));
+  }
+
+  @Test
+  public void visitDoesNotVisitSkippedNodes() {
+    PCollection<String> one =
+        PCollection.<String>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+            .setCoder(StringUtf8Coder.of());
+    final PCollection<Integer> two =
+        PCollection.<Integer>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(VarIntCoder.of());
+    final PDone done = PDone.in(pipeline);
+    final TupleTag<String> oneTag = new TupleTag<String>() {};
+    final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
+    final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
+
+    hierarchy.pushNode(
+        "consumes_both",
+        one,
+        new PTransform<PCollection<String>, PDone>() {
+          @Override
+          public PDone expand(PCollection<String> input) {
+            return done;
+          }
+
+          @Override
+          public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+            return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
+          }
+        });
+    hierarchy.setOutput(done);
+    hierarchy.popNode();
+
+    final PTransform<PBegin, PCollectionTuple> producer =
+        new PTransform<PBegin, PCollectionTuple>() {
+          @Override
+          public PCollectionTuple expand(PBegin input) {
+            return oneAndTwo;
+          }
+        };
+    final Node enclosing =
+        hierarchy.pushNode(
+            "encloses_producer",
+            PBegin.in(pipeline),
+            new PTransform<PBegin, PCollectionTuple>() {
+              @Override
+              public PCollectionTuple expand(PBegin input) {
+                return input.apply(producer);
+              }
+            });
+    Node enclosed = hierarchy.pushNode("creates_one_and_two", PBegin.in(pipeline), producer);
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+
+    final Set<Node> visitedNodes = new HashSet<>();
+    hierarchy.visit(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            visitedNodes.add(node);
+            return node.equals(enclosing)
+                ? CompositeBehavior.DO_NOT_ENTER_TRANSFORM
+                : CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            visitedNodes.add(node);
+          }
+        });
+
+    assertThat(visitedNodes, hasItem(enclosing));
+    assertThat(visitedNodes, not(hasItem(enclosed)));
+  }
 }
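
For completeness, a hypothetical caller-side example (not part of this commit) of driving the traversal through the public Pipeline API, which in this SDK delegates to TransformHierarchy#visit. With this change, visitPrimitiveTransform is called for a node only after the producers of its inputs have been visited.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class TraverseTopologicallyExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    PCollection<String> words = pipeline.apply("CreateWords", Create.of("a", "b", "a"));
    words.apply("CountWords", Count.<String>globally());

    pipeline.traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public void visitPrimitiveTransform(Node node) {
            // Primitives are reported in topological order, e.g. the Create
            // primitive before the primitives expanded from Count.globally().
            System.out.println("primitive: " + node.getFullName());
          }
        });
  }
}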