You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 11:41:49 UTC

[flink-statefun] 06/10: [FLINK-17875] [core] Wire in StateSpec into RequestReplyFunction

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit b852dcc1e02844a7fe123a61f84c37bc4619ed89
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 27 12:18:42 2020 +0800

    [FLINK-17875] [core] Wire in StateSpec into RequestReplyFunction
---
 .../flink/core/reqreply/RequestReplyFunction.java  | 39 +++++++++++++++++-----
 .../core/reqreply/RequestReplyFunctionTest.java    |  3 +-
 2 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 6687ed0..8cde450 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
@@ -40,6 +41,7 @@ import org.apache.flink.statefun.sdk.Context;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -47,7 +49,7 @@ import org.apache.flink.statefun.sdk.state.PersistedValue;
 public final class RequestReplyFunction implements StatefulFunction {
 
   private final RequestReplyClient client;
-  private final List<String> registeredStateNames;
+  private final List<StateSpec> registeredStates;
   private final int maxNumBatchRequests;
 
   /**
@@ -69,15 +71,17 @@ public final class RequestReplyFunction implements StatefulFunction {
   private final PersistedAppendingBuffer<ToFunction.Invocation> batch =
       PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
 
-  @Persisted
-  private final PersistedTable<String, byte[]> managedStates =
-      PersistedTable.of("states", String.class, byte[].class);
+  @Persisted private final PersistedTable<String, byte[]> managedStates;
 
   public RequestReplyFunction(
-      List<String> registeredStateNames, int maxNumBatchRequests, RequestReplyClient client) {
+      List<StateSpec> registeredStates, int maxNumBatchRequests, RequestReplyClient client) {
     this.client = Objects.requireNonNull(client);
-    this.registeredStateNames = Objects.requireNonNull(registeredStateNames);
+    this.registeredStates = Objects.requireNonNull(registeredStates);
     this.maxNumBatchRequests = maxNumBatchRequests;
+
+    this.managedStates =
+        PersistedTable.of(
+            "states", String.class, byte[].class, resolveStateTtlExpiration(registeredStates));
   }
 
   @Override
@@ -92,6 +96,23 @@ public final class RequestReplyFunction implements StatefulFunction {
     onAsyncResult(context, result);
   }
 
+  private static Expiration resolveStateTtlExpiration(List<StateSpec> stateSpecs) {
+    // TODO applying the below limitations due to state multiplexing (see FLINK-17954)
+    // TODO 1) use the max TTL duration across all state, 2) only allow AFTER_READ_AND_WRITE
+
+    Duration maxDuration = Duration.ZERO;
+    for (StateSpec stateSpec : stateSpecs) {
+      if (stateSpec.ttlDuration().compareTo(maxDuration) > 0) {
+        maxDuration = stateSpec.ttlDuration();
+      }
+    }
+
+    if (maxDuration.equals(Duration.ZERO)) {
+      return Expiration.none();
+    }
+    return Expiration.expireAfterReadingOrWriting(maxDuration);
+  }
+
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
     int inflightOrBatched = requestState.getOrDefault(-1);
@@ -207,11 +228,11 @@ public final class RequestReplyFunction implements StatefulFunction {
   // --------------------------------------------------------------------------------
 
   private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) {
-    for (String stateName : registeredStateNames) {
+    for (StateSpec stateSpec : registeredStates) {
       ToFunction.PersistedValue.Builder valueBuilder =
-          ToFunction.PersistedValue.newBuilder().setStateName(stateName);
+          ToFunction.PersistedValue.newBuilder().setStateName(stateSpec.name());
 
-      byte[] stateValue = managedStates.get(stateName);
+      byte[] stateValue = managedStates.get(stateSpec.name());
       if (stateValue != null) {
         valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
       }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 04d389c..e934399 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import org.apache.flink.statefun.flink.core.TestUtils;
 import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
@@ -58,7 +59,7 @@ public class RequestReplyFunctionTest {
 
   private final FakeClient client = new FakeClient();
   private final FakeContext context = new FakeContext();
-  private final List<String> states = Collections.singletonList("session");
+  private final List<StateSpec> states = Collections.singletonList(new StateSpec("session"));
 
   private final RequestReplyFunction functionUnderTest =
       new RequestReplyFunction(states, 10, client);