You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/15 08:29:32 UTC
[flink-statefun] 06/06: [FLINK-19197] [docs] Add documentation for
PersistedStateRegistry
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 533e77f55ef49ae5975b9abe29f362e77c424c6c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Sep 11 11:40:01 2020 +0800
[FLINK-19197] [docs] Add documentation for PersistedStateRegistry
This closes #147.
---
docs/sdk/java.md | 37 +++++++++++++++++++++++++++++++++++++
1 file changed, 37 insertions(+)
diff --git a/docs/sdk/java.md b/docs/sdk/java.md
index 5436fad..94b357b 100644
--- a/docs/sdk/java.md
+++ b/docs/sdk/java.md
@@ -364,6 +364,43 @@ PersistedTable<String, Integer> table = PersistedTable.of("my-table", String.cla
PersistedAppendingBuffer<Integer> buffer = PersistedAppendingBuffer.of("my-buffer", Integer.class);
{% endhighlight %}
+### Dynamic State Registration
+
+Using the above state types, a function's persisted state must be defined eagerly. You cannot use those state types to
+register a new persisted state during invocations (i.e., in the ``invoke`` method) or after the function instance is created.
+
+If dynamic state registration is required, it can be achieved using a ``PersistedStateRegistry``:
+
+{% highlight java %}
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+public class MyFunction implements StatefulFunction {
+
+ @Persisted
+ private final PersistedStateRegistry registry = new PersistedStateRegistry();
+
+ private PersistedValue<Integer> value;
+
+ public void invoke(Context context, Object input) {
+ if (value == null) {
+ value = PersistedValue.of("my-value", Integer.class);
+ registry.registerValue(valueOne);
+ }
+ int count = value.getOrDefault(0);
+ // ...
+ }
+}
+{% endhighlight %}
+
+Note how the ``PersistedValue`` field doesn't need to be annotated with the ``@Persisted`` annotations, and is initially
+empty. The state object is dynamically created during invocation and registered with the ``PersistedStateRegistry`` so
+that the system picks it up to be managed for fault-tolerance.
+
### State Expiration
Persisted states may be configured to expire and be deleted after a specified duration.