You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 05:04:31 UTC
[flink-statefun] 03/03: [FLINK-19095] [core] Respect state
expiration configuration in PersistedRemoteFunctionValues
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 9a76143a10f7937abd2e166bf30c20f87f8d2029
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 13:02:32 2020 +0800
[FLINK-19095] [core] Respect state expiration configuration in PersistedRemoteFunctionValues
This closes #135.
---
.../flink/core/reqreply/PersistedRemoteFunctionValues.java | 11 +----------
1 file changed, 1 insertion(+), 10 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index e79dae7..e918c41 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -18,7 +18,6 @@
package org.apache.flink.statefun.flink.core.reqreply;
-import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -26,7 +25,6 @@ import java.util.Objects;
import java.util.function.BiConsumer;
import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.Expiration;
import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -55,14 +53,7 @@ public final class PersistedRemoteFunctionValues {
}
private PersistedValue<byte[]> createStateHandle(StateSpec stateSpec) {
- final String stateName = stateSpec.name();
- final Duration stateTtlDuration = stateSpec.ttlDuration();
- final Expiration stateExpirationConfig =
- (stateTtlDuration.equals(Duration.ZERO))
- ? Expiration.none()
- : Expiration.expireAfterReadingOrWriting(stateTtlDuration);
-
- return stateRegistry.registerValue(stateName, byte[].class, stateExpirationConfig);
+ return stateRegistry.registerValue(stateSpec.name(), byte[].class, stateSpec.ttlExpiration());
}
private PersistedValue<byte[]> getStateHandleOrThrow(String stateName) {