You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/27 15:18:25 UTC

[flink-statefun] 01/01: [FLINK-16312] [core] Fix HttpFunction onAsyncResult

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e0eff08f04c83b5bf61721a797ef9d00a77c3d84
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Feb 27 22:48:50 2020 +0800

    [FLINK-16312] [core] Fix HttpFunction onAsyncResult
    
    The HttpFunction was relying on incorrect contracts promised by
    PersistedAppendingBuffer, causing it to endlessly resend 0-sized batch
    requests. This commit fixes that by relying on the revisited contracts.
    
    This closes #39.
---
 .../statefun/flink/core/httpfn/HttpFunction.java   | 37 ++++++++++------------
 1 file changed, 17 insertions(+), 20 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index e0a9b68..369cae3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -30,7 +30,6 @@ import com.google.protobuf.ByteString;
 import java.io.InputStream;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import javax.annotation.Nullable;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
@@ -135,31 +134,29 @@ final class HttpFunction implements StatefulFunction {
     InvocationResponse invocationResult =
         unpackInvocationResultOrThrow(context.self(), asyncResult);
     handleInvocationResponse(context, invocationResult);
-    InvocationBatchRequest.Builder nextBatch = getNextBatch();
-    if (nextBatch == null) {
-      // the async request was completed, and there is nothing else in the batch
-      // so we clear the requestState.
+
+    final int state = requestState.getOrDefault(-1);
+    if (state < 0) {
+      throw new IllegalStateException("Got an unexpected async result.");
+    } else if (state == 0) {
       requestState.clear();
-      return;
+    } else {
+      final InvocationBatchRequest.Builder nextBatch = getNextBatch();
+      // an async request was just completed, but while it was in flight we have
+      // accumulated a batch, we now proceed with:
+      // a) clearing the batch from our own persisted state (the batch moves to the async operation
+      // state)
+      // b) sending the accumulated batch to the remote function.
+      requestState.set(0);
+      batch.clear();
+      sendToFunction(context, nextBatch);
     }
-    // an async request was just completed, but while it was in flight we have
-    // accumulated a batch, we now proceed with:
-    // a) clearing the batch from our own persisted state (the batch moves to the async operation
-    // state)
-    // b) sending the accumulated batch to the remote function.
-    requestState.set(0);
-    batch.clear();
-    sendToFunction(context, nextBatch);
   }
 
-  @Nullable
   private InvocationBatchRequest.Builder getNextBatch() {
-    @Nullable Iterable<Invocation> next = batch.view();
-    if (next == null) {
-      return null;
-    }
     InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
-    builder.addAllInvocations(next);
+    Iterable<Invocation> view = batch.view();
+    builder.addAllInvocations(view);
     return builder;
   }