You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 05:04:29 UTC

[flink-statefun] 01/03: [FLINK-19095] [core] Use SDK Expiration in StateSpec

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 75749f9a00067c4ebde3b084c75d576cae845054
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 12:58:35 2020 +0800

    [FLINK-19095] [core] Use SDK Expiration in StateSpec
    
    This simplifies how the expiration configuration is represented in the
    StateSpec class. Since StateSpec is serializable, this also makes
    Expiration serializable.
---
 .../apache/flink/statefun/flink/core/httpfn/StateSpec.java | 14 +++++++-------
 .../flink/datastream/RequestReplyFunctionBuilder.java      |  9 +++++----
 .../org/apache/flink/statefun/sdk/state/Expiration.java    |  6 +++++-
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
index 5152e7f..8bb3c84 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
@@ -19,30 +19,30 @@
 package org.apache.flink.statefun.flink.core.httpfn;
 
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.Objects;
+import org.apache.flink.statefun.sdk.state.Expiration;
 
 public final class StateSpec implements Serializable {
 
   private static final long serialVersionUID = 1;
 
   private final String name;
-  private final Duration ttlDuration;
+  private final Expiration ttlExpiration;
 
   public StateSpec(String name) {
-    this(name, Duration.ZERO);
+    this(name, Expiration.none());
   }
 
-  public StateSpec(String name, Duration ttlDuration) {
+  public StateSpec(String name, Expiration ttlExpiration) {
     this.name = Objects.requireNonNull(name);
-    this.ttlDuration = Objects.requireNonNull(ttlDuration);
+    this.ttlExpiration = Objects.requireNonNull(ttlExpiration);
   }
 
   public String name() {
     return name;
   }
 
-  public Duration ttlDuration() {
-    return ttlDuration;
+  public Expiration ttlExpiration() {
+    return ttlExpiration;
   }
 }
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index f23c298..07b0ce1 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.state.Expiration;
 
 /** A Builder for RequestReply remote function type. */
 public class RequestReplyFunctionBuilder {
@@ -53,7 +54,7 @@ public class RequestReplyFunctionBuilder {
    * @return this builder.
    */
   public RequestReplyFunctionBuilder withPersistedState(String name) {
-    builder.withState(new StateSpec(name, Duration.ZERO));
+    builder.withState(new StateSpec(name, Expiration.none()));
     return this;
   }
 
@@ -61,11 +62,11 @@ public class RequestReplyFunctionBuilder {
    * Declares a remote function state, with expiration.
    *
    * @param name the name of the state to be used remotely.
-   * @param expireAfter the duration after which this state might be deleted.
+   * @param ttlExpiration the expiration configuration governing when this state may be deleted.
    * @return this builder.
    */
-  public RequestReplyFunctionBuilder withExpiringState(String name, Duration expireAfter) {
-    builder.withState(new StateSpec(name, expireAfter));
+  public RequestReplyFunctionBuilder withExpiringState(String name, Expiration ttlExpiration) {
+    builder.withState(new StateSpec(name, ttlExpiration));
     return this;
   }
 
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
index 5b22471..d3396f3 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.statefun.sdk.state;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.Objects;
 import org.apache.flink.statefun.sdk.annotations.ForRuntime;
@@ -32,7 +33,10 @@ import org.apache.flink.statefun.sdk.annotations.ForRuntime;
  * <p>State can be expired after a duration had passed since either from the last write to the
  * state, or the last read.
  */
-public final class Expiration {
+public final class Expiration implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
   public enum Mode {
     NONE,
     AFTER_WRITE,