You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/22 23:03:53 UTC

[1/9] beam git commit: Add tests for corner cases of processing time timers

Repository: beam
Updated Branches:
  refs/heads/master 5506be87d -> 7645c44b9


Add tests for corner cases of processing time timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d2b384a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d2b384a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d2b384a2

Branch: refs/heads/master
Commit: d2b384a20dbb0213d0f63e74713a06d63bad8d39
Parents: fda589c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 13:05:42 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 70 ++++++++++++++++++++
 .../beam/sdk/transforms/GroupByKeyTest.java     | 39 +++++++++++
 2 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d2b384a2/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index fa5ba8b..4f68038 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -284,6 +284,44 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * Tests that when a processing time timer comes in after a window is expired
+   * but in the same bundle it does not cause a spurious output.
+   */
+  @Test
+  public void testCombiningAccumulatingProcessingTime() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceInputWatermarkNoTimers(new Instant(100));
+    tester.advanceProcessingTimeNoTimers(new Instant(5010));
+
+    // Fires the GC/EOW timer at the same time as the processing time timer.
+    tester.fireTimers(
+        new IntervalWindow(new Instant(0), new Instant(100)),
+        TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)),
+        TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
+  }
+
+  /**
    * Tests that the garbage collection time for a fixed window does not overflow the end of time.
    */
   @Test
@@ -351,6 +389,38 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * Tests that when a processing time timer comes in after a window is expired
+   * and GC'd it does not cause a spurious output.
+   */
+  @Test
+  public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceInputWatermark(new Instant(100));
+    tester.advanceProcessingTime(new Instant(5011));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
+  }
+
+  /**
    * Tests that if end-of-window and GC timers come in together, that the pane is correctly
    * marked as final.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/d2b384a2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 171171f..4b5d5f5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -45,14 +45,19 @@ import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.LargeKeys;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -184,6 +189,40 @@ public class GroupByKeyTest {
     p.run();
   }
 
+  /**
+   * Tests that when a processing time timer comes in after a window is expired it does not cause a
+   * spurious output.
+   */
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testCombiningAccumulatingProcessingTime() throws Exception {
+    PCollection<Integer> triggeredSums =
+        p.apply(
+                TestStream.create(VarIntCoder.of())
+                    .advanceWatermarkTo(new Instant(0))
+                    .addElements(
+                        TimestampedValue.of(2, new Instant(2)),
+                        TimestampedValue.of(5, new Instant(5)))
+                    .advanceWatermarkTo(new Instant(100))
+                    .advanceProcessingTime(Duration.millis(10))
+                    .advanceWatermarkToInfinity())
+            .apply(
+                Window.<Integer>into(FixedWindows.of(Duration.millis(100)))
+                    .withTimestampCombiner(TimestampCombiner.EARLIEST)
+                    .accumulatingFiredPanes()
+                    .withAllowedLateness(Duration.ZERO)
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(Duration.millis(10)))))
+            .apply(Sum.integersGlobally().withoutDefaults());
+
+    PAssert.that(triggeredSums)
+        .containsInAnyOrder(7);
+
+    p.run();
+  }
+
   @Test
   public void testGroupByKeyNonDeterministic() throws Exception {
 


[7/9] beam git commit: Tidy LateDataDroppingDoFnRunner

Posted by ke...@apache.org.
Tidy LateDataDroppingDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4e5db51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4e5db51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4e5db51

Branch: refs/heads/master
Commit: d4e5db51a025a831ddf4e3bc0e003caebabf647b
Parents: 497cfab
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 11:56:53 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        | 33 ++++++++++----------
 1 file changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d4e5db51/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 1cf1509..28938c1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -134,26 +134,27 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
           // The element is too late for this window.
           droppedDueToLateness.inc();
           WindowTracing.debug(
-              "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-              + "since too far behind inputWatermark:{}; outputWatermark:{}",
-              input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+              "{}: Dropping element at {} for key:{}; window:{} "
+                  + "since too far behind inputWatermark:{}; outputWatermark:{}",
+              LateDataFilter.class.getSimpleName(),
+              input.getTimestamp(),
+              key,
+              window,
+              timerInternals.currentInputWatermarkTime(),
               timerInternals.currentOutputWatermarkTime());
         }
       }
 
-      Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
-          concatElements,
-          new Predicate<WindowedValue<InputT>>() {
-            @Override
-            public boolean apply(WindowedValue<InputT> input) {
-              BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
-              if (canDropDueToExpiredWindow(window)) {
-                return false;
-              } else {
-                return true;
-              }
-            }
-          });
+      Iterable<WindowedValue<InputT>> nonLateElements =
+          Iterables.filter(
+              concatElements,
+              new Predicate<WindowedValue<InputT>>() {
+                @Override
+                public boolean apply(WindowedValue<InputT> input) {
+                  BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+                  return !canDropDueToExpiredWindow(window);
+                }
+              });
       return nonLateElements;
     }
 


[2/9] beam git commit: Add test reproducing BEAM-2505, ignored

Posted by ke...@apache.org.
Add test reproducing BEAM-2505, ignored


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fda589c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fda589c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fda589c0

Branch: refs/heads/master
Commit: fda589c00c8920e76cfc9aaa87cecfa94077599d
Parents: 50c43d9
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 13:04:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fda589c0/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 2b66162..fa5ba8b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -78,6 +78,7 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -349,6 +350,36 @@ public class ReduceFnRunnerTest {
     assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
   }
 
+  /**
+   * Tests that if end-of-window and GC timers come in together, that the pane is correctly
+   * marked as final.
+   */
+  @Test
+  @Ignore("https://issues.apache.org/jira/browse/BEAM-2505")
+  public void testCombiningAccumulatingEventTime() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(1))
+            .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    injectElement(tester, 2); // event-time trigger only: no processing-time timer is set
+    injectElement(tester, 5);
+
+    tester.advanceInputWatermark(new Instant(1000));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
+  }
+
+
   @Test
   public void testOnElementCombiningAccumulating() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.


[6/9] beam git commit: Do not GC windows based on processing time timer!

Posted by ke...@apache.org.
Do not GC windows based on processing time timer!


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50c43d96
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50c43d96
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50c43d96

Branch: refs/heads/master
Commit: 50c43d96adb8c2523cf38c09f32e241eacc47823
Parents: 412fd7e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:56:34 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunner.java       |  3 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 35 +++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index b5c3e3e..75b6acd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -663,7 +663,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
           && timer.getTimestamp().equals(window.maxTimestamp());
       Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
-      this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
+      this.isGarbageCollection =
+          TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore(cleanupTime);
     }
 
     // Has this window had its trigger finish?

http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 9e71300..2b66162 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -140,7 +140,40 @@ public class ReduceFnRunnerTest {
       }
     })
     .when(mockTrigger).onFire(anyTriggerContext());
- }
+  }
+
+  /**
+   * Tests that a processing time timer does not cause window GC.
+   */
+  @Test
+  public void testProcessingTimeTimerDoesNotGc() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceProcessingTime(new Instant(10000));
+
+    tester.assertHasOnlyGlobalAndStateFor(
+        new IntervalWindow(new Instant(0), new Instant(100)));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, Timing.EARLY, 0, 0))));
+  }
 
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {


[8/9] beam git commit: Drop late data in ReduceFnTester

Posted by ke...@apache.org.
Drop late data in ReduceFnTester


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/412fd7ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/412fd7ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/412fd7ea

Branch: refs/heads/master
Commit: 412fd7eab9e58a4d412f4dff5ffec023610b4f22
Parents: 795760d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:56:14 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/ReduceFnTester.java     | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/412fd7ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 1fe8f73..7ca96b9 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -529,8 +529,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     for (TimestampedValue<InputT> value : values) {
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
-    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.processElements(
+
+    Iterable<WindowedValue<InputT>> inputs =
         Iterables.transform(
             Arrays.asList(values),
             new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
@@ -548,7 +548,12 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
                   throw new RuntimeException(e);
                 }
               }
-            }));
+            });
+
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    runner.processElements(
+        new LateDataDroppingDoFnRunner.LateDataFilter(objectStrategy, timerInternals)
+            .filter(KEY, inputs));
 
     // Persist after each bundle.
     runner.persist();


[9/9] beam git commit: This closes #3425: Fix processing time timer handling in ReduceFnRunner

Posted by ke...@apache.org.
This closes #3425: Fix processing time timer handling in ReduceFnRunner

  Add tests for corner cases of processing time timers
  Add test reproducing BEAM-2505, ignored
  Do not GC windows based on processing time timer!
  Drop late data in ReduceFnTester
  ReduceFnTester assertion for windows that have data buffered
  ReduceFnTester can advance clocks without firing timers
  Tidy LateDataDroppingDoFnRunner
  Add window matcher for pane info


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7645c44b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7645c44b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7645c44b

Branch: refs/heads/master
Commit: 7645c44b9bae9a77192fbe6744918cfb95ca1a3a
Parents: 5506be8 d2b384a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 15:41:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 15:41:02 2017 -0700

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        |  33 ++---
 .../beam/runners/core/ReduceFnRunner.java       |   3 +-
 .../beam/runners/core/SystemReduceFn.java       |   6 +
 .../beam/runners/core/ReduceFnRunnerTest.java   | 136 ++++++++++++++++++-
 .../beam/runners/core/ReduceFnTester.java       |  48 ++++++-
 .../beam/runners/core/WindowMatchers.java       |  15 ++
 .../beam/sdk/transforms/GroupByKeyTest.java     |  39 ++++++
 7 files changed, 258 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[5/9] beam git commit: ReduceFnTester assertion for windows that have data buffered

Posted by ke...@apache.org.
ReduceFnTester assertion for windows that have data buffered


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/795760d3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/795760d3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/795760d3

Branch: refs/heads/master
Commit: 795760d370bcbe28e1f0ca373ad4c8c841e6e6b5
Parents: 1c1f239
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:53:15 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/SystemReduceFn.java   |  6 ++++++
 .../org/apache/beam/runners/core/ReduceFnTester.java   | 13 +++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index c189b0d..3144bd6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
@@ -103,6 +104,11 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
     this.bufferTag = bufferTag;
   }
 
+  @VisibleForTesting
+  StateTag<? extends GroupingState<InputT, OutputT>> getBufferTag() {
+    return bufferTag;
+  }
+
   @Override
   public void processValue(ProcessValueContext c) throws Exception {
     c.state().access(bufferTag).add(c.value());

http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index ab9fd6e..1fe8f73 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -318,6 +318,19 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   }
 
   @SafeVarargs
+  public final void assertHasOnlyGlobalAndStateFor(W... expectedWindows) {
+    assertHasOnlyGlobalAndAllowedTags(
+        ImmutableSet.copyOf(expectedWindows),
+        ImmutableSet.<StateTag<?>>of(
+            ((SystemReduceFn<?, ?, ?, ?, ?>) reduceFn).getBufferTag(),
+            TriggerStateMachineRunner.FINISHED_BITS_TAG,
+            PaneInfoTracker.PANE_INFO_TAG,
+            WatermarkHold.watermarkHoldTagForTimestampCombiner(
+                objectStrategy.getTimestampCombiner()),
+            WatermarkHold.EXTRA_HOLD_TAG));
+  }
+
+  @SafeVarargs
   public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),


[3/9] beam git commit: ReduceFnTester can advance clocks without firing timers

Posted by ke...@apache.org.
ReduceFnTester can advance clocks without firing timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1c1f2395
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1c1f2395
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1c1f2395

Branch: refs/heads/master
Commit: 1c1f239501349f5120b0d619c4eea9c435500b78
Parents: d4e5db5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:52:42 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnTester.java       | 24 +++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1c1f2395/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 7f83eae..ab9fd6e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -420,6 +420,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     return result;
   }
 
+  public void advanceInputWatermarkNoTimers(Instant newInputWatermark) throws Exception {
+    timerInternals.advanceInputWatermark(newInputWatermark);
+  }
+
   /**
    * Advance the input watermark to the specified time, firing any timers that should
    * fire. Then advance the output watermark as far as possible.
@@ -451,6 +455,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     runner.persist();
   }
 
+  public void advanceProcessingTimeNoTimers(Instant newProcessingTime) throws Exception {
+    timerInternals.advanceProcessingTime(newProcessingTime);
+  }
+
   /**
    * If {@link #autoAdvanceOutputWatermark} is {@literal false}, advance the output watermark
    * to the given value. Otherwise throw.
@@ -535,13 +543,27 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    ArrayList timers = new ArrayList(1);
+    ArrayList<TimerData> timers = new ArrayList<>(1);
     timers.add(
         TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
     runner.onTimers(timers);
     runner.persist();
   }
 
+  public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws Exception {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    ArrayList<TimerData> timerData = new ArrayList<>(timers.length);
+    for (TimestampedValue<TimeDomain> timer : timers) {
+      timerData.add(
+          TimerData.of(
+              StateNamespaces.window(windowFn.windowCoder(), window),
+              timer.getTimestamp(),
+              timer.getValue()));
+    }
+    runner.onTimers(timerData);
+    runner.persist();
+  }
+
   /**
    * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output
    * elements.


[4/9] beam git commit: Add window matcher for pane info

Posted by ke...@apache.org.
Add window matcher for pane info


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/497cfabe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/497cfabe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/497cfabe

Branch: refs/heads/master
Commit: 497cfabea7d6dcee0c5d327022678c571c3ec487
Parents: 9ed0af8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 11:31:28 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/WindowMatchers.java | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/497cfabe/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
index 9769d10..26cbfee 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
@@ -116,6 +116,21 @@ public class WindowMatchers {
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      Matcher<T> valueMatcher,
+      long timestamp,
+      long windowStart,
+      long windowEnd,
+      PaneInfo paneInfo) {
+    IntervalWindow intervalWindow =
+        new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
+    return WindowMatchers.<T>isSingleWindowedValue(
+        valueMatcher,
+        Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
+        Matchers.<BoundedWindow>equalTo(intervalWindow),
+        Matchers.equalTo(paneInfo));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
       Matcher<? super T> valueMatcher,
       Matcher<? super Instant> timestampMatcher,
       Matcher<? super BoundedWindow> windowMatcher) {