You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/20 03:14:58 UTC
[flink-statefun] 01/03: [FLINK-16176] Use PersistedAppendingBuffer
in HttpFunction
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 1f876d898dbb3872763cf046930b9addc8db79ff
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed Feb 19 16:43:12 2020 +0100
[FLINK-16176] Use PersistedAppendingBuffer in HttpFunction
This commit replaces the previously used PersistedValue
for batch accumulation, with a PersistedAppendingBuffer
that supports efficient appends.
---
.../statefun/flink/core/httpfn/HttpFunction.java | 33 +++++++++++++---------
1 file changed, 20 insertions(+), 13 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 506081a..7c07a6f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -30,6 +30,7 @@ import com.google.protobuf.ByteString;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -45,6 +46,7 @@ import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -59,8 +61,8 @@ final class HttpFunction implements StatefulFunction {
PersistedValue.of("inflight", Boolean.class);
@Persisted
- private final PersistedValue<ToFunction.InvocationBatchRequest> batch =
- PersistedValue.of("batch", ToFunction.InvocationBatchRequest.class);
+ private final PersistedAppendingBuffer<ToFunction.Invocation> batch =
+ PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
@Persisted
private final PersistedTable<String, byte[]> managedStates =
@@ -87,21 +89,13 @@ final class HttpFunction implements StatefulFunction {
private void onRequest(Context context, Any message) {
Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
- addToOrCreateBatch(invocationBuilder);
+ batch.append(invocationBuilder.build());
return;
}
hasInFlightRpc.set(Boolean.TRUE);
sendToFunction(context, invocationBuilder);
}
- private void addToOrCreateBatch(Invocation.Builder invocationBuilder) {
- InvocationBatchRequest current = batch.get();
- InvocationBatchRequest.Builder next =
- (current == null) ? InvocationBatchRequest.newBuilder() : current.toBuilder();
- next.addInvocations(invocationBuilder);
- batch.set(next.build());
- }
-
private void onAsyncResult(
Context context, AsyncOperationResult<ToFunction, Response> asyncResult) {
if (asyncResult.unknown()) {
@@ -112,13 +106,26 @@ final class HttpFunction implements StatefulFunction {
InvocationResponse invocationResult =
unpackInvocationResultOrThrow(context.self(), asyncResult);
handleInvocationResponse(context, invocationResult);
- InvocationBatchRequest nextBatch = batch.get();
+ InvocationBatchRequest.Builder nextBatch = getNextBatch();
if (nextBatch == null) {
hasInFlightRpc.clear();
return;
}
batch.clear();
- sendToFunction(context, nextBatch.toBuilder());
+ sendToFunction(context, nextBatch);
+ }
+
+ @Nullable
+ private InvocationBatchRequest.Builder getNextBatch() {
+ @Nullable Iterable<Invocation> next = batch.view();
+ if (next == null) {
+ return null;
+ }
+ InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+ for (Invocation invocation : next) {
+ builder.addInvocations(invocation);
+ }
+ return builder;
}
private void handleInvocationResponse(Context context, InvocationResponse invocationResult) {