You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:56 UTC
[41/50] [abbrv] beam git commit: Visit a Transform Hierarchy in Topological Order
Visit a Transform Hierarchy in Topological Order
This reverts commit 6ad6433ec0c02aec8656e9e3b27f6e0f974f8ece.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/261e7df2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/261e7df2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/261e7df2
Branch: refs/heads/gearpump-runner
Commit: 261e7df2b860fe82d9f401e2621b020fe2020fea
Parents: d2c4093
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 6 16:15:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 9 14:44:42 2017 -0700
----------------------------------------------------------------------
.../spark/translation/StorageLevelTest.java | 4 +-
.../beam/sdk/runners/TransformHierarchy.java | 79 +++++++-
.../sdk/runners/TransformHierarchyTest.java | 197 +++++++++++++++++++
3 files changed, 274 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 8f2e681..8bd6dae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -58,12 +58,12 @@ public class StorageLevelTest {
@Test
public void test() throws Exception {
- PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
+ PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo"));
// by default, the Spark runner doesn't cache the RDD if it accessed only one time.
// So, to "force" the caching of the RDD, we have to call the RDD at least two time.
// That's why we are using Count fn on the PCollection.
- pCollection.apply(Count.<String>globally());
+ pCollection.apply("CountAll", Count.<String>globally());
PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index ee1ce7b..5e048eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -208,7 +208,7 @@ public class TransformHierarchy {
public Set<PValue> visit(PipelineVisitor visitor) {
finishSpecifying();
Set<PValue> visitedValues = new HashSet<>();
- root.visit(visitor, visitedValues);
+ root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
return visitedValues;
}
@@ -503,10 +503,60 @@ public class TransformHierarchy {
/**
* Visit the transform node.
*
+ * <p>The visit proceeds in the following order:
+ *
+ * <ul>
+ * <li>Visit all input {@link PValue PValues} returned by the flattened expansion of {@link
+ * Node#getInputs()}.
+ * <li>If the node is a composite:
+ * <ul>
+ * <li>Enter the node via {@link PipelineVisitor#enterCompositeTransform(Node)}.
+ * <li>If the result of {@link PipelineVisitor#enterCompositeTransform(Node)} was {@link
+ * CompositeBehavior#ENTER_TRANSFORM}, visit each child node of this {@link Node}.
+ * <li>Leave the node via {@link PipelineVisitor#leaveCompositeTransform(Node)}.
+ * </ul>
+ * <li>If the node is a primitive, visit it via {@link
+ * PipelineVisitor#visitPrimitiveTransform(Node)}.
+ * <li>Visit each {@link PValue} that was output by this node.
+ * </ul>
+ *
+ * <p>Additionally, the following ordering restrictions are observed:
+ *
+ * <ul>
+ * <li>A {@link Node} will be visited after its enclosing node has been entered and before its
+ * enclosing node has been left
+ * <li>A {@link Node} will not be visited if any enclosing {@link Node} has returned {@link
+ * CompositeBehavior#DO_NOT_ENTER_TRANSFORM} from the call to {@link
+ * PipelineVisitor#enterCompositeTransform(Node)}.
+ * <li>A {@link PValue} will only be visited after the {@link Node} that originally produced
+ * it has been visited.
+ * </ul>
+ *
* <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
* composite transforms), then the output values.
*/
- private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
+ private void visit(
+ PipelineVisitor visitor,
+ Set<PValue> visitedValues,
+ Set<Node> visitedNodes,
+ Set<Node> skippedComposites) {
+ if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
+ // Recursively enter all enclosing nodes, as appropriate.
+ getEnclosingNode().visit(visitor, visitedValues, visitedNodes, skippedComposites);
+ }
+ // These checks occur after visiting the enclosing node to ensure that if this node has been
+ // visited while visiting the enclosing node the node is not revisited, or, if an enclosing
+ // Node is skipped, this node is also skipped.
+ if (!visitedNodes.add(this)) {
+ LOG.debug("Not revisiting previously visited node {}", this);
+ return;
+ } else if (childNodeOf(skippedComposites)) {
+ // This node is a child of a node that has been passed over via CompositeBehavior, and
+ // should also be skipped. All child nodes of a skipped composite should always be skipped.
+ LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this);
+ return;
+ }
+
if (!finishedSpecifying) {
finishSpecifying();
}
@@ -514,22 +564,31 @@ public class TransformHierarchy {
if (!isRootNode()) {
// Visit inputs.
for (PValue inputValue : inputs.values()) {
+ Node valueProducer = getProducer(inputValue);
+ if (!visitedNodes.contains(valueProducer)) {
+ valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites);
+ }
if (visitedValues.add(inputValue)) {
- visitor.visitValue(inputValue, getProducer(inputValue));
+ LOG.debug("Visiting input value {}", inputValue);
+ visitor.visitValue(inputValue, valueProducer);
}
}
}
if (isCompositeNode()) {
+ LOG.debug("Visiting composite node {}", this);
PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
for (Node child : parts) {
- child.visit(visitor, visitedValues);
+ child.visit(visitor, visitedValues, visitedNodes, skippedComposites);
}
+ } else {
+ skippedComposites.add(this);
}
visitor.leaveCompositeTransform(this);
} else {
+ LOG.debug("Visiting primitive node {}", this);
visitor.visitPrimitiveTransform(this);
}
@@ -538,12 +597,24 @@ public class TransformHierarchy {
// Visit outputs.
for (PValue pValue : outputs.values()) {
if (visitedValues.add(pValue)) {
+ LOG.debug("Visiting output value {}", pValue);
visitor.visitValue(pValue, this);
}
}
}
}
+ private boolean childNodeOf(Set<Node> nodes) {
+ if (isRootNode()) {
+ return false;
+ }
+ Node parent = this.getEnclosingNode();
+ while (!parent.isRootNode() && !nodes.contains(parent)) {
+ parent = parent.getEnclosingNode();
+ }
+ return nodes.contains(parent);
+ }
+
/**
* Finish specifying a transform.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 1197d1b..93650dd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
@@ -32,6 +33,8 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
@@ -492,4 +495,198 @@ public class TransformHierarchyTest implements Serializable {
assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
}
+
+ @Test
+ public void visitIsTopologicallyOrdered() {
+ PCollection<String> one =
+ PCollection.<String>createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+ .setCoder(StringUtf8Coder.of());
+ final PCollection<Integer> two =
+ PCollection.<Integer>createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+ .setCoder(VarIntCoder.of());
+ final PDone done = PDone.in(pipeline);
+ final TupleTag<String> oneTag = new TupleTag<String>() {};
+ final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
+ final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
+
+ PTransform<PCollection<String>, PDone> multiConsumer =
+ new PTransform<PCollection<String>, PDone>() {
+ @Override
+ public PDone expand(PCollection<String> input) {
+ return done;
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
+ }
+ };
+ hierarchy.pushNode("consumes_both", one, multiConsumer);
+ hierarchy.setOutput(done);
+ hierarchy.popNode();
+
+ final PTransform<PBegin, PCollectionTuple> producer =
+ new PTransform<PBegin, PCollectionTuple>() {
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+ return oneAndTwo;
+ }
+ };
+ hierarchy.pushNode(
+ "encloses_producer",
+ PBegin.in(pipeline),
+ new PTransform<PBegin, PCollectionTuple>() {
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+ return input.apply(producer);
+ }
+ });
+ hierarchy.pushNode(
+ "creates_one_and_two",
+ PBegin.in(pipeline), producer);
+ hierarchy.setOutput(oneAndTwo);
+ hierarchy.popNode();
+ hierarchy.setOutput(oneAndTwo);
+ hierarchy.popNode();
+
+ hierarchy.pushNode("second_copy_of_consumes_both", one, multiConsumer);
+ hierarchy.setOutput(done);
+ hierarchy.popNode();
+
+ final Set<Node> visitedNodes = new HashSet<>();
+ final Set<Node> exitedNodes = new HashSet<>();
+ final Set<PValue> visitedValues = new HashSet<>();
+ hierarchy.visit(
+ new PipelineVisitor.Defaults() {
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ for (PValue input : node.getInputs().values()) {
+ assertThat(visitedValues, hasItem(input));
+ }
+ assertThat(
+ "Nodes should not be visited more than once", visitedNodes, not(hasItem(node)));
+ if (!node.isRootNode()) {
+ assertThat(
+ "Nodes should always be visited after their enclosing nodes",
+ visitedNodes,
+ hasItem(node.getEnclosingNode()));
+ }
+ visitedNodes.add(node);
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ assertThat(visitedNodes, hasItem(node));
+ if (!node.isRootNode()) {
+ assertThat(
+ "Nodes should always be left before their enclosing nodes are left",
+ exitedNodes,
+ not(hasItem(node.getEnclosingNode())));
+ }
+ assertThat(exitedNodes, not(hasItem(node)));
+ exitedNodes.add(node);
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ assertThat(visitedNodes, hasItem(node.getEnclosingNode()));
+ assertThat(exitedNodes, not(hasItem(node.getEnclosingNode())));
+ assertThat(
+ "Nodes should not be visited more than once", visitedNodes, not(hasItem(node)));
+ for (PValue input : node.getInputs().values()) {
+ assertThat(visitedValues, hasItem(input));
+ }
+ visitedNodes.add(node);
+ }
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ assertThat(visitedNodes, hasItem(producer));
+ assertThat(visitedValues, not(hasItem(value)));
+ visitedValues.add(value);
+ }
+ });
+ assertThat("Should have visited all the nodes", visitedNodes.size(), equalTo(5));
+ assertThat("Should have left all of the visited composites", exitedNodes.size(), equalTo(2));
+ }
+
+ @Test
+ public void visitDoesNotVisitSkippedNodes() {
+ PCollection<String> one =
+ PCollection.<String>createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+ .setCoder(StringUtf8Coder.of());
+ final PCollection<Integer> two =
+ PCollection.<Integer>createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+ .setCoder(VarIntCoder.of());
+ final PDone done = PDone.in(pipeline);
+ final TupleTag<String> oneTag = new TupleTag<String>() {};
+ final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
+ final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
+
+ hierarchy.pushNode(
+ "consumes_both",
+ one,
+ new PTransform<PCollection<String>, PDone>() {
+ @Override
+ public PDone expand(PCollection<String> input) {
+ return done;
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
+ }
+ });
+ hierarchy.setOutput(done);
+ hierarchy.popNode();
+
+ final PTransform<PBegin, PCollectionTuple> producer =
+ new PTransform<PBegin, PCollectionTuple>() {
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+ return oneAndTwo;
+ }
+ };
+ final Node enclosing =
+ hierarchy.pushNode(
+ "encloses_producer",
+ PBegin.in(pipeline),
+ new PTransform<PBegin, PCollectionTuple>() {
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+ return input.apply(producer);
+ }
+ });
+ Node enclosed = hierarchy.pushNode("creates_one_and_two", PBegin.in(pipeline), producer);
+ hierarchy.setOutput(oneAndTwo);
+ hierarchy.popNode();
+ hierarchy.setOutput(oneAndTwo);
+ hierarchy.popNode();
+
+ final Set<Node> visitedNodes = new HashSet<>();
+ hierarchy.visit(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ visitedNodes.add(node);
+ return node.equals(enclosing)
+ ? CompositeBehavior.DO_NOT_ENTER_TRANSFORM
+ : CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ visitedNodes.add(node);
+ }
+ });
+
+ assertThat(visitedNodes, hasItem(enclosing));
+ assertThat(visitedNodes, not(hasItem(enclosed)));
+ }
}