You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 05:04:30 UTC
[flink-statefun] 02/03: [FLINK-19095] [remote] Add "expireMode" to YAML function state spec
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 8050cb05e54946e7263cd802536ee5a7392f7d8f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 12:59:58 2020 +0800
[FLINK-19095] [remote] Add "expireMode" to YAML function state spec
With this new key in YAML modules, users can now specify the expire mode
independently for each state. Note that for remote functions, the
expiration mode "after read or write" is semantically equivalent to
"after each function invocation". Therefore, to keep the semantics
clear, the modes supported for remote functions are "after-invoke" and
"after-write". If "expireMode" is omitted, it defaults to "after-invoke";
the key has no effect unless "expireAfter" is also set for that state.
---
.../flink/core/jsonmodule/FunctionJsonEntity.java | 38 ++++++++++++++++------
.../src/test/resources/module-v2_0/module.yaml | 1 +
2 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 78bc18d..94e4fad 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -50,6 +50,7 @@ import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+import org.apache.flink.statefun.sdk.state.Expiration;
import org.apache.flink.util.TimeUtils;
final class FunctionJsonEntity implements JsonEntity {
@@ -74,6 +75,7 @@ final class FunctionJsonEntity implements JsonEntity {
private static final class StateSpecPointers {
private static final JsonPointer NAME = JsonPointer.compile("/name");
private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter");
+ private static final JsonPointer EXPIRE_MODE = JsonPointer.compile("/expireMode");
}
@Override
@@ -153,13 +155,8 @@ final class FunctionJsonEntity implements JsonEntity {
stateSpecNodes.forEach(
stateSpecNode -> {
final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME);
- final Optional<Duration> optionalStateExpireDuration =
- optionalStateExpireDuration(stateSpecNode);
- if (optionalStateExpireDuration.isPresent()) {
- stateSpecs.add(new StateSpec(name, optionalStateExpireDuration.get()));
- } else {
- stateSpecs.add(new StateSpec(name));
- }
+ final Expiration expiration = stateTtlExpiration(stateSpecNode);
+ stateSpecs.add(new StateSpec(name, expiration));
});
return stateSpecs;
}
@@ -173,9 +170,30 @@ final class FunctionJsonEntity implements JsonEntity {
.map(TimeUtils::parseDuration);
}
- private static Optional<Duration> optionalStateExpireDuration(JsonNode stateSpecNode) {
- return Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
- .map(TimeUtils::parseDuration);
+ private static Expiration stateTtlExpiration(JsonNode stateSpecNode) {
+ final Optional<Duration> duration =
+ Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
+ .map(TimeUtils::parseDuration);
+
+ if (!duration.isPresent()) {
+ return Expiration.none();
+ }
+
+ final Optional<String> mode =
+ Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_MODE);
+ if (!mode.isPresent()) {
+ return Expiration.expireAfterReadingOrWriting(duration.get());
+ }
+
+ switch (mode.get()) {
+ case "after-invoke":
+ return Expiration.expireAfterReadingOrWriting(duration.get());
+ case "after-write":
+ return Expiration.expireAfterWriting(duration.get());
+ default:
+ throw new IllegalArgumentException(
+ "Invalid state ttl expire mode; must be one of [after-invoke, after-write].");
+ }
}
private static FunctionType functionType(JsonNode functionNode) {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
index 46ce6c1..ee903e1 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
@@ -36,6 +36,7 @@ module:
states:
- name: seen_count
expireAfter: 60000millisecond
+ expireMode: after-invoke
maxNumBatchRequests: 10000
- function:
meta: