You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/15 03:18:44 UTC

[2/3] incubator-beam git commit: Add DoFnTester#peekOutputValuesInWindow

Add DoFnTester#peekOutputValuesInWindow

This permits DoFns that interact with windowing to test the windowed,
rather than overall output.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c627fa84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c627fa84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c627fa84

Branch: refs/heads/master
Commit: c627fa847fe649b4308c2becbe101dcb0a945843
Parents: 4da5ebf
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 14 13:18:41 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jun 14 18:25:23 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 25 ++++++++++++++++++++
 .../beam/sdk/transforms/DoFnTesterTest.java     | 22 +++++++++++++++++
 2 files changed, 47 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c627fa84/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 1df42e2..415af95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.TupleTagList;
 
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -287,6 +288,30 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
+   * Returns the elements output so far to the main output in the provided window with associated
+   * timestamps.
+   */
+  public List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) {
+    return peekOutputElementsInWindow(mainOutputTag, window);
+  }
+
+  /**
+   * Returns the elements output so far to the specified output in the provided window with
+   * associated timestamps.
+   */
+  public List<TimestampedValue<OutputT>> peekOutputElementsInWindow(
+      TupleTag<OutputT> tag,
+      BoundedWindow window) {
+    ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = ImmutableList.builder();
+    for (WindowedValue<OutputT> value : getOutput(tag)) {
+      if (value.getWindows().contains(window)) {
+        valuesBuilder.add(TimestampedValue.of(value.getValue(), value.getTimestamp()));
+      }
+    }
+    return valuesBuilder.build();
+  }
+
+  /**
    * Clears the record of the elements output so far to the main output.
    *
    * @see #peekOutputElements

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c627fa84/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 490ed7f..3261f85 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -17,14 +17,18 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.TimestampedValue;
 
+import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -205,6 +209,24 @@ public class DoFnTesterTest {
     assertThat(aggValue, equalTo(1L + 2L));
   }
 
+  @Test
+  public void peekValuesInWindow() throws Exception {
+    CounterDoFn fn = new CounterDoFn(1L, 2L);
+    DoFnTester<Long, String> tester = DoFnTester.of(fn);
+
+    tester.startBundle();
+    tester.processElement(1L);
+    tester.processElement(2L);
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
+        containsInAnyOrder(TimestampedValue.of("1", new Instant(1000L)),
+            TimestampedValue.of("2", new Instant(2000L))));
+    assertThat(tester.peekOutputElementsInWindow(
+        new IntervalWindow(new Instant(0L), new Instant(10L))),
+        Matchers.<TimestampedValue<String>>emptyIterable());
+  }
+
   /**
    * A DoFn that adds values to an aggregator and converts input to String in processElement.
    */