You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/15 03:18:44 UTC
[2/3] incubator-beam git commit: Add
DoFnTester#peekOutputValuesInWindow
Add DoFnTester#peekOutputValuesInWindow
This permits DoFns that interact with windowing to test the windowed,
rather than overall output.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c627fa84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c627fa84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c627fa84
Branch: refs/heads/master
Commit: c627fa847fe649b4308c2becbe101dcb0a945843
Parents: 4da5ebf
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 14 13:18:41 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jun 14 18:25:23 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 25 ++++++++++++++++++++
.../beam/sdk/transforms/DoFnTesterTest.java | 22 +++++++++++++++++
2 files changed, 47 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c627fa84/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 1df42e2..415af95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.TupleTagList;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -287,6 +288,30 @@ public class DoFnTester<InputT, OutputT> {
}
/**
+ * Returns the elements output so far to the main output in the provided window with associated
+ * timestamps.
+ */
+ public List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) {
+ return peekOutputElementsInWindow(mainOutputTag, window);
+ }
+
+ /**
+ * Returns the elements output so far to the specified output in the provided window with
+ * associated timestamps.
+ */
+ public List<TimestampedValue<OutputT>> peekOutputElementsInWindow(
+ TupleTag<OutputT> tag,
+ BoundedWindow window) {
+ ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = ImmutableList.builder();
+ for (WindowedValue<OutputT> value : getOutput(tag)) {
+ if (value.getWindows().contains(window)) {
+ valuesBuilder.add(TimestampedValue.of(value.getValue(), value.getTimestamp()));
+ }
+ }
+ return valuesBuilder.build();
+ }
+
+ /**
* Clears the record of the elements output so far to the main output.
*
* @see #peekOutputElements
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c627fa84/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 490ed7f..3261f85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -17,14 +17,18 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -205,6 +209,24 @@ public class DoFnTesterTest {
assertThat(aggValue, equalTo(1L + 2L));
}
+ @Test
+ public void peekValuesInWindow() throws Exception {
+ CounterDoFn fn = new CounterDoFn(1L, 2L);
+ DoFnTester<Long, String> tester = DoFnTester.of(fn);
+
+ tester.startBundle();
+ tester.processElement(1L);
+ tester.processElement(2L);
+ tester.finishBundle();
+
+ assertThat(tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
+ containsInAnyOrder(TimestampedValue.of("1", new Instant(1000L)),
+ TimestampedValue.of("2", new Instant(2000L))));
+ assertThat(tester.peekOutputElementsInWindow(
+ new IntervalWindow(new Instant(0L), new Instant(10L))),
+ Matchers.<TimestampedValue<String>>emptyIterable());
+ }
+
/**
* A DoFn that adds values to an aggregator and converts input to String in processElement.
*/