You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/15 08:29:32 UTC

[flink-statefun] 06/06: [FLINK-19197] [docs] Add documentation for PersistedStateRegistry

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 533e77f55ef49ae5975b9abe29f362e77c424c6c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Sep 11 11:40:01 2020 +0800

    [FLINK-19197] [docs] Add documentation for PersistedStateRegistry
    
    This closes #147.
---
 docs/sdk/java.md | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/docs/sdk/java.md b/docs/sdk/java.md
index 5436fad..94b357b 100644
--- a/docs/sdk/java.md
+++ b/docs/sdk/java.md
@@ -364,6 +364,43 @@ PersistedTable<String, Integer> table = PersistedTable.of("my-table", String.cla
 PersistedAppendingBuffer<Integer> buffer = PersistedAppendingBuffer.of("my-buffer", Integer.class);
 {% endhighlight %}
 
+### Dynamic State Registration
+
+Using the above state types, a function's persisted state must be defined eagerly. You cannot use those state types to
+register a new persisted state during invocations (i.e., in the ``invoke`` method) or after the function instance is created.
+
+If dynamic state registration is required, it can be achieved using a ``PersistedStateRegistry``:
+
+{% highlight java %}
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+public class MyFunction implements StatefulFunction {
+
+	@Persisted
+	private final PersistedStateRegistry registry = new PersistedStateRegistry();
+
+	private PersistedValue<Integer> value;
+
+	public void invoke(Context context, Object input) {
+		if (value == null) {
+			value = PersistedValue.of("my-value", Integer.class);
+			registry.registerValue(valueOne);
+		}
+		int count = value.getOrDefault(0);
+		// ...
+	}
+}
+{% endhighlight %}
+
+Note how the ``PersistedValue`` field doesn't need to be annotated with the ``@Persisted`` annotations, and is initially
+empty. The state object is dynamically created during invocation and registered with the ``PersistedStateRegistry`` so
+that the system picks it up to be managed for fault-tolerance.
+
 ### State Expiration
 
 Persisted states may be configured to expire and be deleted after a specified duration.