You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/20 03:14:57 UTC

[flink-statefun] branch master updated (2a4b8ad -> 57b1035)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 2a4b8ad  [hotfix] [core] Rename variables / methods in PersistedStates
     new 1f876d8  [FLINK-16176] Use PersistedAppendingBuffer in HttpFunction
     new 2825589  [FLINK-16176] Simplify InvocationBatchRequest building
     new 57b1035  [hotfix] Temporary relax HttpFunction#unpackInvocationResultOrThrow

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../statefun/flink/core/httpfn/HttpFunction.java   | 47 +++++++++++++---------
 1 file changed, 28 insertions(+), 19 deletions(-)


[flink-statefun] 02/03: [FLINK-16176] Simplify InvocationBatchRequest building

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2825589d4827b2686872f6faeac78b5721c5b2b7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Feb 20 10:45:52 2020 +0800

    [FLINK-16176] Simplify InvocationBatchRequest building
    
    This closes #26.
---
 .../org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java     | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 7c07a6f..77857b3 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -122,9 +122,7 @@ final class HttpFunction implements StatefulFunction {
       return null;
     }
     InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
-    for (Invocation invocation : next) {
-      builder.addInvocations(invocation);
-    }
+    builder.addAllInvocations(next);
     return builder;
   }
 


[flink-statefun] 01/03: [FLINK-16176] Use PersistedAppendingBuffer in HttpFunction

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1f876d898dbb3872763cf046930b9addc8db79ff
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed Feb 19 16:43:12 2020 +0100

    [FLINK-16176] Use PersistedAppendingBuffer in HttpFunction
    
    This commit replaces the PersistedValue previously used
    for batch accumulation with a PersistedAppendingBuffer,
    which supports efficient appends.
---
 .../statefun/flink/core/httpfn/HttpFunction.java   | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 506081a..7c07a6f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -30,6 +30,7 @@ import com.google.protobuf.ByteString;
 import java.io.InputStream;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nullable;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
@@ -45,6 +46,7 @@ import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.Context;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 
@@ -59,8 +61,8 @@ final class HttpFunction implements StatefulFunction {
       PersistedValue.of("inflight", Boolean.class);
 
   @Persisted
-  private final PersistedValue<ToFunction.InvocationBatchRequest> batch =
-      PersistedValue.of("batch", ToFunction.InvocationBatchRequest.class);
+  private final PersistedAppendingBuffer<ToFunction.Invocation> batch =
+      PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
 
   @Persisted
   private final PersistedTable<String, byte[]> managedStates =
@@ -87,21 +89,13 @@ final class HttpFunction implements StatefulFunction {
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
     if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      addToOrCreateBatch(invocationBuilder);
+      batch.append(invocationBuilder.build());
       return;
     }
     hasInFlightRpc.set(Boolean.TRUE);
     sendToFunction(context, invocationBuilder);
   }
 
-  private void addToOrCreateBatch(Invocation.Builder invocationBuilder) {
-    InvocationBatchRequest current = batch.get();
-    InvocationBatchRequest.Builder next =
-        (current == null) ? InvocationBatchRequest.newBuilder() : current.toBuilder();
-    next.addInvocations(invocationBuilder);
-    batch.set(next.build());
-  }
-
   private void onAsyncResult(
       Context context, AsyncOperationResult<ToFunction, Response> asyncResult) {
     if (asyncResult.unknown()) {
@@ -112,13 +106,26 @@ final class HttpFunction implements StatefulFunction {
     InvocationResponse invocationResult =
         unpackInvocationResultOrThrow(context.self(), asyncResult);
     handleInvocationResponse(context, invocationResult);
-    InvocationBatchRequest nextBatch = batch.get();
+    InvocationBatchRequest.Builder nextBatch = getNextBatch();
     if (nextBatch == null) {
       hasInFlightRpc.clear();
       return;
     }
     batch.clear();
-    sendToFunction(context, nextBatch.toBuilder());
+    sendToFunction(context, nextBatch);
+  }
+
+  @Nullable
+  private InvocationBatchRequest.Builder getNextBatch() {
+    @Nullable Iterable<Invocation> next = batch.view();
+    if (next == null) {
+      return null;
+    }
+    InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+    for (Invocation invocation : next) {
+      builder.addInvocations(invocation);
+    }
+    return builder;
   }
 
   private void handleInvocationResponse(Context context, InvocationResponse invocationResult) {


[flink-statefun] 03/03: [hotfix] Temporary relax HttpFunction#unpackInvocationResultOrThrow

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 57b1035e81f155b34d844cded773ede4352322b9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Feb 20 10:50:28 2020 +0800

    [hotfix] Temporary relax HttpFunction#unpackInvocationResultOrThrow
    
    Since the work on remote functions is still in progress and the
    protocol may not be fully implemented yet, we relax the method so that
    it simply returns a default InvocationResponse when an invocation
    result is not present.
---
 .../flink/statefun/flink/core/httpfn/HttpFunction.java   | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
index 77857b3..2b3dbcb 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
@@ -49,6 +49,7 @@ import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.util.IOUtils;
 
 final class HttpFunction implements StatefulFunction {
 
@@ -226,11 +227,14 @@ final class HttpFunction implements StatefulFunction {
           "Failure forwarding a message to a remote function " + self, asyncResult.throwable());
     }
     InputStream httpResponseBody = responseBody(asyncResult.value());
-    FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
-    checkState(
-        fromFunction.hasInvocationResult(),
-        "The received HTTP payload does not contain an InvocationResult, but rather [%s]",
-        fromFunction);
-    return fromFunction.getInvocationResult();
+    try {
+      FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
+      if (fromFunction.hasInvocationResult()) {
+        return fromFunction.getInvocationResult();
+      }
+      return InvocationResponse.getDefaultInstance();
+    } finally {
+      IOUtils.closeQuietly(httpResponseBody);
+    }
   }
 }