Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/09 14:10:41 UTC

[flink] branch master updated: [FLINK-9637] [docs] Add public user documentation for state TTL feature

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 44eef6f  [FLINK-9637] [docs] Add public user documentation for state TTL feature
44eef6f is described below

commit 44eef6fdff73f5aa59573519ad83005a42153e6e
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri Jul 20 14:57:08 2018 +0200

    [FLINK-9637] [docs] Add public user documentation for state TTL feature
    
    This closes #6379.
---
 docs/dev/stream/state/state.md | 127 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 127 insertions(+)

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 44a3653..fb78776 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -266,6 +266,133 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the
 we start over from `0`. Note that this would keep a different state value for each different input
 key if we had tuples with different values in the first field.
 
+### State Time-To-Live (TTL)
+
+A *time-to-live* (TTL) can be assigned to keyed state of any type. If a TTL is configured and a
+state value has expired, the stored value will be cleaned up on a best-effort basis, which is
+discussed in more detail below.
+
+All state collection types support per-entry TTLs. This means that list elements and map entries
+expire independently.
+
+In order to use state TTL one must first build a `StateTtlConfig` configuration object. The TTL 
+functionality can then be enabled in any state descriptor by passing the configuration:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+    .build();
+    
+ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
+stateDescriptor.enableTimeToLive(ttlConfig);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+
+val ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+    .build
+    
+val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
+stateDescriptor.enableTimeToLive(ttlConfig)
+{% endhighlight %}
+</div>
+</div>
+
+The configuration has several options to consider:
+
+The first parameter of the `newBuilder` method is mandatory; it is the time-to-live value.
+
+The update type configures when the state TTL is refreshed (by default `OnCreateAndWrite`):
+
+ - `StateTtlConfig.UpdateType.OnCreateAndWrite` - only on creation and write access
+ - `StateTtlConfig.UpdateType.OnReadAndWrite` - also on read access
+ 
+The state visibility configures whether the expired value is returned on read access 
+if it is not cleaned up yet (by default `NeverReturnExpired`):
+
+ - `StateTtlConfig.StateVisibility.NeverReturnExpired` - expired value is never returned
+ - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available
+ 
+With `NeverReturnExpired`, expired state behaves as if it no longer exists, 
+even if it has not yet been removed. This option is useful for use cases 
+where data must become unavailable for read access strictly after the TTL, 
+e.g. applications working with privacy-sensitive data.
+ 
+The other option, `ReturnExpiredIfNotCleanedUp`, allows expired state to be returned until it is cleaned up.
+
+**Notes:** 
+
+- The state backends store the timestamp of the last modification along with the user value, 
+which means that enabling this feature increases consumption of state storage. 
+The heap state backend stores an additional Java object with a reference to the user state object 
+and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
+
+- Only TTLs in reference to *processing time* are currently supported.
+
+- Trying to restore state that was previously configured without TTL using a TTL-enabled descriptor, or vice versa,
+will lead to a compatibility failure and a `StateMigrationException`.
+
+- The TTL configuration is not part of checkpoints or savepoints; rather, it determines how Flink treats the state in the currently running job.
+
+#### Cleanup of Expired State
+
+Currently, expired values are only removed when they are read out explicitly, 
+e.g. by calling `ValueState.value()`.
+
+<span class="label label-danger">Attention</span> This means that, by default, expired state that is never read 
+won't be removed, possibly leading to ever-growing state. This might change in future releases. 
+
+Additionally, you can activate cleanup at the moment a full state snapshot is taken, which 
+reduces the snapshot's size. Under the current implementation the local state is not cleaned up, 
+but state restored from such a snapshot will not include the expired entries that were removed.
+This cleanup can be configured in `StateTtlConfig`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupFullSnapshot()
+    .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+import org.apache.flink.api.common.time.Time
+
+val ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupFullSnapshot
+    .build
+{% endhighlight %}
+</div>
+</div>
+
+This option does not apply to incremental checkpointing in the RocksDB state backend.
+
+More strategies will be added in the future for cleaning up expired state automatically in the background.
+
 ### State in the Scala DataStream API
 
 In addition to the interface described above, the Scala API has shortcuts for stateful