You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/01/24 00:24:38 UTC

[1/2] beam git commit: Ensure Composite Nodes produce no output

Repository: beam
Updated Branches:
  refs/heads/master 338012d14 -> 6ecbfb9e1


Ensure Composite Nodes produce no output


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e1893a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e1893a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e1893a4

Branch: refs/heads/master
Commit: 0e1893a471bb87fbb48be94370a2c0b0872f23b3
Parents: 338012d
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jan 18 11:45:25 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 16:24:23 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/runners/TransformHierarchy.java    | 33 +++++++++++---------
 .../sdk/runners/TransformHierarchyTest.java     | 30 ++++++++++++++++--
 2 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0e1893a4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 3676e1a..dc8f823 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -290,22 +290,27 @@ public class TransformHierarchy {
       for (TaggedPValue outputValue : output.expand()) {
         outputProducers.add(getProducer(outputValue.getValue()));
       }
-      if (outputProducers.contains(this) && outputProducers.size() != 1) {
-        Set<String> otherProducerNames = new HashSet<>();
-        for (Node outputProducer : outputProducers) {
-          if (outputProducer != this) {
-            otherProducerNames.add(outputProducer.getFullName());
+      if (outputProducers.contains(this)) {
+        if (!parts.isEmpty() || outputProducers.size() > 1) {
+          Set<String> otherProducerNames = new HashSet<>();
+          for (Node outputProducer : outputProducers) {
+            if (outputProducer != this) {
+              otherProducerNames.add(outputProducer.getFullName());
+            }
           }
+          throw new IllegalArgumentException(
+              String.format(
+                  "Output of composite transform [%s] contains a primitive %s produced by it. "
+                      + "Only primitive transforms are permitted to produce primitive outputs."
+                      + "%n    Outputs: %s"
+                      + "%n    Other Producers: %s"
+                      + "%n    Components: %s",
+                  getFullName(),
+                  POutput.class.getSimpleName(),
+                  output.expand(),
+                  otherProducerNames,
+                  parts));
         }
-        throw new IllegalArgumentException(
-            String.format(
-                "Output of transform [%s] contains a %s produced by it as well as other "
-                    + "Transforms. A primitive transform must produce all of its outputs, and "
-                    + "outputs of a composite transform must be produced by a component transform "
-                    + "or be part of the input."
-                    + "%n    Other Outputs: %s"
-                    + "%n    Other Producers: %s",
-                getFullName(), POutput.class.getSimpleName(), output.expand(), otherProducerNames));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e1893a4/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index d373caf..9a77b9b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -140,9 +140,7 @@ public class TransformHierarchyTest implements Serializable {
           }
         });
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("produced by it as well as other Transforms");
-    thrown.expectMessage("primitive transform must produce all of its outputs");
-    thrown.expectMessage("composite transform must be produced by a component transform");
+    thrown.expectMessage("contains a primitive POutput produced by it");
     thrown.expectMessage("AddPc");
     thrown.expectMessage("Create");
     thrown.expectMessage(appended.expand().toString());
@@ -150,6 +148,32 @@ public class TransformHierarchyTest implements Serializable {
   }
 
   @Test
+  public void producingOwnOutputWithCompositeFails() {
+    final PCollection<Long> comp =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    PTransform<PBegin, PCollection<Long>> root =
+        new PTransform<PBegin, PCollection<Long>>() {
+          @Override
+          public PCollection<Long> expand(PBegin input) {
+            return comp;
+          }
+        };
+    hierarchy.pushNode("Composite", PBegin.in(pipeline), root);
+
+    Create.Values<Integer> create = Create.of(1);
+    hierarchy.pushNode("Create", PBegin.in(pipeline), create);
+    hierarchy.setOutput(pipeline.apply(create));
+    hierarchy.popNode();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("contains a primitive POutput produced by it");
+    thrown.expectMessage("primitive transforms are permitted to produce");
+    thrown.expectMessage("Composite");
+    hierarchy.setOutput(comp);
+  }
+
+  @Test
   public void visitVisitsAllPushed() {
     TransformHierarchy.Node root = hierarchy.getCurrent();
     PBegin begin = PBegin.in(pipeline);


[2/2] beam git commit: This closes #1788

Posted by tg...@apache.org.
This closes #1788


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ecbfb9e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ecbfb9e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ecbfb9e

Branch: refs/heads/master
Commit: 6ecbfb9e155755b5fa39df6a5212cd38a00ea45e
Parents: 338012d 0e1893a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jan 23 16:24:24 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 16:24:24 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/runners/TransformHierarchy.java    | 33 +++++++++++---------
 .../sdk/runners/TransformHierarchyTest.java     | 30 ++++++++++++++++--
 2 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------