Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/20 13:12:16 UTC

[flink] 01/02: [FLINK-10068][docs] Add documentation for RocksDB-based timers and stopping timers

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b456cca8c92adf62e7e58a495f9692b2c9c2cb96
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Mon Aug 6 14:54:04 2018 +0200

    [FLINK-10068][docs] Add documentation for RocksDB-based timers and stopping timers
    
    This closes #6504.
---
 docs/dev/stream/operators/process_function.md | 49 ++++++++++++++++++++++++---
 docs/ops/state/large_state_tuning.md          | 20 ++++++++---
 docs/ops/state/state_backends.md              |  3 +-
 3 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index 4f36721..b2a373e 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.
 
-**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
+<span class="label label-info">Note</span> Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
 
 ### Fault Tolerance
 
 Timers are fault tolerant and checkpointed along with the state of the application. 
 In case of a failure recovery or when starting an application from a savepoint, the timers are restored.
 
-**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. 
+<span class="label label-info">Note</span> Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
 This might happen when an application recovers from a failure or when it is started from a savepoint.
 
-**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends. 
-Therefore, a large number of timers can significantly increase checkpointing time. 
-See the "Timer Coalescing" section for advice on how to reduce the number of timers.
+<span class="label label-info">Note</span> Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`).
+Notice that large numbers of timers can increase the checkpointing time because timers are part of the checkpointed state. See the "Timer Coalescing" section for advice on how to reduce the number of timers.
 
 ### Timer Coalescing
 
@@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
 {% endhighlight %}
 </div>
 </div>
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+Stopping an event-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-info">Note</span> Stopping a timer has no effect if no such timer with the given timestamp is registered.
diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
index 6df551f..62b3ee5 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend
         new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
+**RocksDB Timers**
+
+For RocksDB, a user can choose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have better performance for smaller numbers of
+timers, while storing timers inside RocksDB offers higher scalability, because the number of timers in RocksDB can exceed the available main memory (timers spill to disk).
+
+When using RocksDB as the state backend, the type of timer storage can be selected through Flink's configuration via the option key `state.backend.rocksdb.timer-service.factory`.
+Possible choices are `heap` (to store timers on the heap, default) and `rocksdb` (to store timers in RocksDB).
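+
+For example, a minimal sketch of the corresponding entry in `flink-conf.yaml`:
+
+{% highlight yaml %}
+state.backend.rocksdb.timer-service.factory: rocksdb
+{% endhighlight %}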
+
+<span class="label label-info">Note</span> *The combination RocksDB state backend / with incremental checkpoint / with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
+Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
+
 **Passing Options to RocksDB**
 
 {% highlight java %}
@@ -177,11 +188,10 @@ Flink provides some predefined collections of option for RocksDB for different s
 We expect to accumulate more such profiles over time. Feel free to contribute such predefined option profiles when you
 have found a set of options that work well and seem representative for certain workloads.
 
-**Important:** RocksDB is a native library, whose allocated memory not from the JVM, but directly from the process'
-native memory. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
+<span class="label label-info">Note</span> RocksDB is a native library that allocates memory directly from the process,
+and not from the JVM. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
 of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
-allocating more memory than configures.
-
+allocating more memory than configured.
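+
+For example, a minimal sketch (the value is an assumption; adjust it to your container size and RocksDB footprint) of lowering the TaskManager heap in `flink-conf.yaml`:
+
+{% highlight yaml %}
+# leave the difference between container memory and JVM heap to RocksDB
+taskmanager.heap.size: 2048m
+{% endhighlight %}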
 
 ## Capacity Planning
 
@@ -231,7 +241,7 @@ Compression can be activated through the `ExecutionConfig`:
 		executionConfig.setUseSnapshotCompression(true);
 {% endhighlight %}
 
-**Notice:** The compression option has no impact on incremental snapshots, because they are using RocksDB's internal
+<span class="label label-info">Note</span> The compression option has no impact on incremental snapshots, because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
 ## Task-Local Recovery
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 3d4ce58..4e09b34 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -116,7 +116,8 @@ The RocksDBStateBackend is encouraged for:
 Note that the amount of state that you can keep is only limited by the amount of disk space available.
 This allows keeping very large state, compared to the FsStateBackend that keeps state in memory.
 This also means, however, that the maximum throughput that can be achieved will be lower with
-this state backend.
+this state backend. All reads from and writes to this backend have to go through de-/serialization to retrieve/store the state objects, which is also more expensive than always working with the
+on-heap representation, as the heap-based backends do.
 
 RocksDBStateBackend is currently the only backend that offers incremental checkpoints (see [here](large_state_tuning.html)).
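+
+As a minimal sketch (the checkpoint URI is a placeholder), the RocksDBStateBackend with incremental checkpoints enabled can be set up as follows:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// the second constructor argument enables incremental checkpoints
+env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
+{% endhighlight %}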