You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 11:41:49 UTC
[flink-statefun] 06/10: [FLINK-17875] [core] Wire in StateSpec into
RequestReplyFunction
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b852dcc1e02844a7fe123a61f84c37bc4619ed89
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 27 12:18:42 2020 +0800
[FLINK-17875] [core] Wire in StateSpec into RequestReplyFunction
---
.../flink/core/reqreply/RequestReplyFunction.java | 39 +++++++++++++++++-----
.../core/reqreply/RequestReplyFunctionTest.java | 3 +-
2 files changed, 32 insertions(+), 10 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 6687ed0..8cde450 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
@@ -40,6 +41,7 @@ import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.state.Expiration;
import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -47,7 +49,7 @@ import org.apache.flink.statefun.sdk.state.PersistedValue;
public final class RequestReplyFunction implements StatefulFunction {
private final RequestReplyClient client;
- private final List<String> registeredStateNames;
+ private final List<StateSpec> registeredStates;
private final int maxNumBatchRequests;
/**
@@ -69,15 +71,17 @@ public final class RequestReplyFunction implements StatefulFunction {
private final PersistedAppendingBuffer<ToFunction.Invocation> batch =
PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
- @Persisted
- private final PersistedTable<String, byte[]> managedStates =
- PersistedTable.of("states", String.class, byte[].class);
+ @Persisted private final PersistedTable<String, byte[]> managedStates;
public RequestReplyFunction(
- List<String> registeredStateNames, int maxNumBatchRequests, RequestReplyClient client) {
+ List<StateSpec> registeredStates, int maxNumBatchRequests, RequestReplyClient client) {
this.client = Objects.requireNonNull(client);
- this.registeredStateNames = Objects.requireNonNull(registeredStateNames);
+ this.registeredStates = Objects.requireNonNull(registeredStates);
this.maxNumBatchRequests = maxNumBatchRequests;
+
+ this.managedStates =
+ PersistedTable.of(
+ "states", String.class, byte[].class, resolveStateTtlExpiration(registeredStates));
}
@Override
@@ -92,6 +96,23 @@ public final class RequestReplyFunction implements StatefulFunction {
onAsyncResult(context, result);
}
+ private static Expiration resolveStateTtlExpiration(List<StateSpec> stateSpecs) {
+ // TODO applying the below limitations due to state multiplexing (see FLINK-17954)
+ // TODO 1) use the max TTL duration across all state, 2) only allow AFTER_READ_AND_WRITE
+
+ Duration maxDuration = Duration.ZERO;
+ for (StateSpec stateSpec : stateSpecs) {
+ if (stateSpec.ttlDuration().compareTo(maxDuration) > 0) {
+ maxDuration = stateSpec.ttlDuration();
+ }
+ }
+
+ if (maxDuration.equals(Duration.ZERO)) {
+ return Expiration.none();
+ }
+ return Expiration.expireAfterReadingOrWriting(maxDuration);
+ }
+
private void onRequest(Context context, Any message) {
Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
int inflightOrBatched = requestState.getOrDefault(-1);
@@ -207,11 +228,11 @@ public final class RequestReplyFunction implements StatefulFunction {
// --------------------------------------------------------------------------------
private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) {
- for (String stateName : registeredStateNames) {
+ for (StateSpec stateSpec : registeredStates) {
ToFunction.PersistedValue.Builder valueBuilder =
- ToFunction.PersistedValue.newBuilder().setStateName(stateName);
+ ToFunction.PersistedValue.newBuilder().setStateName(stateSpec.name());
- byte[] stateValue = managedStates.get(stateName);
+ byte[] stateValue = managedStates.get(stateSpec.name());
if (stateValue != null) {
valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 04d389c..e934399 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
@@ -58,7 +59,7 @@ public class RequestReplyFunctionTest {
private final FakeClient client = new FakeClient();
private final FakeContext context = new FakeContext();
- private final List<String> states = Collections.singletonList("session");
+ private final List<StateSpec> states = Collections.singletonList(new StateSpec("session"));
private final RequestReplyFunction functionUnderTest =
new RequestReplyFunction(states, 10, client);