You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/01/24 00:24:38 UTC
[1/2] beam git commit: Ensure Composite Nodes produce no output
Repository: beam
Updated Branches:
refs/heads/master 338012d14 -> 6ecbfb9e1
Ensure Composite Nodes produce no output
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e1893a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e1893a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e1893a4
Branch: refs/heads/master
Commit: 0e1893a471bb87fbb48be94370a2c0b0872f23b3
Parents: 338012d
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jan 18 11:45:25 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 16:24:23 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/runners/TransformHierarchy.java | 33 +++++++++++---------
.../sdk/runners/TransformHierarchyTest.java | 30 ++++++++++++++++--
2 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0e1893a4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 3676e1a..dc8f823 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -290,22 +290,27 @@ public class TransformHierarchy {
for (TaggedPValue outputValue : output.expand()) {
outputProducers.add(getProducer(outputValue.getValue()));
}
- if (outputProducers.contains(this) && outputProducers.size() != 1) {
- Set<String> otherProducerNames = new HashSet<>();
- for (Node outputProducer : outputProducers) {
- if (outputProducer != this) {
- otherProducerNames.add(outputProducer.getFullName());
+ if (outputProducers.contains(this)) {
+ if (!parts.isEmpty() || outputProducers.size() > 1) {
+ Set<String> otherProducerNames = new HashSet<>();
+ for (Node outputProducer : outputProducers) {
+ if (outputProducer != this) {
+ otherProducerNames.add(outputProducer.getFullName());
+ }
}
+ throw new IllegalArgumentException(
+ String.format(
+ "Output of composite transform [%s] contains a primitive %s produced by it. "
+ + "Only primitive transforms are permitted to produce primitive outputs."
+ + "%n Outputs: %s"
+ + "%n Other Producers: %s"
+ + "%n Components: %s",
+ getFullName(),
+ POutput.class.getSimpleName(),
+ output.expand(),
+ otherProducerNames,
+ parts));
}
- throw new IllegalArgumentException(
- String.format(
- "Output of transform [%s] contains a %s produced by it as well as other "
- + "Transforms. A primitive transform must produce all of its outputs, and "
- + "outputs of a composite transform must be produced by a component transform "
- + "or be part of the input."
- + "%n Other Outputs: %s"
- + "%n Other Producers: %s",
- getFullName(), POutput.class.getSimpleName(), output.expand(), otherProducerNames));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e1893a4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index d373caf..9a77b9b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -140,9 +140,7 @@ public class TransformHierarchyTest implements Serializable {
}
});
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("produced by it as well as other Transforms");
- thrown.expectMessage("primitive transform must produce all of its outputs");
- thrown.expectMessage("composite transform must be produced by a component transform");
+ thrown.expectMessage("contains a primitive POutput produced by it");
thrown.expectMessage("AddPc");
thrown.expectMessage("Create");
thrown.expectMessage(appended.expand().toString());
@@ -150,6 +148,32 @@ public class TransformHierarchyTest implements Serializable {
}
@Test
+ public void producingOwnOutputWithCompositeFails() {
+ final PCollection<Long> comp =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ PTransform<PBegin, PCollection<Long>> root =
+ new PTransform<PBegin, PCollection<Long>>() {
+ @Override
+ public PCollection<Long> expand(PBegin input) {
+ return comp;
+ }
+ };
+ hierarchy.pushNode("Composite", PBegin.in(pipeline), root);
+
+ Create.Values<Integer> create = Create.of(1);
+ hierarchy.pushNode("Create", PBegin.in(pipeline), create);
+ hierarchy.setOutput(pipeline.apply(create));
+ hierarchy.popNode();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("contains a primitive POutput produced by it");
+ thrown.expectMessage("primitive transforms are permitted to produce");
+ thrown.expectMessage("Composite");
+ hierarchy.setOutput(comp);
+ }
+
+ @Test
public void visitVisitsAllPushed() {
TransformHierarchy.Node root = hierarchy.getCurrent();
PBegin begin = PBegin.in(pipeline);
[2/2] beam git commit: This closes #1788
Posted by tg...@apache.org.
This closes #1788
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ecbfb9e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ecbfb9e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ecbfb9e
Branch: refs/heads/master
Commit: 6ecbfb9e155755b5fa39df6a5212cd38a00ea45e
Parents: 338012d 0e1893a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jan 23 16:24:24 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 16:24:24 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/runners/TransformHierarchy.java | 33 +++++++++++---------
.../sdk/runners/TransformHierarchyTest.java | 30 ++++++++++++++++--
2 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------