You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/20 03:14:57 UTC
[flink-statefun] branch master updated (2a4b8ad -> 57b1035)
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.
from 2a4b8ad [hotfix] [core] Rename variables / methods in PersistedStates
new 1f876d8 [FLINK-16176] Use PersistedAppendingBuffer in HttpFunction
new 2825589 [FLINK-16176] Simplify InvocationBatchRequest building
new 57b1035 [hotfix] Temporary relax HttpFunction#unpackInvocationResultOrThrow
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../statefun/flink/core/httpfn/HttpFunction.java | 47 +++++++++++++---------
1 file changed, 28 insertions(+), 19 deletions(-)
[flink-statefun] 02/03: [FLINK-16176] Simplify
InvocationBatchRequest building
Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 2825589d4827b2686872f6faeac78b5721c5b2b7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Feb 20 10:45:52 2020 +0800
[FLINK-16176] Simplify InvocationBatchRequest building
This closes #26.
---
.../org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 7c07a6f..77857b3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -122,9 +122,7 @@ final class HttpFunction implements StatefulFunction {
return null;
}
InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
- for (Invocation invocation : next) {
- builder.addInvocations(invocation);
- }
+ builder.addAllInvocations(next);
return builder;
}
[flink-statefun] 01/03: [FLINK-16176] Use PersistedAppendingBuffer
in HttpFunction
Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 1f876d898dbb3872763cf046930b9addc8db79ff
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed Feb 19 16:43:12 2020 +0100
[FLINK-16176] Use PersistedAppendingBuffer in HttpFunction
This commit replaces the PersistedValue previously used
for batch accumulation with a PersistedAppendingBuffer,
which supports efficient appends.
---
.../statefun/flink/core/httpfn/HttpFunction.java | 33 +++++++++++++---------
1 file changed, 20 insertions(+), 13 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 506081a..7c07a6f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -30,6 +30,7 @@ import com.google.protobuf.ByteString;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -45,6 +46,7 @@ import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
@@ -59,8 +61,8 @@ final class HttpFunction implements StatefulFunction {
PersistedValue.of("inflight", Boolean.class);
@Persisted
- private final PersistedValue<ToFunction.InvocationBatchRequest> batch =
- PersistedValue.of("batch", ToFunction.InvocationBatchRequest.class);
+ private final PersistedAppendingBuffer<ToFunction.Invocation> batch =
+ PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
@Persisted
private final PersistedTable<String, byte[]> managedStates =
@@ -87,21 +89,13 @@ final class HttpFunction implements StatefulFunction {
private void onRequest(Context context, Any message) {
Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
- addToOrCreateBatch(invocationBuilder);
+ batch.append(invocationBuilder.build());
return;
}
hasInFlightRpc.set(Boolean.TRUE);
sendToFunction(context, invocationBuilder);
}
- private void addToOrCreateBatch(Invocation.Builder invocationBuilder) {
- InvocationBatchRequest current = batch.get();
- InvocationBatchRequest.Builder next =
- (current == null) ? InvocationBatchRequest.newBuilder() : current.toBuilder();
- next.addInvocations(invocationBuilder);
- batch.set(next.build());
- }
-
private void onAsyncResult(
Context context, AsyncOperationResult<ToFunction, Response> asyncResult) {
if (asyncResult.unknown()) {
@@ -112,13 +106,26 @@ final class HttpFunction implements StatefulFunction {
InvocationResponse invocationResult =
unpackInvocationResultOrThrow(context.self(), asyncResult);
handleInvocationResponse(context, invocationResult);
- InvocationBatchRequest nextBatch = batch.get();
+ InvocationBatchRequest.Builder nextBatch = getNextBatch();
if (nextBatch == null) {
hasInFlightRpc.clear();
return;
}
batch.clear();
- sendToFunction(context, nextBatch.toBuilder());
+ sendToFunction(context, nextBatch);
+ }
+
+ @Nullable
+ private InvocationBatchRequest.Builder getNextBatch() {
+ @Nullable Iterable<Invocation> next = batch.view();
+ if (next == null) {
+ return null;
+ }
+ InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+ for (Invocation invocation : next) {
+ builder.addInvocations(invocation);
+ }
+ return builder;
}
private void handleInvocationResponse(Context context, InvocationResponse invocationResult) {
[flink-statefun] 03/03: [hotfix] Temporary relax
HttpFunction#unpackInvocationResultOrThrow
Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 57b1035e81f155b34d844cded773ede4352322b9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Feb 20 10:50:28 2020 +0800
[hotfix] Temporary relax HttpFunction#unpackInvocationResultOrThrow
Since the work on remote functions is still in progress and the
protocol may not be fully implemented yet, we relax the method: when an
invocation result is not present, it simply returns a
default InvocationResponse.
---
.../flink/statefun/flink/core/httpfn/HttpFunction.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 77857b3..2b3dbcb 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -49,6 +49,7 @@ import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.util.IOUtils;
final class HttpFunction implements StatefulFunction {
@@ -226,11 +227,14 @@ final class HttpFunction implements StatefulFunction {
"Failure forwarding a message to a remote function " + self, asyncResult.throwable());
}
InputStream httpResponseBody = responseBody(asyncResult.value());
- FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
- checkState(
- fromFunction.hasInvocationResult(),
- "The received HTTP payload does not contain an InvocationResult, but rather [%s]",
- fromFunction);
- return fromFunction.getInvocationResult();
+ try {
+ FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
+ if (fromFunction.hasInvocationResult()) {
+ return fromFunction.getInvocationResult();
+ }
+ return InvocationResponse.getDefaultInstance();
+ } finally {
+ IOUtils.closeQuietly(httpResponseBody);
+ }
}
}