Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/04 17:10:43 UTC

flink git commit: [FLINK-9107] [docs] Document timer coalescing for ProcessFunction

Repository: flink
Updated Branches:
  refs/heads/master bd8b47956 -> 7b0fc58f7


[FLINK-9107] [docs] Document timer coalescing for ProcessFunction

This closes #5790.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b0fc58f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b0fc58f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b0fc58f

Branch: refs/heads/master
Commit: 7b0fc58f75494c9a2c71d551632445ded85c0a45
Parents: bd8b479
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Mar 29 16:20:00 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Wed Apr 4 19:09:53 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/process_function.md | 54 +++++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b0fc58f/docs/dev/stream/operators/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index d967983..1ed4edf 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -269,4 +269,56 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
 }
 {% endhighlight %}
 </div>
-</div>
\ No newline at end of file
+</div>
+
+## Optimizations
+
+### Timer Coalescing
+
+Every timer registered at the `TimerService` via `registerEventTimeTimer()` or
+`registerProcessingTimeTimer()` will be stored on the Java heap and enqueued for execution. There is,
+however, a maximum of one timer per key and timestamp at millisecond resolution; thus, in the
+worst case, every key may have a timer for each upcoming millisecond. Even if `onTimer` does no
+processing for outdated timers, this may put a significant burden on the
+Flink runtime.
+
+Since there is only one timer per key and timestamp, however, you may coalesce timers by reducing the
+timer resolution. For a timer resolution of 1 second (event or processing time), for example, you
+can round down the target time to full seconds. Timers will then fire at most 1 second earlier than
+requested, but never later than they would with millisecond accuracy. As a result, there is at most
+one timer per key and second:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long coalescedTime = ((ctx.timerService().currentProcessingTime() + timeout) / 1000) * 1000;
+ctx.timerService().registerProcessingTimeTimer(coalescedTime);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val coalescedTime = ((ctx.timerService.currentProcessingTime + timeout) / 1000) * 1000
+ctx.timerService.registerProcessingTimeTimer(coalescedTime)
+{% endhighlight %}
+</div>
+</div>
+
+Since event-time timers only fire when watermarks arrive, you may also schedule and coalesce
+these timers with the next watermark by using the current one:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long coalescedTime = ctx.timerService().currentWatermark() + 1;
+ctx.timerService().registerEventTimeTimer(coalescedTime);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val coalescedTime = ctx.timerService.currentWatermark + 1
+ctx.timerService.registerEventTimeTimer(coalescedTime)
+{% endhighlight %}
+</div>
+</div>
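The effect of both coalescing strategies described in the patch can be checked with a small standalone model. The sketch below is a hypothetical simplification, not Flink's timer service. It assumes a single key, one element per millisecond, and a deduplicating set that stands in for the "one timer per key and timestamp" rule. For the watermark case, it also assumes a watermark that advances in fixed steps. The class and method names are illustrative only.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of a per-key timer queue that keeps at most
 * one timer per timestamp. This is NOT Flink's implementation; it only counts
 * how many distinct timers a single key would hold under each strategy.
 */
public class TimerCoalescingSketch {

    /** Rounds a target time down to full seconds (1000 ms resolution). */
    static long coalesceToSecond(long targetTime) {
        return (targetTime / 1000) * 1000;
    }

    /** One element per millisecond, each registering a timer at ts + timeout. */
    static int timersWithoutCoalescing(int elements, long timeout) {
        // Duplicates collapse, mirroring the one-timer-per-key-and-timestamp rule.
        Set<Long> timers = new HashSet<>();
        for (long ts = 0; ts < elements; ts++) {
            timers.add(ts + timeout);
        }
        return timers.size();
    }

    /** Same workload, but every target time is rounded down to full seconds. */
    static int timersRoundedToSecond(int elements, long timeout) {
        Set<Long> timers = new HashSet<>();
        for (long ts = 0; ts < elements; ts++) {
            timers.add(coalesceToSecond(ts + timeout));
        }
        return timers.size();
    }

    /**
     * Same workload, registering event-time timers at currentWatermark + 1.
     * Hypothetical assumption: the watermark equals the latest full multiple
     * of watermarkInterval milliseconds seen so far.
     */
    static int timersOnNextWatermark(int elements, long watermarkInterval) {
        Set<Long> timers = new HashSet<>();
        for (long ts = 0; ts < elements; ts++) {
            long currentWatermark = (ts / watermarkInterval) * watermarkInterval;
            timers.add(currentWatermark + 1);
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long timeout = 60_000L; // hypothetical one-minute timeout
        int elements = 5_000;   // one element per millisecond for five seconds
        System.out.println("millisecond timers: " + timersWithoutCoalescing(elements, timeout));
        System.out.println("rounded to seconds: " + timersRoundedToSecond(elements, timeout));
        System.out.println("next watermark:     " + timersOnNextWatermark(elements, 200L));
    }
}
```

Under these assumptions `main` reports 5000, 5 and 25 timers respectively. Rounding to full seconds leaves one timer per second of input, and watermark-based coalescing leaves one timer per watermark step. The real savings depend on the key distribution, the timeout, and how often watermarks actually advance.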