You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/01 20:56:49 UTC
[1/2] incubator-beam git commit: Move TransformHierarchy Maintenance
into it
Repository: incubator-beam
Updated Branches:
refs/heads/master 0c875ba70 -> 48130f718
Move TransformHierarchy Maintenance into it
This reduces the complexity of Pipeline.applyInternal by limiting its
responsibilities to passing a node into the Transform Hierarchy,
enforcing name uniqueness, and causing the runner to expand the
PTransform. The remaining hierarchy-maintenance logic is moved to the appropriate application sites.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab1f1ad0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab1f1ad0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab1f1ad0
Branch: refs/heads/master
Commit: ab1f1ad012bc559cdb099319a516e4437eed2825
Parents: 0c875ba
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 29 14:29:47 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Dec 1 12:55:25 2016 -0800
----------------------------------------------------------------------
.../direct/KeyedPValueTrackingVisitor.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++---------
.../beam/sdk/runners/TransformHierarchy.java | 126 ++++++++-----
.../beam/sdk/runners/TransformTreeNode.java | 165 +++++++++--------
.../sdk/runners/TransformHierarchyTest.java | 180 ++++++++++++++-----
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
7 files changed, 340 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 7c4376a..47b0857 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -74,7 +74,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
if (node.isRootNode()) {
finalized = true;
} else if (producesKeyedOutputs.contains(node.getTransform().getClass())) {
- keyedValues.addAll(node.getExpandedOutputs());
+ keyedValues.addAll(node.getOutput().expand());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c925454..95c7132 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -669,7 +669,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
PCollection<Integer> input = p.begin()
.apply(Create.of(1, 2, 3));
- thrown.expect(IllegalStateException.class);
+ thrown.expect(IllegalArgumentException.class);
input.apply(new PartiallyBoundOutputCreator());
Assert.fail("Failure expected from use of partially bound output");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 9edf496..c8a4439 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -17,10 +17,11 @@
*/
package org.apache.beam.sdk;
+import static com.google.common.base.Preconditions.checkState;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
@@ -282,14 +282,12 @@ public class Pipeline {
* <p>Typically invoked by {@link PipelineRunner} subclasses.
*/
public void traverseTopologically(PipelineVisitor visitor) {
- Set<PValue> visitedValues = new HashSet<>();
- // Visit all the transforms, which should implicitly visit all the values.
- transforms.visit(visitor, visitedValues);
- if (!visitedValues.containsAll(values)) {
- throw new RuntimeException(
- "internal error: should have visited all the values "
- + "after visiting all the transforms");
- }
+ Set<PValue> visitedValues =
+ // Visit all the transforms, which should implicitly visit all the values.
+ transforms.visit(visitor);
+ checkState(
+ visitedValues.containsAll(values),
+ "internal error: should have visited all the values after visiting all the transforms");
}
/**
@@ -351,53 +349,43 @@ public class Pipeline {
*
* @see Pipeline#apply
*/
- private <InputT extends PInput, OutputT extends POutput>
- OutputT applyInternal(String name, InputT input,
- PTransform<? super InputT, OutputT> transform) {
- input.finishSpecifying();
+ private <InputT extends PInput, OutputT extends POutput> OutputT applyInternal(
+ String name, InputT input, PTransform<? super InputT, OutputT> transform) {
+ String namePrefix = transforms.getCurrent().getFullName();
+ String uniqueName = uniquifyInternal(namePrefix, name);
- TransformTreeNode parent = transforms.getCurrent();
- String namePrefix = parent.getFullName();
- String fullName = uniquifyInternal(namePrefix, name);
-
- boolean nameIsUnique = fullName.equals(buildName(namePrefix, name));
+ boolean nameIsUnique = uniqueName.equals(buildName(namePrefix, name));
if (!nameIsUnique) {
switch (getOptions().getStableUniqueNames()) {
case OFF:
break;
case WARNING:
- LOG.warn("Transform {} does not have a stable unique name. "
- + "This will prevent updating of pipelines.", fullName);
+ LOG.warn(
+ "Transform {} does not have a stable unique name. "
+ + "This will prevent updating of pipelines.",
+ uniqueName);
break;
case ERROR:
throw new IllegalStateException(
- "Transform " + fullName + " does not have a stable unique name. "
- + "This will prevent updating of pipelines.");
+ "Transform "
+ + uniqueName
+ + " does not have a stable unique name. "
+ + "This will prevent updating of pipelines.");
default:
throw new IllegalArgumentException(
"Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames());
}
}
- TransformTreeNode child =
- new TransformTreeNode(parent, transform, fullName, input);
- parent.addComposite(child);
-
- transforms.addInput(child, input);
-
LOG.debug("Adding {} to {}", transform, this);
+ transforms.pushNode(uniqueName, input, transform);
try {
- transforms.pushNode(child);
+ transforms.finishSpecifyingInput();
transform.validate(input);
OutputT output = runner.apply(transform, input);
- transforms.setOutput(child, output);
+ transforms.setOutput(output);
- AppliedPTransform<?, ?, ?> applied = AppliedPTransform.of(
- child.getFullName(), input, output, transform);
- // recordAsOutput is a NOOP if already called;
- output.recordAsOutput(applied);
- verifyOutputState(output, child);
return output;
} finally {
transforms.popNode();
@@ -405,63 +393,6 @@ public class Pipeline {
}
/**
- * Returns all producing transforms for the {@link PValue PValues} contained
- * in {@code output}.
- */
- private List<AppliedPTransform<?, ?, ?>> getProducingTransforms(POutput output) {
- List<AppliedPTransform<?, ?, ?>> producingTransforms = new ArrayList<>();
- for (PValue value : output.expand()) {
- AppliedPTransform<?, ?, ?> transform = value.getProducingTransformInternal();
- if (transform != null) {
- producingTransforms.add(transform);
- }
- }
- return producingTransforms;
- }
-
- /**
- * Verifies that the output of a {@link PTransform} is correctly configured in its
- * {@link TransformTreeNode} in the {@link Pipeline} graph.
- *
- * <p>A non-composite {@link PTransform} must have all
- * of its outputs registered as produced by that {@link PTransform}.
- *
- * <p>A composite {@link PTransform} must have all of its outputs
- * registered as produced by the contained primitive {@link PTransform PTransforms}.
- * They have each had the above check performed already, when
- * they were applied, so the only possible failure state is
- * that the composite {@link PTransform} has returned a primitive output.
- */
- private void verifyOutputState(POutput output, TransformTreeNode node) {
- if (!node.isCompositeNode()) {
- PTransform<?, ?> thisTransform = node.getTransform();
- List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output);
- for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) {
- // Using != because object identity indicates that the transforms
- // are the same node in the pipeline
- if (thisTransform != producingTransform.getTransform()) {
- throw new IllegalArgumentException("Output of non-composite transform "
- + thisTransform + " is registered as being produced by"
- + " a different transform: " + producingTransform);
- }
- }
- } else {
- PTransform<?, ?> thisTransform = node.getTransform();
- List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output);
- for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) {
- // Using == because object identity indicates that the transforms
- // are the same node in the pipeline
- if (thisTransform == producingTransform.getTransform()) {
- throw new IllegalStateException("Output of composite transform "
- + thisTransform + " is registered as being produced by it,"
- + " but the output of every composite transform should be"
- + " produced by a primitive transform contained therein.");
- }
- }
- }
- }
-
- /**
* Returns the configured {@link PipelineRunner}.
*/
public PipelineRunner<?> getRunner() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 0a4bb08..d3fd497 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -17,14 +17,17 @@
*/
package org.apache.beam.sdk.runners;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import java.util.Deque;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
@@ -34,70 +37,109 @@ import org.apache.beam.sdk.values.PValue;
* associated {@link PValue}s.
*/
public class TransformHierarchy {
- private final Deque<TransformTreeNode> transformStack = new LinkedList<>();
- private final Map<PInput, TransformTreeNode> producingTransformNode = new HashMap<>();
+ private final TransformTreeNode root;
+ private final Map<POutput, TransformTreeNode> producers;
+ // Maintain a stack based on the enclosing nodes
+ private TransformTreeNode current;
- /**
- * Create a {@code TransformHierarchy} containing a root node.
- */
public TransformHierarchy() {
- // First element in the stack is the root node, holding all child nodes.
- transformStack.add(new TransformTreeNode(null, null, "", null));
+ root = TransformTreeNode.root(this);
+ current = root;
+ producers = new HashMap<>();
}
/**
- * Returns the last TransformTreeNode on the stack.
+ * Adds the named {@link PTransform} consuming the provided {@link PInput} as a node in this
+ * {@link TransformHierarchy} as a child of the current node, and sets it to be the current node.
+ *
+ * <p>This call should be finished by expanding and recursively calling {@link #pushNode(String,
+ * PInput, PTransform)}, calling {@link #finishSpecifyingInput()}, setting the output with {@link
+ * #setOutput(POutput)}, and ending with a call to {@link #popNode()}.
+ *
+ * @return the added node
*/
- public TransformTreeNode getCurrent() {
- return transformStack.peek();
+ public TransformTreeNode pushNode(String name, PInput input, PTransform<?, ?> transform) {
+ checkNotNull(
+ transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+ checkNotNull(
+ name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+ checkNotNull(
+ input, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName());
+ current = TransformTreeNode.subtransform(current, transform, name, input);
+ return current;
}
/**
- * Add a TransformTreeNode to the stack.
+ * Finish specifying all of the input {@link PValue PValues} of the current {@link
+ * TransformTreeNode}. Ensures that all of the inputs to the current node have been fully
+ * specified, and have been produced by a node in this graph.
*/
- public void pushNode(TransformTreeNode current) {
- transformStack.push(current);
+ public void finishSpecifyingInput() {
+ // Inputs must be completely specified before they are consumed by a transform.
+ current.getInput().finishSpecifying();
+ for (PValue inputValue : current.getInput().expand()) {
+ checkState(producers.get(inputValue) != null, "Producer unknown for input %s", inputValue);
+ inputValue.finishSpecifying();
+ }
}
/**
- * Removes the last TransformTreeNode from the stack.
+ * Set the output of the current {@link TransformTreeNode}. If the output is new (setOutput has
+ * not previously been called with it as the parameter), the current node is set as the producer
+ * of that {@link POutput}.
+ *
+ * <p>Also validates the output - specifically, a Primitive {@link PTransform} produces all of
+ * its outputs, and a Composite {@link PTransform} produces none of its outputs. Verifies that the
+ * expanded output does not contain {@link PValue PValues} produced by both this node and other
+ * nodes.
*/
- public void popNode() {
- transformStack.pop();
- checkState(!transformStack.isEmpty());
+ public void setOutput(POutput output) {
+ for (PValue value : output.expand()) {
+ if (!producers.containsKey(value)) {
+ producers.put(value, current);
+ }
+ }
+ current.setOutput(output);
+ // TODO: Replace with a "generateDefaultNames" method.
+ output.recordAsOutput(current.toAppliedPTransform());
}
/**
- * Adds an input to the given node.
- *
- * <p>This forces the producing node to be finished.
+ * Pops the current node off the top of the stack, finishing it. Outputs of the node are finished
+ * once they are consumed as input.
*/
- public void addInput(TransformTreeNode node, PInput input) {
- for (PValue i : input.expand()) {
- TransformTreeNode producer = producingTransformNode.get(i);
- checkState(producer != null, "Producer unknown for input: %s", i);
+ public void popNode() {
+ current.finishSpecifying();
+ current = current.getEnclosingNode();
+ checkState(current != null, "Can't pop the root node of a TransformHierarchy");
+ }
- producer.finishSpecifying();
- node.addInputProducer(i, producer);
- }
+ TransformTreeNode getProducer(PValue produced) {
+ return producers.get(produced);
}
/**
- * Sets the output of a transform node.
+ * Returns all producing transforms for the {@link PValue PValues} contained
+ * in {@code output}.
*/
- public void setOutput(TransformTreeNode producer, POutput output) {
- producer.setOutput(output);
-
- for (PValue o : output.expand()) {
- producingTransformNode.put(o, producer);
+ List<TransformTreeNode> getProducingTransforms(POutput output) {
+ List<TransformTreeNode> producingTransforms = new ArrayList<>();
+ for (PValue value : output.expand()) {
+ TransformTreeNode producer = getProducer(value);
+ if (producer != null) {
+ producingTransforms.add(producer);
+ }
}
+ return producingTransforms;
}
- /**
- * Visits all nodes in the transform hierarchy, in transitive order.
- */
- public void visit(Pipeline.PipelineVisitor visitor,
- Set<PValue> visitedNodes) {
- transformStack.peekFirst().visit(visitor, visitedNodes);
+ public Set<PValue> visit(PipelineVisitor visitor) {
+ Set<PValue> visitedValues = new HashSet<>();
+ root.visit(visitor, visitedValues);
+ return visitedValues;
+ }
+
+ public TransformTreeNode getCurrent() {
+ return current;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
index d16b828..ea94bd9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
@@ -17,18 +17,19 @@
*/
package org.apache.beam.sdk.runners;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.values.PValue;
* for initialization and ordered visitation.
*/
public class TransformTreeNode {
+ private final TransformHierarchy hierarchy;
private final TransformTreeNode enclosingNode;
// The PTransform for this node, which may be a composite PTransform.
@@ -51,10 +53,6 @@ public class TransformTreeNode {
// Nodes for sub-transforms of a composite transform.
private final Collection<TransformTreeNode> parts = new ArrayList<>();
- // Inputs to the transform, in expanded form and mapped to the producer
- // of the input.
- private final Map<PValue, TransformTreeNode> inputs = new HashMap<>();
-
// Input to the transform, in unexpanded form.
private final PInput input;
@@ -62,28 +60,57 @@ public class TransformTreeNode {
// Output of the transform, in unexpanded form.
private POutput output;
- private boolean finishedSpecifying = false;
+ @VisibleForTesting
+ boolean finishedSpecifying = false;
+
+ /**
+ * Create a root {@link TransformTreeNode}. This transform is the root of the provided {@link
+ * TransformHierarchy} - it has no enclosing node, no {@link PTransform}, no {@link PInput input},
+ * no {@link POutput output}, and an empty name. It contains all {@link PTransform transforms}
+ * within a {@link Pipeline} as component transforms.
+ */
+ public static TransformTreeNode root(TransformHierarchy hierarchy) {
+ return new TransformTreeNode(hierarchy, null, null, "", null);
+ }
+
+ /**
+ * Create a subtransform of the provided {@link TransformTreeNode node}. The enclosing node is a
+ * composite that contains this transform.
+ *
+ * <p>The returned node is a component node of the enclosing node.
+ */
+ public static TransformTreeNode subtransform(
+ TransformTreeNode enclosing, PTransform<?, ?> transform, String fullName, PInput input) {
+ checkNotNull(enclosing);
+ checkNotNull(transform);
+ checkNotNull(fullName);
+ checkNotNull(input);
+ TransformTreeNode node =
+ new TransformTreeNode(enclosing.hierarchy, enclosing, transform, fullName, input);
+ enclosing.addComposite(node);
+ return node;
+ }
/**
* Creates a new TransformTreeNode with the given parent and transform.
*
- * <p>EnclosingNode and transform may both be null for
- * a root-level node, which holds all other nodes.
+ * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
+ * nodes.
*
* @param enclosingNode the composite node containing this node
* @param transform the PTransform tracked by this node
* @param fullName the fully qualified name of the transform
* @param input the unexpanded input to the transform
*/
- public TransformTreeNode(@Nullable TransformTreeNode enclosingNode,
- @Nullable PTransform<?, ?> transform,
- String fullName,
- @Nullable PInput input) {
+ private TransformTreeNode(
+ TransformHierarchy hierarchy,
+ @Nullable TransformTreeNode enclosingNode,
+ @Nullable PTransform<?, ?> transform,
+ String fullName,
+ @Nullable PInput input) {
+ this.hierarchy = hierarchy;
this.enclosingNode = enclosingNode;
this.transform = transform;
- checkArgument((enclosingNode == null && transform == null)
- || (enclosingNode != null && transform != null),
- "EnclosingNode and transform must both be specified, or both be null");
this.fullName = fullName;
this.input = input;
}
@@ -113,21 +140,23 @@ public class TransformTreeNode {
}
/**
- * Returns true if this node represents a composite transform that does not perform
- * processing of its own, but merely encapsulates a sub-pipeline (which may be empty).
+ * Returns true if this node represents a composite transform that does not perform processing of
+ * its own, but merely encapsulates a sub-pipeline (which may be empty).
*
- * <p>Note that a node may be composite with no sub-transforms if it returns its input directly
+ * <p>Note that a node may be composite with no sub-transforms if it returns its input directly
* extracts a component of a tuple, or other operations that occur at pipeline assembly time.
*/
public boolean isCompositeNode() {
- return !parts.isEmpty() || returnsOthersOutput() || isRootNode();
+ return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
}
private boolean returnsOthersOutput() {
PTransform<?, ?> transform = getTransform();
- for (PValue output : getExpandedOutputs()) {
- if (!output.getProducingTransformInternal().getTransform().equals(transform)) {
- return true;
+ if (output != null) {
+ for (PValue outputValue : output.expand()) {
+ if (!hierarchy.getProducer(outputValue).getTransform().equals(transform)) {
+ return true;
+ }
}
}
return false;
@@ -142,14 +171,6 @@ public class TransformTreeNode {
}
/**
- * Adds an input to the transform node.
- */
- public void addInputProducer(PValue expandedInput, TransformTreeNode producer) {
- checkState(!finishedSpecifying);
- inputs.put(expandedInput, producer);
- }
-
- /**
* Returns the transform input, in unexpanded form.
*/
public PInput getInput() {
@@ -157,20 +178,37 @@ public class TransformTreeNode {
}
/**
- * Returns a mapping of inputs to the producing nodes for all inputs to
- * the transform.
- */
- public Map<PValue, TransformTreeNode> getInputs() {
- return Collections.unmodifiableMap(inputs);
- }
-
- /**
* Adds an output to the transform node.
*/
public void setOutput(POutput output) {
checkState(!finishedSpecifying);
- checkState(this.output == null);
+ checkState(this.output == null, "Tried to specify more than one output for %s", getFullName());
+ checkNotNull(output, "Tried to set the output of %s to null", getFullName());
this.output = output;
+
+ // Validate that a primitive transform produces only primitive output, and a composite transform
+ // does not produce primitive output.
+ Set<TransformTreeNode> outputProducers = new HashSet<>();
+ for (PValue outputValue : output.expand()) {
+ outputProducers.add(hierarchy.getProducer(outputValue));
+ }
+ if (outputProducers.contains(this) && outputProducers.size() != 1) {
+ Set<String> otherProducerNames = new HashSet<>();
+ for (TransformTreeNode outputProducer : outputProducers) {
+ if (outputProducer != this) {
+ otherProducerNames.add(outputProducer.getFullName());
+ }
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Output of transform [%s] contains a %s produced by it as well as other Transforms. "
+ + "A primitive transform must produce all of its outputs, and outputs of a "
+ + "composite transform must be produced by a component transform or be part of"
+ + "the input."
+ + "%n Other Outputs: %s"
+ + "%n Other Producers: %s",
+ getFullName(), POutput.class.getSimpleName(), output.expand(), otherProducerNames));
+ }
}
/**
@@ -180,17 +218,10 @@ public class TransformTreeNode {
return output;
}
- /**
- * Returns the transform outputs, in expanded form.
- */
- public Collection<? extends PValue> getExpandedOutputs() {
- if (output != null) {
- return output.expand();
- } else {
- return Collections.emptyList();
- }
+ AppliedPTransform<?, ?, ?> toAppliedPTransform() {
+ return AppliedPTransform.of(
+ getFullName(), getInput(), getOutput(), (PTransform) getTransform());
}
-
/**
* Visit the transform node.
*
@@ -204,10 +235,12 @@ public class TransformTreeNode {
finishSpecifying();
}
- // Visit inputs.
- for (Map.Entry<PValue, TransformTreeNode> entry : inputs.entrySet()) {
- if (visitedValues.add(entry.getKey())) {
- visitor.visitValue(entry.getKey(), entry.getValue());
+ if (!isRootNode()) {
+ // Visit inputs.
+ for (PValue inputValue : input.expand()) {
+ if (visitedValues.add(inputValue)) {
+ visitor.visitValue(inputValue, hierarchy.getProducer(inputValue));
+ }
}
}
@@ -224,10 +257,12 @@ public class TransformTreeNode {
visitor.visitPrimitiveTransform(this);
}
- // Visit outputs.
- for (PValue pValue : getExpandedOutputs()) {
- if (visitedValues.add(pValue)) {
- visitor.visitValue(pValue, this);
+ if (!isRootNode()) {
+ // Visit outputs.
+ for (PValue pValue : output.expand()) {
+ if (visitedValues.add(pValue)) {
+ visitor.visitValue(pValue, this);
+ }
}
}
}
@@ -243,15 +278,5 @@ public class TransformTreeNode {
return;
}
finishedSpecifying = true;
-
- for (TransformTreeNode input : inputs.values()) {
- if (input != null) {
- input.finishSpecifying();
- }
- }
-
- if (output != null) {
- output.finishSpecifyingOutput();
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index c28f23e..3bf6d64 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.runners;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.util.HashSet;
@@ -30,9 +31,15 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.hamcrest.Matchers;
import org.junit.Before;
@@ -63,7 +70,7 @@ public class TransformHierarchyTest {
}
@Test
- public void popWithoutPushThrows() {
+ public void pushWithoutPushFails() {
thrown.expect(IllegalStateException.class);
hierarchy.popNode();
}
@@ -71,72 +78,153 @@ public class TransformHierarchyTest {
@Test
public void pushThenPopSucceeds() {
TransformTreeNode root = hierarchy.getCurrent();
- TransformTreeNode node =
- new TransformTreeNode(hierarchy.getCurrent(), Create.of(1), "Create", PBegin.in(pipeline));
- hierarchy.pushNode(node);
+ TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
assertThat(hierarchy.getCurrent(), equalTo(node));
hierarchy.popNode();
+ assertThat(node.finishedSpecifying, is(true));
assertThat(hierarchy.getCurrent(), equalTo(root));
}
@Test
+ public void emptyCompositeSucceeds() {
+ PCollection<Long> created =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
+ hierarchy.setOutput(created);
+ hierarchy.popNode();
+ PCollectionList<Long> pcList = PCollectionList.of(created);
+
+ TransformTreeNode emptyTransform =
+ hierarchy.pushNode(
+ "Extract",
+ pcList,
+ new PTransform<PCollectionList<Long>, PCollection<Long>>() {
+ @Override
+ public PCollection<Long> apply(PCollectionList<Long> input) {
+ return input.get(0);
+ }
+ });
+ hierarchy.setOutput(created);
+ hierarchy.popNode();
+ assertThat(hierarchy.getProducer(created), equalTo(node));
+ assertThat(
+ "A Transform that produces non-primtive output should be composite",
+ emptyTransform.isCompositeNode(),
+ is(true));
+ }
+
+ @Test
+ public void producingOwnAndOthersOutputsFails() {
+ PCollection<Long> created =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
+ hierarchy.setOutput(created);
+ hierarchy.popNode();
+ PCollectionList<Long> pcList = PCollectionList.of(created);
+
+ final PCollectionList<Long> appended =
+ pcList.and(
+ PCollection.<Long>createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED));
+ hierarchy.pushNode(
+ "AddPc",
+ pcList,
+ new PTransform<PCollectionList<Long>, PCollectionList<Long>>() {
+ @Override
+ public PCollectionList<Long> apply(PCollectionList<Long> input) {
+ return appended;
+ }
+ });
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("produced by it as well as other Transforms");
+ thrown.expectMessage("primitive transform must produce all of its outputs");
+ thrown.expectMessage("composite transform must be produced by a component transform");
+ thrown.expectMessage("AddPc");
+ thrown.expectMessage("Create");
+ thrown.expectMessage(appended.expand().toString());
+ hierarchy.setOutput(appended);
+ }
+
+ @Test
public void visitVisitsAllPushed() {
TransformTreeNode root = hierarchy.getCurrent();
- Create.Values<Integer> create = Create.of(1);
- PCollection<Integer> created = pipeline.apply(create);
PBegin begin = PBegin.in(pipeline);
- TransformTreeNode compositeNode =
- new TransformTreeNode(root, create, "Create", begin);
- root.addComposite(compositeNode);
- TransformTreeNode primitiveNode =
- new TransformTreeNode(
- compositeNode, Read.from(CountingSource.upTo(1L)), "Create/Read", begin);
- compositeNode.addComposite(primitiveNode);
-
- TransformTreeNode otherPrimitive =
- new TransformTreeNode(
- root, MapElements.via(new SimpleFunction<Integer, Integer>() {
- @Override
- public Integer apply(Integer input) {
- return input;
- }
- }), "ParDo", created);
- root.addComposite(otherPrimitive);
- otherPrimitive.addInputProducer(created, primitiveNode);
+ Create.Values<Long> create = Create.of(1L);
+ Read.Bounded<Long> read = Read.from(CountingSource.upTo(1L));
- hierarchy.pushNode(compositeNode);
- hierarchy.pushNode(primitiveNode);
+ PCollection<Long> created =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+ MapElements<Long, Long> map = MapElements.via(new SimpleFunction<Long, Long>() {
+ @Override
+ public Long apply(Long input) {
+ return input;
+ }
+ });
+
+ PCollection<Long> mapped =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+ TransformTreeNode compositeNode = hierarchy.pushNode("Create", begin, create);
+ assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
+ assertThat(compositeNode.getInput(), Matchers.<PInput>equalTo(begin));
+ assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create));
+ // Not yet set
+ assertThat(compositeNode.getOutput(), nullValue());
+ assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true));
+
+ TransformTreeNode primitiveNode = hierarchy.pushNode("Create/Read", begin, read);
+ assertThat(hierarchy.getCurrent(), equalTo(primitiveNode));
+ hierarchy.setOutput(created);
hierarchy.popNode();
+ assertThat(primitiveNode.getOutput(), Matchers.<POutput>equalTo(created));
+ assertThat(primitiveNode.getInput(), Matchers.<PInput>equalTo(begin));
+ assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read));
+ assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode));
+
+ hierarchy.setOutput(created);
+ // The composite is listed as outputting a PValue created by the contained primitive
+ assertThat(compositeNode.getOutput(), Matchers.<POutput>equalTo(created));
+ // The producer of that PValue is still the primitive in which it is first output
+ assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
hierarchy.popNode();
- hierarchy.pushNode(otherPrimitive);
+
+ TransformTreeNode otherPrimitive = hierarchy.pushNode("ParDo", created, map);
+ hierarchy.setOutput(mapped);
hierarchy.popNode();
final Set<TransformTreeNode> visitedCompositeNodes = new HashSet<>();
final Set<TransformTreeNode> visitedPrimitiveNodes = new HashSet<>();
final Set<PValue> visitedValuesInVisitor = new HashSet<>();
- Set<PValue> visitedValues = new HashSet<>();
- hierarchy.visit(new PipelineVisitor.Defaults() {
- @Override
- public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
- visitedCompositeNodes.add(node);
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void visitPrimitiveTransform(TransformTreeNode node) {
- visitedPrimitiveNodes.add(node);
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- visitedValuesInVisitor.add(value);
- }
- }, visitedValues);
+ Set<PValue> visitedValues =
+ hierarchy.visit(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+ visitedCompositeNodes.add(node);
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformTreeNode node) {
+ visitedPrimitiveNodes.add(node);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ visitedValuesInVisitor.add(value);
+ }
+ });
assertThat(visitedCompositeNodes, containsInAnyOrder(root, compositeNode));
assertThat(visitedPrimitiveNodes, containsInAnyOrder(primitiveNode, otherPrimitive));
- assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created));
+ assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created, mapped));
+ assertThat(visitedValuesInVisitor, equalTo(visitedValues));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index def3a02..b95fa70 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.File;
import java.util.Arrays;
@@ -169,14 +168,13 @@ public class TransformTreeTest {
assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY)));
}
- @Test(expected = IllegalStateException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testOutputChecking() throws Exception {
Pipeline p = TestPipeline.create();
p.apply(new InvalidCompositeTransform());
p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {});
- fail("traversal should have failed with an IllegalStateException");
}
@Test
[2/2] incubator-beam git commit: This closes #1469
Posted by tg...@apache.org.
This closes #1469
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/48130f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/48130f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/48130f71
Branch: refs/heads/master
Commit: 48130f718d019d6928c464e6f7ad90cd510b62d2
Parents: 0c875ba ab1f1ad
Author: Thomas Groh <tg...@google.com>
Authored: Thu Dec 1 12:55:26 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Dec 1 12:55:26 2016 -0800
----------------------------------------------------------------------
.../direct/KeyedPValueTrackingVisitor.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++---------
.../beam/sdk/runners/TransformHierarchy.java | 126 ++++++++-----
.../beam/sdk/runners/TransformTreeNode.java | 165 +++++++++--------
.../sdk/runners/TransformHierarchyTest.java | 180 ++++++++++++++-----
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
7 files changed, 340 insertions(+), 256 deletions(-)
----------------------------------------------------------------------