You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 05:04:28 UTC

[flink-statefun] branch master updated (c483184 -> 9a76143)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from c483184  [FLINK-18979][docs] Update READMEs
     new 75749f9  [FLINK-19095] [core] Use SDK Expiration in StateSpec
     new 8050cb0  [FLINK-19095] [remote] Add "expireMode" to YAML function state spec
     new 9a76143  [FLINK-19095] [core] Respect state expiration configuration in PersistedRemoteFunctionValues

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../statefun/flink/core/httpfn/StateSpec.java      | 14 ++++----
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 38 ++++++++++++++++------
 .../reqreply/PersistedRemoteFunctionValues.java    | 11 +------
 .../src/test/resources/module-v2_0/module.yaml     |  1 +
 .../datastream/RequestReplyFunctionBuilder.java    |  9 ++---
 .../flink/statefun/sdk/state/Expiration.java       |  6 +++-
 6 files changed, 47 insertions(+), 32 deletions(-)


[flink-statefun] 02/03: [FLINK-19095] [remote] Add "expireMode" to YAML function state spec

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 8050cb05e54946e7263cd802536ee5a7392f7d8f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 12:59:58 2020 +0800

    [FLINK-19095] [remote] Add "expireMode" to YAML function state spec
    
    With this new key in YAML modules, users can now specify the expire mode
    independently for each state. Note that for remote functions, the
    expiration mode "after read and write" is semantically equivalent to
    "after each function invocation". Therefore, the modes supported for
    remote functions are "after-invoke" and "after-write", to keep the
    semantics clear.
---
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 38 ++++++++++++++++------
 .../src/test/resources/module-v2_0/module.yaml     |  1 +
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 78bc18d..94e4fad 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -50,6 +50,7 @@ import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
+import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.util.TimeUtils;
 
 final class FunctionJsonEntity implements JsonEntity {
@@ -74,6 +75,7 @@ final class FunctionJsonEntity implements JsonEntity {
   private static final class StateSpecPointers {
     private static final JsonPointer NAME = JsonPointer.compile("/name");
     private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter");
+    private static final JsonPointer EXPIRE_MODE = JsonPointer.compile("/expireMode");
   }
 
   @Override
@@ -153,13 +155,8 @@ final class FunctionJsonEntity implements JsonEntity {
     stateSpecNodes.forEach(
         stateSpecNode -> {
           final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME);
-          final Optional<Duration> optionalStateExpireDuration =
-              optionalStateExpireDuration(stateSpecNode);
-          if (optionalStateExpireDuration.isPresent()) {
-            stateSpecs.add(new StateSpec(name, optionalStateExpireDuration.get()));
-          } else {
-            stateSpecs.add(new StateSpec(name));
-          }
+          final Expiration expiration = stateTtlExpiration(stateSpecNode);
+          stateSpecs.add(new StateSpec(name, expiration));
         });
     return stateSpecs;
   }
@@ -173,9 +170,30 @@ final class FunctionJsonEntity implements JsonEntity {
         .map(TimeUtils::parseDuration);
   }
 
-  private static Optional<Duration> optionalStateExpireDuration(JsonNode stateSpecNode) {
-    return Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
-        .map(TimeUtils::parseDuration);
+  private static Expiration stateTtlExpiration(JsonNode stateSpecNode) {
+    final Optional<Duration> duration =
+        Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
+            .map(TimeUtils::parseDuration);
+
+    if (!duration.isPresent()) {
+      return Expiration.none();
+    }
+
+    final Optional<String> mode =
+        Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_MODE);
+    if (!mode.isPresent()) {
+      return Expiration.expireAfterReadingOrWriting(duration.get());
+    }
+
+    switch (mode.get()) {
+      case "after-invoke":
+        return Expiration.expireAfterReadingOrWriting(duration.get());
+      case "after-write":
+        return Expiration.expireAfterWriting(duration.get());
+      default:
+        throw new IllegalArgumentException(
+            "Invalid state ttl expire mode; must be one of [after-invoke, after-write].");
+    }
   }
 
   private static FunctionType functionType(JsonNode functionNode) {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
index 46ce6c1..ee903e1 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
@@ -36,6 +36,7 @@ module:
             states:
               - name: seen_count
                 expireAfter: 60000millisecond
+                expireMode: after-invoke
             maxNumBatchRequests: 10000
       - function:
           meta:


[flink-statefun] 01/03: [FLINK-19095] [core] Use SDK Expiration in StateSpec

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 75749f9a00067c4ebde3b084c75d576cae845054
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 12:58:35 2020 +0800

    [FLINK-19095] [core] Use SDK Expiration in StateSpec
    
    This simplifies how the expiration configuration is represented in the
    StateSpec class. Since StateSpec is serializable, this also makes
    Expiration serializable.
---
 .../apache/flink/statefun/flink/core/httpfn/StateSpec.java | 14 +++++++-------
 .../flink/datastream/RequestReplyFunctionBuilder.java      |  9 +++++----
 .../org/apache/flink/statefun/sdk/state/Expiration.java    |  6 +++++-
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
index 5152e7f..8bb3c84 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
@@ -19,30 +19,30 @@
 package org.apache.flink.statefun.flink.core.httpfn;
 
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.Objects;
+import org.apache.flink.statefun.sdk.state.Expiration;
 
 public final class StateSpec implements Serializable {
 
   private static final long serialVersionUID = 1;
 
   private final String name;
-  private final Duration ttlDuration;
+  private final Expiration ttlExpiration;
 
   public StateSpec(String name) {
-    this(name, Duration.ZERO);
+    this(name, Expiration.none());
   }
 
-  public StateSpec(String name, Duration ttlDuration) {
+  public StateSpec(String name, Expiration ttlExpiration) {
     this.name = Objects.requireNonNull(name);
-    this.ttlDuration = Objects.requireNonNull(ttlDuration);
+    this.ttlExpiration = Objects.requireNonNull(ttlExpiration);
   }
 
   public String name() {
     return name;
   }
 
-  public Duration ttlDuration() {
-    return ttlDuration;
+  public Expiration ttlExpiration() {
+    return ttlExpiration;
   }
 }
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index f23c298..07b0ce1 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.state.Expiration;
 
 /** A Builder for RequestReply remote function type. */
 public class RequestReplyFunctionBuilder {
@@ -53,7 +54,7 @@ public class RequestReplyFunctionBuilder {
    * @return this builder.
    */
   public RequestReplyFunctionBuilder withPersistedState(String name) {
-    builder.withState(new StateSpec(name, Duration.ZERO));
+    builder.withState(new StateSpec(name, Expiration.none()));
     return this;
   }
 
@@ -61,11 +62,11 @@ public class RequestReplyFunctionBuilder {
    * Declares a remote function state, with expiration.
    *
    * @param name the name of the state to be used remotely.
-   * @param expireAfter the duration after which this state might be deleted.
+   * @param ttlExpiration the expiration mode for which this state might be deleted.
    * @return this builder.
    */
-  public RequestReplyFunctionBuilder withExpiringState(String name, Duration expireAfter) {
-    builder.withState(new StateSpec(name, expireAfter));
+  public RequestReplyFunctionBuilder withExpiringState(String name, Expiration ttlExpiration) {
+    builder.withState(new StateSpec(name, ttlExpiration));
     return this;
   }
 
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
index 5b22471..d3396f3 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.statefun.sdk.state;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.Objects;
 import org.apache.flink.statefun.sdk.annotations.ForRuntime;
@@ -32,7 +33,10 @@ import org.apache.flink.statefun.sdk.annotations.ForRuntime;
  * <p>State can be expired after a duration had passed since either from the last write to the
  * state, or the last read.
  */
-public final class Expiration {
+public final class Expiration implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
   public enum Mode {
     NONE,
     AFTER_WRITE,


[flink-statefun] 03/03: [FLINK-19095] [core] Respect state expiration configuration in PersistedRemoteFunctionValues

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 9a76143a10f7937abd2e166bf30c20f87f8d2029
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 13:02:32 2020 +0800

    [FLINK-19095] [core] Respect state expiration configuration in PersistedRemoteFunctionValues
    
    This closes #135.
---
 .../flink/core/reqreply/PersistedRemoteFunctionValues.java    | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index e79dae7..e918c41 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.statefun.flink.core.reqreply;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,7 +25,6 @@ import java.util.Objects;
 import java.util.function.BiConsumer;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
@@ -55,14 +53,7 @@ public final class PersistedRemoteFunctionValues {
   }
 
   private PersistedValue<byte[]> createStateHandle(StateSpec stateSpec) {
-    final String stateName = stateSpec.name();
-    final Duration stateTtlDuration = stateSpec.ttlDuration();
-    final Expiration stateExpirationConfig =
-        (stateTtlDuration.equals(Duration.ZERO))
-            ? Expiration.none()
-            : Expiration.expireAfterReadingOrWriting(stateTtlDuration);
-
-    return stateRegistry.registerValue(stateName, byte[].class, stateExpirationConfig);
+    return stateRegistry.registerValue(stateSpec.name(), byte[].class, stateSpec.ttlExpiration());
   }
 
   private PersistedValue<byte[]> getStateHandleOrThrow(String stateName) {