You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 05:04:29 UTC
[flink-statefun] 01/03: [FLINK-19095] [core] Use SDK Expiration in
StateSpec
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 75749f9a00067c4ebde3b084c75d576cae845054
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 12:58:35 2020 +0800
[FLINK-19095] [core] Use SDK Expiration in StateSpec
This simplifies how expiration configuration is being represented in the
StateSpec class. Since StateSpec is serializable, this also changes
Expiration to be serializable.
---
.../apache/flink/statefun/flink/core/httpfn/StateSpec.java | 14 +++++++-------
.../flink/datastream/RequestReplyFunctionBuilder.java | 9 +++++----
.../org/apache/flink/statefun/sdk/state/Expiration.java | 6 +++++-
3 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
index 5152e7f..8bb3c84 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
@@ -19,30 +19,30 @@
package org.apache.flink.statefun.flink.core.httpfn;
import java.io.Serializable;
-import java.time.Duration;
import java.util.Objects;
+import org.apache.flink.statefun.sdk.state.Expiration;
public final class StateSpec implements Serializable {
private static final long serialVersionUID = 1;
private final String name;
- private final Duration ttlDuration;
+ private final Expiration ttlExpiration;
public StateSpec(String name) {
- this(name, Duration.ZERO);
+ this(name, Expiration.none());
}
- public StateSpec(String name, Duration ttlDuration) {
+ public StateSpec(String name, Expiration ttlExpiration) {
this.name = Objects.requireNonNull(name);
- this.ttlDuration = Objects.requireNonNull(ttlDuration);
+ this.ttlExpiration = Objects.requireNonNull(ttlExpiration);
}
public String name() {
return name;
}
- public Duration ttlDuration() {
- return ttlDuration;
+ public Expiration ttlExpiration() {
+ return ttlExpiration;
}
}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index f23c298..07b0ce1 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.state.Expiration;
/** A Builder for RequestReply remote function type. */
public class RequestReplyFunctionBuilder {
@@ -53,7 +54,7 @@ public class RequestReplyFunctionBuilder {
* @return this builder.
*/
public RequestReplyFunctionBuilder withPersistedState(String name) {
- builder.withState(new StateSpec(name, Duration.ZERO));
+ builder.withState(new StateSpec(name, Expiration.none()));
return this;
}
@@ -61,11 +62,11 @@ public class RequestReplyFunctionBuilder {
* Declares a remote function state, with expiration.
*
* @param name the name of the state to be used remotely.
- * @param expireAfter the duration after which this state might be deleted.
+ * @param ttlExpiration the expiration mode for which this state might be deleted.
* @return this builder.
*/
- public RequestReplyFunctionBuilder withExpiringState(String name, Duration expireAfter) {
- builder.withState(new StateSpec(name, expireAfter));
+ public RequestReplyFunctionBuilder withExpiringState(String name, Expiration ttlExpiration) {
+ builder.withState(new StateSpec(name, ttlExpiration));
return this;
}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
index 5b22471..d3396f3 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/Expiration.java
@@ -18,6 +18,7 @@
package org.apache.flink.statefun.sdk.state;
+import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;
import org.apache.flink.statefun.sdk.annotations.ForRuntime;
@@ -32,7 +33,10 @@ import org.apache.flink.statefun.sdk.annotations.ForRuntime;
* <p>State can be expired after a duration had passed since either from the last write to the
* state, or the last read.
*/
-public final class Expiration {
+public final class Expiration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
public enum Mode {
NONE,
AFTER_WRITE,