You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/20 03:14:58 UTC

[flink-statefun] 01/03: [FLINK-16176] Use PersistedAppendingBuffer in HttpFunction

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1f876d898dbb3872763cf046930b9addc8db79ff
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed Feb 19 16:43:12 2020 +0100

    [FLINK-16176] Use PersistedAppendingBuffer in HttpFunction
    
    This commit replaces the PersistedValue previously used
    for batch accumulation with a PersistedAppendingBuffer,
    which supports efficient appends.
---
 .../statefun/flink/core/httpfn/HttpFunction.java   | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 506081a..7c07a6f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -30,6 +30,7 @@ import com.google.protobuf.ByteString;
 import java.io.InputStream;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
@@ -45,6 +46,7 @@ import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.Context;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
@@ -59,8 +61,8 @@ final class HttpFunction implements StatefulFunction {
       PersistedValue.of("inflight", Boolean.class);
 
   @Persisted
-  private final PersistedValue<ToFunction.InvocationBatchRequest> batch =
-      PersistedValue.of("batch", ToFunction.InvocationBatchRequest.class);
+  private final PersistedAppendingBuffer<ToFunction.Invocation> batch =
+      PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
 
   @Persisted
   private final PersistedTable<String, byte[]> managedStates =
@@ -87,21 +89,13 @@ final class HttpFunction implements StatefulFunction {
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
     if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      addToOrCreateBatch(invocationBuilder);
+      batch.append(invocationBuilder.build());
       return;
     }
     hasInFlightRpc.set(Boolean.TRUE);
     sendToFunction(context, invocationBuilder);
   }
 
-  private void addToOrCreateBatch(Invocation.Builder invocationBuilder) {
-    InvocationBatchRequest current = batch.get();
-    InvocationBatchRequest.Builder next =
-        (current == null) ? InvocationBatchRequest.newBuilder() : current.toBuilder();
-    next.addInvocations(invocationBuilder);
-    batch.set(next.build());
-  }
-
   private void onAsyncResult(
       Context context, AsyncOperationResult<ToFunction, Response> asyncResult) {
     if (asyncResult.unknown()) {
@@ -112,13 +106,26 @@ final class HttpFunction implements StatefulFunction {
     InvocationResponse invocationResult =
         unpackInvocationResultOrThrow(context.self(), asyncResult);
     handleInvocationResponse(context, invocationResult);
-    InvocationBatchRequest nextBatch = batch.get();
+    InvocationBatchRequest.Builder nextBatch = getNextBatch();
     if (nextBatch == null) {
       hasInFlightRpc.clear();
       return;
     }
     batch.clear();
-    sendToFunction(context, nextBatch.toBuilder());
+    sendToFunction(context, nextBatch);
+  }
+
+  @Nullable
+  private InvocationBatchRequest.Builder getNextBatch() {
+    @Nullable Iterable<Invocation> next = batch.view();
+    if (next == null) {
+      return null;
+    }
+    InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+    for (Invocation invocation : next) {
+      builder.addInvocations(invocation);
+    }
+    return builder;
   }
 
   private void handleInvocationResponse(Context context, InvocationResponse invocationResult) {