You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 05:04:30 UTC

[flink-statefun] 02/03: [FLINK-19095] [remote] Add "expireMode" to YAML function state spec

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 8050cb05e54946e7263cd802536ee5a7392f7d8f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 12:59:58 2020 +0800

    [FLINK-19095] [remote] Add "expireMode" to YAML function state spec
    
    With this new key in YAML modules, users can now specify the expire mode
    independently for each state. Note that for remote functions, the
    expiration mode "after read and write" is semantically equivalent to
    "after each function invocation". Therefore, to keep the semantics
    clear, the modes supported for remote functions are "after-invoke" and
    "after-write".
---
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 38 ++++++++++++++++------
 .../src/test/resources/module-v2_0/module.yaml     |  1 +
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 78bc18d..94e4fad 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -50,6 +50,7 @@ import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.util.TimeUtils;
 
 final class FunctionJsonEntity implements JsonEntity {
@@ -74,6 +75,7 @@ final class FunctionJsonEntity implements JsonEntity {
   private static final class StateSpecPointers {
     private static final JsonPointer NAME = JsonPointer.compile("/name");
     private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter");
+    private static final JsonPointer EXPIRE_MODE = JsonPointer.compile("/expireMode");
   }
 
   @Override
@@ -153,13 +155,8 @@ final class FunctionJsonEntity implements JsonEntity {
     stateSpecNodes.forEach(
         stateSpecNode -> {
           final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME);
-          final Optional<Duration> optionalStateExpireDuration =
-              optionalStateExpireDuration(stateSpecNode);
-          if (optionalStateExpireDuration.isPresent()) {
-            stateSpecs.add(new StateSpec(name, optionalStateExpireDuration.get()));
-          } else {
-            stateSpecs.add(new StateSpec(name));
-          }
+          final Expiration expiration = stateTtlExpiration(stateSpecNode);
+          stateSpecs.add(new StateSpec(name, expiration));
         });
     return stateSpecs;
   }
@@ -173,9 +170,30 @@ final class FunctionJsonEntity implements JsonEntity {
         .map(TimeUtils::parseDuration);
   }
 
-  private static Optional<Duration> optionalStateExpireDuration(JsonNode stateSpecNode) {
-    return Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
-        .map(TimeUtils::parseDuration);
+  private static Expiration stateTtlExpiration(JsonNode stateSpecNode) {
+    final Optional<Duration> duration =
+        Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
+            .map(TimeUtils::parseDuration);
+
+    if (!duration.isPresent()) {
+      return Expiration.none();
+    }
+
+    final Optional<String> mode =
+        Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_MODE);
+    if (!mode.isPresent()) {
+      return Expiration.expireAfterReadingOrWriting(duration.get());
+    }
+
+    switch (mode.get()) {
+      case "after-invoke":
+        return Expiration.expireAfterReadingOrWriting(duration.get());
+      case "after-write":
+        return Expiration.expireAfterWriting(duration.get());
+      default:
+        throw new IllegalArgumentException(
+            "Invalid state ttl expire mode; must be one of [after-invoke, after-write].");
+    }
   }
 
   private static FunctionType functionType(JsonNode functionNode) {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
index 46ce6c1..ee903e1 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
@@ -36,6 +36,7 @@ module:
             states:
               - name: seen_count
                 expireAfter: 60000millisecond
+                expireMode: after-invoke
             maxNumBatchRequests: 10000
       - function:
           meta: