You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:11:13 UTC

[1/2] beam git commit: Allow output from FinishBundle in DoFnTester

Repository: beam
Updated Branches:
  refs/heads/master 5f8cfa741 -> b3c36256e


Allow output from FinishBundle in DoFnTester


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/649994b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/649994b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/649994b3

Branch: refs/heads/master
Commit: 649994b353afe28c917969609c7a1a47a4f39aaf
Parents: 5f8cfa7
Author: Rune Fevang <fe...@exabel.com>
Authored: Thu Jun 15 13:51:12 2017 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 20:07:11 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 16 ++--------
 .../beam/sdk/transforms/DoFnTesterTest.java     | 32 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 8a03f3c..4da9a80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -546,11 +546,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       fn.super();
     }
 
-    private void throwUnsupportedOutputFromBundleMethods() {
-      throw new UnsupportedOperationException(
-          "DoFnTester doesn't support output from bundle methods");
-    }
-
     @Override
     public PipelineOptions getPipelineOptions() {
       return options;
@@ -559,12 +554,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     @Override
     public void output(
         OutputT output, Instant timestamp, BoundedWindow window) {
-      throwUnsupportedOutputFromBundleMethods();
+      output(mainOutputTag, output, timestamp, window);
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
-      throwUnsupportedOutputFromBundleMethods();
+      getMutableOutput(tag)
+          .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING));
     }
   }
 
@@ -642,12 +638,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       getMutableOutput(tag)
           .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
     }
-
-    private void throwUnsupportedOutputFromBundleMethods() {
-      throw new UnsupportedOperationException(
-          "DoFnTester doesn't support output from bundle methods");
-    }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 1bb71bb..5cb9e18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -360,6 +360,38 @@ public class DoFnTesterTest {
     }
   }
 
+  @Test
+  public void testSupportsFinishBundleOutput() throws Exception {
+    for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
+      try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new BundleCounterDoFn())) {
+        tester.setCloningBehavior(cloning);
+
+        assertThat(tester.processBundle(1, 2, 3, 4), contains(4));
+        assertThat(tester.processBundle(5, 6, 7), contains(3));
+        assertThat(tester.processBundle(8, 9), contains(2));
+      }
+    }
+  }
+
+  private static class BundleCounterDoFn extends DoFn<Integer, Integer> {
+    private int elements;
+
+    @StartBundle
+    public void startBundle() {
+      elements = 0;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      elements++;
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) {
+      c.output(elements, Instant.now(), GlobalWindow.INSTANCE);
+    }
+  }
+
   private static class SideInputDoFn extends DoFn<Integer, Integer> {
     private final PCollectionView<Integer> value;
 


[2/2] beam git commit: This closes #3367: Allow output from FinishBundle in DoFnTester

Posted by ke...@apache.org.
This closes #3367: Allow output from FinishBundle in DoFnTester


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3c36256
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3c36256
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3c36256

Branch: refs/heads/master
Commit: b3c36256e32f4a4a616d0e152686d29f1174c5cd
Parents: 5f8cfa7 649994b
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 20:07:48 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 20:07:48 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 16 ++--------
 .../beam/sdk/transforms/DoFnTesterTest.java     | 32 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------