Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/26 16:23:54 UTC
[3/4] beam git commit: Visit a Transform Hierarchy in Topological Order
Visit a Transform Hierarchy in Topological Order
Always ensure that the producer of a value is visited before that value
is visited for the first time. Visit a composite before visiting any of
its child nodes.
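
Concretely, those are the two guarantees a PipelineVisitor can now rely on:
a value's producer is always visited before the value itself, and a composite
is always entered before its children. A minimal sketch of a visitor that
checks both (the class and field names here are illustrative, not part of
the SDK):

import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.values.PValue;

// Illustrative only: with the topological visit, none of these checks fire.
class OrderCheckingVisitor extends PipelineVisitor.Defaults {
  private final Set<Node> seenNodes = new HashSet<>();
  private final Set<PValue> seenValues = new HashSet<>();

  @Override
  public CompositeBehavior enterCompositeTransform(Node node) {
    seenNodes.add(node); // composites are entered before their children
    return CompositeBehavior.ENTER_TRANSFORM;
  }

  @Override
  public void visitPrimitiveTransform(Node node) {
    for (PValue input : node.getInputs().values()) {
      assert seenValues.contains(input); // inputs were reported first
    }
    seenNodes.add(node);
  }

  @Override
  public void visitValue(PValue value, Node producer) {
    assert seenNodes.contains(producer); // producer precedes its value
    seenValues.add(value);
  }
}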
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd1dfdf3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd1dfdf3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd1dfdf3
Branch: refs/heads/master
Commit: bd1dfdf3c8e145a99bcacebd0c64dcf6580f3ffe
Parents: 7568f02
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 23 13:29:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 07:50:37 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 13 +++
.../spark/translation/BoundedDataset.java | 6 ++
.../spark/translation/TransformTranslator.java | 1 +
.../spark/translation/StorageLevelTest.java | 4 +-
.../beam/sdk/runners/TransformHierarchy.java | 46 ++++++++++-
.../sdk/runners/TransformHierarchyTest.java | 86 ++++++++++++++++++++
6 files changed, 150 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 9e2426e..1be5e13 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -21,8 +21,10 @@ package org.apache.beam.runners.spark;
import com.google.common.collect.Iterables;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -49,6 +51,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -336,6 +339,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
protected final EvaluationContext ctxt;
protected final SparkPipelineTranslator translator;
+ private final Set<Node> shouldIgnoreChildren = new HashSet<>();
+
public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
this.translator = translator;
this.ctxt = ctxt;
@@ -351,6 +356,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
LOG.debug("Composite transform class: '{}'", transformClass);
doVisitTransform(node);
+ shouldIgnoreChildren.add(node);
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
}
}
@@ -392,6 +398,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ Node parent = node.getEnclosingNode();
+ while (!parent.isRootNode()) {
+ if (shouldIgnoreChildren.contains(parent)) {
+ return;
+ }
+ parent = parent.getEnclosingNode();
+ }
doVisitTransform(node);
}
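
With the topological visit, recording each wholesale-translated composite in
shouldIgnoreChildren and walking up the enclosing-node chain in
visitPrimitiveTransform keeps the Evaluator from translating a primitive
twice when one of its ancestors was already translated as a unit. The same
ancestor-walk idiom reappears in the childNodeOf helper added to
TransformHierarchy below; as a standalone sketch (the helper name is
illustrative, not part of either class):

// Illustrative helper: true iff any non-root ancestor of node is in the set.
// Mirrors the loop in visitPrimitiveTransform above and Node.childNodeOf below.
private static boolean isEnclosedByAny(Node node, Set<Node> ancestors) {
  for (Node parent = node.getEnclosingNode();
      parent != null && !parent.isRootNode();
      parent = parent.getEnclosingNode()) {
    if (ancestors.contains(parent)) {
      return true;
    }
  }
  return false;
}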
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 652c753..a746634 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -19,6 +19,7 @@
package org.apache.beam.runners.spark.translation;
import com.google.common.base.Function;
+import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import java.util.List;
import javax.annotation.Nullable;
@@ -97,8 +98,13 @@ public class BoundedDataset<T> implements Dataset {
return windowedValues;
}
+ int timesCached = 0;
@Override
public void cache(String storageLevel) {
+ System.out.printf(
+ "Persisting Dataset %s for RDD %s (id %s) at level %s. %s times before%n",
+ this, getRDD(), getRDD().toDebugString(), storageLevel, timesCached++);
+ System.out.println(Joiner.on("\n\t").join(new Throwable().getStackTrace()));
// populate the rdd if needed
getRDD().persist(StorageLevel.fromString(storageLevel));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 742ea83..6ca12c9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -407,6 +407,7 @@ public final class TransformTranslator {
JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD();
// cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
+ System.out.println("Evaluating Bounded Read " + transform);
context.putDataset(transform, new BoundedDataset<>(input.cache()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 8f2e681..8bd6dae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -58,12 +58,12 @@ public class StorageLevelTest {
@Test
public void test() throws Exception {
- PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
+ PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo"));
// by default, the Spark runner doesn't cache the RDD if it is accessed only once.
// So, to "force" the caching of the RDD, we have to use the RDD at least twice.
// That's why we apply the Count fn to the PCollection.
- pCollection.apply(Count.<String>globally());
+ pCollection.apply("CountAll", Count.<String>globally());
PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 2f0e8ef..630d24c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -202,10 +202,12 @@ public class TransformHierarchy {
return producers.get(produced);
}
+ int traversed = 0;
public Set<PValue> visit(PipelineVisitor visitor) {
finishSpecifying();
Set<PValue> visitedValues = new HashSet<>();
- root.visit(visitor, visitedValues);
+ traversed++;
+ root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
return visitedValues;
}
@@ -462,7 +464,22 @@ public class TransformHierarchy {
* <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
* composite transforms), then the output values.
*/
- private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
+ private void visit(
+ PipelineVisitor visitor,
+ Set<PValue> visitedValues,
+ Set<Node> visitedNodes,
+ Set<Node> passedComposites) {
+ if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
+ getEnclosingNode().visit(visitor, visitedValues, visitedNodes, passedComposites);
+ }
+ if (!visitedNodes.add(this)) {
+ LOG.debug("Not revisiting previously visited node {}", this);
+ return;
+ } else if (childNodeOf(passedComposites)) {
+ LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this);
+ return;
+ }
+
if (!finishedSpecifying) {
finishSpecifying();
}
@@ -470,22 +487,31 @@ public class TransformHierarchy {
if (!isRootNode()) {
// Visit inputs.
for (PValue inputValue : inputs.values()) {
+ Node valueProducer = getProducer(inputValue);
+ if (!visitedNodes.contains(valueProducer)) {
+ valueProducer.visit(visitor, visitedValues, visitedNodes, passedComposites);
+ }
if (visitedValues.add(inputValue)) {
- visitor.visitValue(inputValue, getProducer(inputValue));
+ LOG.debug("Visiting input value {}", inputValue);
+ visitor.visitValue(inputValue, valueProducer);
}
}
}
if (isCompositeNode()) {
+ LOG.debug("Visiting composite node {}", this);
PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
for (Node child : parts) {
- child.visit(visitor, visitedValues);
+ child.visit(visitor, visitedValues, visitedNodes, passedComposites);
}
+ } else {
+ passedComposites.add(this);
}
visitor.leaveCompositeTransform(this);
} else {
+ LOG.debug("Visiting primitive node {}", this);
visitor.visitPrimitiveTransform(this);
}
@@ -494,12 +520,24 @@ public class TransformHierarchy {
// Visit outputs.
for (PValue pValue : outputs.values()) {
if (visitedValues.add(pValue)) {
+ LOG.debug("Visiting output value {}", pValue);
visitor.visitValue(pValue, this);
}
}
}
}
+ private boolean childNodeOf(Set<Node> nodes) {
+ if (isRootNode()) {
+ return false;
+ }
+ Node parent = this.getEnclosingNode();
+ while (!parent.isRootNode() && !nodes.contains(parent)) {
+ parent = parent.getEnclosingNode();
+ }
+ return nodes.contains(parent);
+ }
+
/**
* Finish specifying a transform.
*
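
Taken together, the hunks above make Node.visit behave like the following
condensed sketch (debug logging and the finishSpecifying bookkeeping omitted;
this is a paraphrase, not the exact SDK source):

private void visit(PipelineVisitor visitor, Set<PValue> visitedValues,
    Set<Node> visitedNodes, Set<Node> passedComposites) {
  // Parents first: visiting a node implies its enclosing composite was entered.
  if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
    getEnclosingNode().visit(visitor, visitedValues, visitedNodes, passedComposites);
  }
  // Visit each node at most once, and skip descendants of composites the
  // visitor declined to enter.
  if (!visitedNodes.add(this) || childNodeOf(passedComposites)) {
    return;
  }
  if (!isRootNode()) {
    // Producers first: recurse into each input's producer before reporting
    // the input value itself.
    for (PValue inputValue : inputs.values()) {
      Node valueProducer = getProducer(inputValue);
      if (!visitedNodes.contains(valueProducer)) {
        valueProducer.visit(visitor, visitedValues, visitedNodes, passedComposites);
      }
      if (visitedValues.add(inputValue)) {
        visitor.visitValue(inputValue, valueProducer);
      }
    }
  }
  if (isCompositeNode()) {
    // Composites the visitor does not enter are recorded so that their
    // children can be skipped if the traversal reaches them another way.
    if (visitor.enterCompositeTransform(this) == CompositeBehavior.ENTER_TRANSFORM) {
      for (Node child : parts) {
        child.visit(visitor, visitedValues, visitedNodes, passedComposites);
      }
    } else {
      passedComposites.add(this);
    }
    visitor.leaveCompositeTransform(this);
  } else {
    visitor.visitPrimitiveTransform(this);
  }
  if (!isRootNode()) {
    // Finally, report this node's outputs.
    for (PValue pValue : outputs.values()) {
      if (visitedValues.add(pValue)) {
        visitor.visitValue(pValue, this);
      }
    }
  }
}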
http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 1197d1b..2fe2817 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
@@ -32,6 +33,8 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
@@ -492,4 +495,87 @@ public class TransformHierarchyTest implements Serializable {
assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
}
+
+ @Test
+ public void visitIsTopologicallyOrdered() {
+ PCollection<String> one =
+ PCollection.<String>createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+ .setCoder(StringUtf8Coder.of());
+ final PCollection<Integer> two =
+ PCollection.<Integer>createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+ .setCoder(VarIntCoder.of());
+ final PDone done = PDone.in(pipeline);
+ final TupleTag<String> oneTag = new TupleTag<String>() {};
+ final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
+ final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
+
+ hierarchy.pushNode("consumes_both", one, new PTransform<PCollection<String>, PDone>() {
+ @Override
+ public PDone expand(PCollection<String> input) {
+ return done;
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
+ }
+ });
+ hierarchy.setOutput(done);
+ hierarchy.popNode();
+
+ final PTransform<PBegin, PCollectionTuple> producer =
+ new PTransform<PBegin, PCollectionTuple>() {
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+ return oneAndTwo;
+ }
+ };
+ hierarchy.pushNode(
+ "encloses_producer",
+ PBegin.in(pipeline),
+ new PTransform<PBegin, PCollectionTuple>() {
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+ return input.apply(producer);
+ }
+ });
+ hierarchy.pushNode(
+ "creates_one_and_two",
+ PBegin.in(pipeline), producer);
+ hierarchy.setOutput(oneAndTwo);
+ hierarchy.popNode();
+ hierarchy.setOutput(oneAndTwo);
+ hierarchy.popNode();
+
+ hierarchy.visit(new PipelineVisitor.Defaults() {
+ private final Set<Node> visitedNodes = new HashSet<>();
+ private final Set<PValue> visitedValues = new HashSet<>();
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ for (PValue input : node.getInputs().values()) {
+ assertThat(visitedValues, hasItem(input));
+ }
+ visitedNodes.add(node);
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ assertThat(visitedNodes, hasItem(node.getEnclosingNode()));
+ for (PValue input : node.getInputs().values()) {
+ assertThat(visitedValues, hasItem(input));
+ }
+ visitedNodes.add(node);
+ }
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ assertThat(visitedNodes, hasItem(producer));
+ assertThat(visitedValues, not(hasItem(value)));
+ visitedValues.add(value);
+ }
+ });
+ }
}