You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/22 23:03:55 UTC
[3/9] beam git commit: ReduceFnTester can advance clocks without firing timers
ReduceFnTester can advance clocks without firing timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1c1f2395
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1c1f2395
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1c1f2395
Branch: refs/heads/master
Commit: 1c1f239501349f5120b0d619c4eea9c435500b78
Parents: d4e5db5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:52:42 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/ReduceFnTester.java | 24 +++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1c1f2395/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 7f83eae..ab9fd6e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -420,6 +420,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return result;
}
+ public void advanceInputWatermarkNoTimers(Instant newInputWatermark) throws Exception {
+ timerInternals.advanceInputWatermark(newInputWatermark);
+ }
+
/**
* Advance the input watermark to the specified time, firing any timers that should
* fire. Then advance the output watermark as far as possible.
@@ -451,6 +455,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
runner.persist();
}
+ public void advanceProcessingTimeNoTimers(Instant newProcessingTime) throws Exception {
+ timerInternals.advanceProcessingTime(newProcessingTime);
+ }
+
/**
* If {@link #autoAdvanceOutputWatermark} is {@literal false}, advance the output watermark
* to the given value. Otherwise throw.
@@ -535,13 +543,27 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- ArrayList timers = new ArrayList(1);
+ ArrayList<TimerData> timers = new ArrayList<>(1);
timers.add(
TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
runner.onTimers(timers);
runner.persist();
}
+ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws Exception {
+ ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+ ArrayList<TimerData> timerData = new ArrayList<>(timers.length);
+ for (TimestampedValue<TimeDomain> timer : timers) {
+ timerData.add(
+ TimerData.of(
+ StateNamespaces.window(windowFn.windowCoder(), window),
+ timer.getTimestamp(),
+ timer.getValue()));
+ }
+ runner.onTimers(timerData);
+ runner.persist();
+ }
+
/**
* Convey the simulated state and implement {@link #outputWindowedValue} to capture all output
* elements.