You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/27 15:18:25 UTC
[flink-statefun] 01/01: [FLINK-16312] [core] Fix HttpFunction onAsyncResult
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit e0eff08f04c83b5bf61721a797ef9d00a77c3d84
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Feb 27 22:48:50 2020 +0800
[FLINK-16312] [core] Fix HttpFunction onAsyncResult
The HttpFunction was relying on incorrect contracts promised by
PersistedAppendingBuffer, causing it to endlessly resend 0-sized batch
requests. This commit fixes that to rely on the revised contracts.
This closes #39.
---
.../statefun/flink/core/httpfn/HttpFunction.java | 37 ++++++++++------------
1 file changed, 17 insertions(+), 20 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index e0a9b68..369cae3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -30,7 +30,6 @@ import com.google.protobuf.ByteString;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import javax.annotation.Nullable;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -135,31 +134,29 @@ final class HttpFunction implements StatefulFunction {
InvocationResponse invocationResult =
unpackInvocationResultOrThrow(context.self(), asyncResult);
handleInvocationResponse(context, invocationResult);
- InvocationBatchRequest.Builder nextBatch = getNextBatch();
- if (nextBatch == null) {
- // the async request was completed, and there is nothing else in the batch
- // so we clear the requestState.
+
+ final int state = requestState.getOrDefault(-1);
+ if (state < 0) {
+ throw new IllegalStateException("Got an unexpected async result.");
+ } else if (state == 0) {
requestState.clear();
- return;
+ } else {
+ final InvocationBatchRequest.Builder nextBatch = getNextBatch();
+ // an async request was just completed, but while it was in flight we have
+ // accumulated a batch, we now proceed with:
+ // a) clearing the batch from our own persisted state (the batch moves to the async operation
+ // state)
+ // b) sending the accumulated batch to the remote function.
+ requestState.set(0);
+ batch.clear();
+ sendToFunction(context, nextBatch);
}
- // an async request was just completed, but while it was in flight we have
- // accumulated a batch, we now proceed with:
- // a) clearing the batch from our own persisted state (the batch moves to the async operation
- // state)
- // b) sending the accumulated batch to the remote function.
- requestState.set(0);
- batch.clear();
- sendToFunction(context, nextBatch);
}
- @Nullable
private InvocationBatchRequest.Builder getNextBatch() {
- @Nullable Iterable<Invocation> next = batch.view();
- if (next == null) {
- return null;
- }
InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
- builder.addAllInvocations(next);
+ Iterable<Invocation> view = batch.view();
+ builder.addAllInvocations(view);
return builder;
}