You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/20 01:03:40 UTC

[1/3] incubator-beam git commit: Add test for empty ON_TIME and no empty final pane

Repository: incubator-beam
Updated Branches:
  refs/heads/master 0952f4433 -> f1aa490b9


Add test for empty ON_TIME and no empty final pane

Add a test that, when using accumulating mode with the fire-if-non-empty
`ClosingBehavior`, we get an empty `ON_TIME` pane but do not get an empty
final pane.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4c5f530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4c5f530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4c5f530

Branch: refs/heads/master
Commit: e4c5f530effc591ef56f8d49162a0d82069a9e31
Parents: fa45809
Author: bchambers <bc...@google.com>
Authored: Tue Apr 19 12:43:52 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 19 15:23:53 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 54 ++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4c5f530/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 5eccb04..65b5ee6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -548,6 +548,60 @@ public class ReduceFnRunnerTest {
   }
 
   @Test
+  public void noEmptyPanesFinalIfNonEmpty() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)));
+    tester.advanceInputWatermark(new Instant(20)); // past the end of the 10ms window
+    tester.advanceInputWatermark(new Instant(250)); // past window end + allowed lateness
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output, contains(
+        // Pane fired by elementCountAtLeast(2) once both elements were injected
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
+        // On-time pane: no new elements, but accumulating mode re-emits 1 and 2
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+  }
+
+  @Test
+  public void noEmptyPanesFinalAlways() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)));
+    tester.advanceInputWatermark(new Instant(20)); // past the end of the 10ms window
+    tester.advanceInputWatermark(new Instant(250)); // past window end + allowed lateness
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output, contains(
+        // Pane fired by elementCountAtLeast(2) once both elements were injected
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
+        // On-time pane: no new elements, but accumulating mode re-emits 1 and 2
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10),
+        // Final pane: expected under FIRE_ALWAYS even though no new elements arrived
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+  }
+
+  @Test
   public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))


[3/3] incubator-beam git commit: This closes #211

Posted by bc...@apache.org.
This closes #211


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1aa490b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1aa490b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1aa490b

Branch: refs/heads/master
Commit: f1aa490b948be3f3c09f19b0a41ff8bc6b83089b
Parents: 0952f44 e4c5f53
Author: bchambers <bc...@google.com>
Authored: Tue Apr 19 15:23:58 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 19 15:23:58 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 72 +++++++++++++++++---
 1 file changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Remove unused generic arguments in ReduceFnRunnerTest.

Posted by bc...@apache.org.
Remove unused generic arguments in ReduceFnRunnerTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa458091
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa458091
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa458091

Branch: refs/heads/master
Commit: fa4580915943a0bac58f1452f1ef1e43aad986fb
Parents: 0952f44
Author: bchambers <bc...@google.com>
Authored: Tue Apr 19 11:34:25 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 19 15:23:53 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/ReduceFnRunnerTest.java  | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa458091/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 1939b20..5eccb04 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -500,8 +500,8 @@ public class ReduceFnRunnerTest {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
             .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
-                AfterPane.<IntervalWindow>elementCountAtLeast(2),
-                AfterWatermark.<IntervalWindow>pastEndOfWindow())))
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
@@ -552,8 +552,8 @@ public class ReduceFnRunnerTest {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
             .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
-                AfterPane.<IntervalWindow>elementCountAtLeast(2),
-                AfterWatermark.<IntervalWindow>pastEndOfWindow())))
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
@@ -604,8 +604,8 @@ public class ReduceFnRunnerTest {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
             .withTrigger(
-                Repeatedly.<IntervalWindow>forever(AfterPane.<IntervalWindow>elementCountAtLeast(2))
-                    .orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow()))
+                Repeatedly.<IntervalWindow>forever(AfterPane.elementCountAtLeast(2))
+                    .orFinally(AfterWatermark.pastEndOfWindow()))
             .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
             .withAllowedLateness(Duration.millis(100))
             .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
@@ -890,7 +890,7 @@ public class ReduceFnRunnerTest {
     ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
         WindowingStrategy.of(
             SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
-        .withTrigger(AfterWatermark.<IntervalWindow>pastEndOfWindow())
+        .withTrigger(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.millis(1000)),
         new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
 
@@ -1024,7 +1024,7 @@ public class ReduceFnRunnerTest {
                     .<IntervalWindow>forever(
                         AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
                             new Duration(5)))
-                    .orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow()),
+                    .orFinally(AfterWatermark.pastEndOfWindow()),
                 Repeatedly.<IntervalWindow>forever(
                     AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
                         new Duration(25)))),
@@ -1074,7 +1074,7 @@ public class ReduceFnRunnerTest {
                     .<IntervalWindow>forever(
                         AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
                             new Duration(5)))
-                    .orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow()),
+                    .orFinally(AfterWatermark.pastEndOfWindow()),
                 Repeatedly.<IntervalWindow>forever(
                     AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
                         new Duration(25)))),