You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 05:04:31 UTC

[flink-statefun] 03/03: [FLINK-19095] [core] Respect state expiration configuration in PersistedRemoteFunctionValues

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 9a76143a10f7937abd2e166bf30c20f87f8d2029
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Aug 31 13:02:32 2020 +0800

    [FLINK-19095] [core] Respect state expiration configuration in PersistedRemoteFunctionValues
    
    This closes #135.
---
 .../flink/core/reqreply/PersistedRemoteFunctionValues.java    | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index e79dae7..e918c41 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.statefun.flink.core.reqreply;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,7 +25,6 @@ import java.util.Objects;
 import java.util.function.BiConsumer;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
@@ -55,14 +53,7 @@ public final class PersistedRemoteFunctionValues {
   }
 
   private PersistedValue<byte[]> createStateHandle(StateSpec stateSpec) {
-    final String stateName = stateSpec.name();
-    final Duration stateTtlDuration = stateSpec.ttlDuration();
-    final Expiration stateExpirationConfig =
-        (stateTtlDuration.equals(Duration.ZERO))
-            ? Expiration.none()
-            : Expiration.expireAfterReadingOrWriting(stateTtlDuration);
-
-    return stateRegistry.registerValue(stateName, byte[].class, stateExpirationConfig);
+    return stateRegistry.registerValue(stateSpec.name(), byte[].class, stateSpec.ttlExpiration());
   }
 
   private PersistedValue<byte[]> getStateHandleOrThrow(String stateName) {