Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/26 16:23:54 UTC

[3/4] beam git commit: Visit a Transform Hierarchy in Topological Order

Visit a Transform Hierarchy in Topological Order

Always ensure that the producer of a value is visited before that value
is visited for the first time. Visit a composite before visiting any of
its child nodes.
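
That invariant can be sketched as a depth-first walk, using a hypothetical standalone Node type rather than the SDK's TransformHierarchy.Node: each visit recurses into the node's enclosing composite and into the producers of its inputs before visiting the node itself, so a composite always precedes its children and a producer always precedes its consumers.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /** Sketch of producer-before-consumer, composite-before-child visitation. */
    final class TopologicalVisitSketch {
      static final class Node {
        final String name;
        final Node parent; // enclosing composite, or null for the root
        final List<Node> children = new ArrayList<>();
        final List<Node> inputProducers = new ArrayList<>();

        Node(String name, Node parent) {
          this.name = name;
          this.parent = parent;
          if (parent != null) {
            parent.children.add(this);
          }
        }
      }

      static void visit(Node node, Set<Node> visited) {
        // A composite is visited before any of its children.
        if (node.parent != null && !visited.contains(node.parent)) {
          visit(node.parent, visited);
        }
        if (!visited.add(node)) {
          return; // already reached through another path
        }
        // The producer of each input is visited before its consumer.
        for (Node producer : node.inputProducers) {
          if (!visited.contains(producer)) {
            visit(producer, visited);
          }
        }
        System.out.println("visited " + node.name);
        for (Node child : node.children) {
          visit(child, visited);
        }
      }

      public static void main(String[] args) {
        Node root = new Node("root", null);
        Node producer = new Node("creates_values", root);
        Node consumer = new Node("consumes_values", root);
        consumer.inputProducers.add(producer);
        // Starting at the consumer still prints: root, creates_values, consumes_values.
        visit(consumer, new HashSet<>());
      }
    }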


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd1dfdf3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd1dfdf3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd1dfdf3

Branch: refs/heads/master
Commit: bd1dfdf3c8e145a99bcacebd0c64dcf6580f3ffe
Parents: 7568f02
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 23 13:29:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 07:50:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 13 +++
 .../spark/translation/BoundedDataset.java       |  6 ++
 .../spark/translation/TransformTranslator.java  |  1 +
 .../spark/translation/StorageLevelTest.java     |  4 +-
 .../beam/sdk/runners/TransformHierarchy.java    | 46 ++++++++++-
 .../sdk/runners/TransformHierarchyTest.java     | 86 ++++++++++++++++++++
 6 files changed, 150 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 9e2426e..1be5e13 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -21,8 +21,10 @@ package org.apache.beam.runners.spark;
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -49,6 +51,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -336,6 +339,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     protected final EvaluationContext ctxt;
     protected final SparkPipelineTranslator translator;
 
+    private final Set<Node> shouldIgnoreChildren = new HashSet<>();
+
     public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
       this.translator = translator;
       this.ctxt = ctxt;
@@ -351,6 +356,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
           LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
           LOG.debug("Composite transform class: '{}'", transformClass);
           doVisitTransform(node);
+          shouldIgnoreChildren.add(node);
           return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
         }
       }
@@ -392,6 +398,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
     @Override
     public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+      Node parent = node.getEnclosingNode();
+      while (!parent.isRootNode()) {
+        if (shouldIgnoreChildren.contains(parent)) {
+          return;
+        }
+        parent = parent.getEnclosingNode();
+      }
       doVisitTransform(node);
     }
 

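The Evaluator change above pairs two pieces: a directly-translatable composite is translated as a unit and recorded in shouldIgnoreChildren, and every primitive then walks its chain of enclosing composites and skips translation if any ancestor was recorded. A condensed sketch of that bookkeeping, with a hypothetical Node interface standing in for TransformHierarchy.Node:

    import java.util.HashSet;
    import java.util.Set;

    /** Sketch of the "translate the composite, skip its children" bookkeeping. */
    final class IgnoreChildrenSketch {
      interface Node {
        Node getEnclosingNode();
        boolean isRootNode();
      }

      private final Set<Node> shouldIgnoreChildren = new HashSet<>();

      /** Called when a composite is translated wholesale (DO_NOT_ENTER_TRANSFORM). */
      void markDirectlyTranslated(Node composite) {
        shouldIgnoreChildren.add(composite);
      }

      /** A primitive is translated only if no enclosing composite was marked. */
      boolean shouldTranslate(Node primitive) {
        for (Node parent = primitive.getEnclosingNode();
            !parent.isRootNode();
            parent = parent.getEnclosingNode()) {
          if (shouldIgnoreChildren.contains(parent)) {
            return false;
          }
        }
        return true;
      }
    }
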
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 652c753..a746634 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -97,8 +98,13 @@ public class BoundedDataset<T> implements Dataset {
     return windowedValues;
   }
 
+  int timesCached = 0;
   @Override
   public void cache(String storageLevel) {
+    System.out.printf(
+        "Persisting Dataset %s for RDD %s (id %s) at level %s. %s times before%n",
+        this, getRDD(), getRDD().toDebugString(), storageLevel, timesCached++);
+    System.out.println(Joiner.on("\n\t").join(new Throwable().getStackTrace()));
     // populate the rdd if needed
     getRDD().persist(StorageLevel.fromString(storageLevel));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 742ea83..6ca12c9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -407,6 +407,7 @@ public final class TransformTranslator {
         JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
             jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD();
         // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
+        System.out.println("Evaluating Bounded Read " + transform);
         context.putDataset(transform, new BoundedDataset<>(input.cache()));
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 8f2e681..8bd6dae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -58,12 +58,12 @@ public class StorageLevelTest {
 
   @Test
   public void test() throws Exception {
-    PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
+    PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo"));
 
     // By default, the Spark runner doesn't cache an RDD that is accessed only once.
     // So, to "force" caching of the RDD, we have to use the RDD at least twice.
     // That's why we apply the Count fn to the PCollection.
-    pCollection.apply(Count.<String>globally());
+    pCollection.apply("CountAll", Count.<String>globally());
 
     PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 2f0e8ef..630d24c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -202,10 +202,12 @@ public class TransformHierarchy {
     return producers.get(produced);
   }
 
+  int traversed = 0;
   public Set<PValue> visit(PipelineVisitor visitor) {
     finishSpecifying();
     Set<PValue> visitedValues = new HashSet<>();
-    root.visit(visitor, visitedValues);
+    traversed++;
+    root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
     return visitedValues;
   }
 
@@ -462,7 +464,22 @@ public class TransformHierarchy {
      * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
      * composite transforms), then the output values.
      */
-    private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
+    private void visit(
+        PipelineVisitor visitor,
+        Set<PValue> visitedValues,
+        Set<Node> visitedNodes,
+        Set<Node> passedComposites) {
+      if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
+        getEnclosingNode().visit(visitor, visitedValues, visitedNodes, passedComposites);
+      }
+      if (!visitedNodes.add(this)) {
+        LOG.debug("Not revisiting previously visited node {}", this);
+        return;
+      } else if (childNodeOf(passedComposites)) {
+        LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this);
+        return;
+      }
+
       if (!finishedSpecifying) {
         finishSpecifying();
       }
@@ -470,22 +487,31 @@ public class TransformHierarchy {
       if (!isRootNode()) {
         // Visit inputs.
         for (PValue inputValue : inputs.values()) {
+          Node valueProducer = getProducer(inputValue);
+          if (!visitedNodes.contains(valueProducer)) {
+            valueProducer.visit(visitor, visitedValues, visitedNodes, passedComposites);
+          }
           if (visitedValues.add(inputValue)) {
-            visitor.visitValue(inputValue, getProducer(inputValue));
+            LOG.debug("Visiting input value {}", inputValue);
+            visitor.visitValue(inputValue, valueProducer);
           }
         }
       }
 
       if (isCompositeNode()) {
+        LOG.debug("Visiting composite node {}", this);
         PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
 
         if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
           for (Node child : parts) {
-            child.visit(visitor, visitedValues);
+            child.visit(visitor, visitedValues, visitedNodes, passedComposites);
           }
+        } else {
+          passedComposites.add(this);
         }
         visitor.leaveCompositeTransform(this);
       } else {
+        LOG.debug("Visiting primitive node {}", this);
         visitor.visitPrimitiveTransform(this);
       }
 
@@ -494,12 +520,24 @@ public class TransformHierarchy {
         // Visit outputs.
         for (PValue pValue : outputs.values()) {
           if (visitedValues.add(pValue)) {
+            LOG.debug("Visiting output value {}", pValue);
             visitor.visitValue(pValue, this);
           }
         }
       }
     }
 
+    private boolean childNodeOf(Set<Node> nodes) {
+      if (isRootNode()) {
+        return false;
+      }
+      Node parent = this.getEnclosingNode();
+      while (!parent.isRootNode() && !nodes.contains(parent)) {
+        parent = parent.getEnclosingNode();
+      }
+      return nodes.contains(parent);
+    }
+
     /**
      * Finish specifying a transform.
      *

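One consequence of the new guarantee: by the time visitValue is called, the value's producer has already been visited, so a single-pass visitor can index producers without deferred resolution. A small sketch against the public API (ProducerIndexVisitor is an illustrative name, not part of the SDK):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.runners.TransformHierarchy;
    import org.apache.beam.sdk.values.PValue;

    /** Builds a value-to-producer index in a single topological traversal. */
    final class ProducerIndexVisitor extends Pipeline.PipelineVisitor.Defaults {
      private final Map<PValue, TransformHierarchy.Node> producers = new HashMap<>();

      @Override
      public void visitValue(PValue value, TransformHierarchy.Node producer) {
        // Each value is visited exactly once, after its producer has been visited.
        producers.put(value, producer);
      }

      Map<PValue, TransformHierarchy.Node> getProducers() {
        return producers;
      }
    }

    // Usage: pipeline.traverseTopologically(new ProducerIndexVisitor());
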
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 1197d1b..2fe2817 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -32,6 +33,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -492,4 +495,87 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
     assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
   }
+
+  @Test
+  public void visitIsTopologicallyOrdered() {
+    PCollection<String> one =
+        PCollection.<String>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+            .setCoder(StringUtf8Coder.of());
+    final PCollection<Integer> two =
+        PCollection.<Integer>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(VarIntCoder.of());
+    final PDone done = PDone.in(pipeline);
+    final TupleTag<String> oneTag = new TupleTag<String>() {};
+    final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
+    final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
+
+    hierarchy.pushNode("consumes_both", one, new PTransform<PCollection<String>, PDone>() {
+      @Override
+      public PDone expand(PCollection<String> input) {
+        return done;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+        return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
+      }
+    });
+    hierarchy.setOutput(done);
+    hierarchy.popNode();
+
+    final PTransform<PBegin, PCollectionTuple> producer =
+        new PTransform<PBegin, PCollectionTuple>() {
+          @Override
+          public PCollectionTuple expand(PBegin input) {
+            return oneAndTwo;
+          }
+        };
+    hierarchy.pushNode(
+        "encloses_producer",
+        PBegin.in(pipeline),
+        new PTransform<PBegin, PCollectionTuple>() {
+          @Override
+          public PCollectionTuple expand(PBegin input) {
+            return input.apply(producer);
+          }
+        });
+    hierarchy.pushNode(
+        "creates_one_and_two",
+        PBegin.in(pipeline), producer);
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+
+    hierarchy.visit(new PipelineVisitor.Defaults() {
+      private final Set<Node> visitedNodes = new HashSet<>();
+      private final Set<PValue> visitedValues = new HashSet<>();
+      @Override
+      public CompositeBehavior enterCompositeTransform(Node node) {
+        for (PValue input : node.getInputs().values()) {
+          assertThat(visitedValues, hasItem(input));
+        }
+        visitedNodes.add(node);
+        return CompositeBehavior.ENTER_TRANSFORM;
+      }
+
+      @Override
+      public void visitPrimitiveTransform(Node node) {
+        assertThat(visitedNodes, hasItem(node.getEnclosingNode()));
+        for (PValue input : node.getInputs().values()) {
+          assertThat(visitedValues, hasItem(input));
+        }
+        visitedNodes.add(node);
+      }
+
+      @Override
+      public void visitValue(PValue value, Node producer) {
+        assertThat(visitedNodes, hasItem(producer));
+        assertThat(visitedValues, not(hasItem(value)));
+        visitedValues.add(value);
+      }
+    });
+  }
 }