You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/22 23:03:55 UTC

[3/9] beam git commit: ReduceFnTester can advance clocks without firing timers

ReduceFnTester can advance clocks without firing timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1c1f2395
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1c1f2395
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1c1f2395

Branch: refs/heads/master
Commit: 1c1f239501349f5120b0d619c4eea9c435500b78
Parents: d4e5db5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:52:42 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnTester.java       | 24 +++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1c1f2395/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 7f83eae..ab9fd6e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -420,6 +420,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     return result;
   }
 
+  public void advanceInputWatermarkNoTimers(Instant newInputWatermark) throws Exception {
+    timerInternals.advanceInputWatermark(newInputWatermark);
+  }
+
   /**
    * Advance the input watermark to the specified time, firing any timers that should
    * fire. Then advance the output watermark as far as possible.
@@ -451,6 +455,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     runner.persist();
   }
 
+  public void advanceProcessingTimeNoTimers(Instant newProcessingTime) throws Exception {
+    timerInternals.advanceProcessingTime(newProcessingTime);
+  }
+
   /**
    * If {@link #autoAdvanceOutputWatermark} is {@literal false}, advance the output watermark
    * to the given value. Otherwise throw.
@@ -535,13 +543,27 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    ArrayList timers = new ArrayList(1);
+    ArrayList<TimerData> timers = new ArrayList<>(1);
     timers.add(
         TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
     runner.onTimers(timers);
     runner.persist();
   }
 
+  public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws Exception {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    ArrayList<TimerData> timerData = new ArrayList<>(timers.length);
+    for (TimestampedValue<TimeDomain> timer : timers) {
+      timerData.add(
+          TimerData.of(
+              StateNamespaces.window(windowFn.windowCoder(), window),
+              timer.getTimestamp(),
+              timer.getValue()));
+    }
+    runner.onTimers(timerData);
+    runner.persist();
+  }
+
   /**
    * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output
    * elements.