You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:11:13 UTC
[1/2] beam git commit: Allow output from FinishBundle in DoFnTester
Repository: beam
Updated Branches:
refs/heads/master 5f8cfa741 -> b3c36256e
Allow output from FinishBundle in DoFnTester
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/649994b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/649994b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/649994b3
Branch: refs/heads/master
Commit: 649994b353afe28c917969609c7a1a47a4f39aaf
Parents: 5f8cfa7
Author: Rune Fevang <fe...@exabel.com>
Authored: Thu Jun 15 13:51:12 2017 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 20:07:11 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 16 ++--------
.../beam/sdk/transforms/DoFnTesterTest.java | 32 ++++++++++++++++++++
2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 8a03f3c..4da9a80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -546,11 +546,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
fn.super();
}
- private void throwUnsupportedOutputFromBundleMethods() {
- throw new UnsupportedOperationException(
- "DoFnTester doesn't support output from bundle methods");
- }
-
@Override
public PipelineOptions getPipelineOptions() {
return options;
@@ -559,12 +554,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public void output(
OutputT output, Instant timestamp, BoundedWindow window) {
- throwUnsupportedOutputFromBundleMethods();
+ output(mainOutputTag, output, timestamp, window);
}
@Override
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
- throwUnsupportedOutputFromBundleMethods();
+ getMutableOutput(tag)
+ .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING));
}
}
@@ -642,12 +638,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
getMutableOutput(tag)
.add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
}
-
- private void throwUnsupportedOutputFromBundleMethods() {
- throw new UnsupportedOperationException(
- "DoFnTester doesn't support output from bundle methods");
- }
-
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/649994b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 1bb71bb..5cb9e18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -360,6 +360,38 @@ public class DoFnTesterTest {
}
}
+ @Test
+ public void testSupportsFinishBundleOutput() throws Exception {
+ for (DoFnTester.CloningBehavior cloning : DoFnTester.CloningBehavior.values()) {
+ try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new BundleCounterDoFn())) {
+ tester.setCloningBehavior(cloning);
+
+ assertThat(tester.processBundle(1, 2, 3, 4), contains(4));
+ assertThat(tester.processBundle(5, 6, 7), contains(3));
+ assertThat(tester.processBundle(8, 9), contains(2));
+ }
+ }
+ }
+
+ private static class BundleCounterDoFn extends DoFn<Integer, Integer> {
+ private int elements;
+
+ @StartBundle
+ public void startBundle() {
+ elements = 0;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ elements++;
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) {
+ c.output(elements, Instant.now(), GlobalWindow.INSTANCE);
+ }
+ }
+
private static class SideInputDoFn extends DoFn<Integer, Integer> {
private final PCollectionView<Integer> value;
[2/2] beam git commit: This closes #3367: Allow output from FinishBundle in DoFnTester
Posted by ke...@apache.org.
This closes #3367: Allow output from FinishBundle in DoFnTester
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3c36256
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3c36256
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3c36256
Branch: refs/heads/master
Commit: b3c36256e32f4a4a616d0e152686d29f1174c5cd
Parents: 5f8cfa7 649994b
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 20:07:48 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 20:07:48 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 16 ++--------
.../beam/sdk/transforms/DoFnTesterTest.java | 32 ++++++++++++++++++++
2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------