Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/21 18:53:55 UTC

[GitHub] [flink-statefun] igalshilman opened a new pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

igalshilman opened a new pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30
 
 
   ## This PR uses the back pressure mechanism introduced in #29 to support back pressure in `HttpFunction`.
   
   ## Background  
   When an HTTP remote function (`HttpFunction`) invokes a function remotely for a specific address, it waits for the response to come back asynchronously. While it waits, new messages for that address might arrive; the `HttpFunction` records these requests in persisted state, and when the original request completes, it batches them and sends them off as a single request.
   
   To keep that state under control, the `HttpFunction` needs to limit the maximum allowed batch size; this PR provides that limit.
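   To illustrate why the batch needs a bound, here is a minimal, self-contained sketch of per-address batching with only an "in flight" flag, loosely modeled on the pre-PR `hasInFlightRpc` logic visible in the diff. The class `UnboundedBatcher` and its methods are hypothetical, and plain fields stand in for persisted state; it is not the actual `HttpFunction` code.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch: per-address batching with NO size bound.
   // Plain fields stand in for the persisted flag and the persisted batch.
   final class UnboundedBatcher {
     private boolean hasInFlightRpc = false;
     private final List<String> batch = new ArrayList<>();
     // Records what would have gone "over the wire", for inspection.
     private final List<List<String>> sent = new ArrayList<>();

     void onRequest(String message) {
       if (hasInFlightRpc) {
         batch.add(message); // nothing limits how large this grows
         return;
       }
       hasInFlightRpc = true;
       sent.add(List.of(message)); // first message goes out immediately
     }

     void onAsyncResult() {
       if (batch.isEmpty()) {
         hasInFlightRpc = false; // nothing accumulated, back to idle
         return;
       }
       // The whole accumulated batch goes out as one request and stays "in flight".
       sent.add(new ArrayList<>(batch));
       batch.clear();
     }

     int batchedCount() { return batch.size(); }

     List<List<String>> sentRequests() { return sent; }
   }
   ```

   While a single request is in flight, every new message is appended, so the batch can grow without limit; a bound such as the one this PR adds is what caps it.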
    
   
   ## User supplied property
   The first part of this PR adds a property, `maxBatchSize`, to the function spec in `module.yaml`:
   
   ```
   - function:
       meta:
         kind: http
         type: org.foo/bar
       spec:
         ...
         maxBatchSize: 10
   ```
   
   This limits the batch sent to the function `org.foo/bar` to at most 10 messages per address. If the value is omitted, the default of 1,000 is used.
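   A minimal sketch of how the default could be applied when loading the spec, assuming the spec has already been parsed into a `Map` (the PR itself reads the value via a `JsonPointer` at `/function/spec/maxBatchSize`). `MaxBatchSizeResolver` is hypothetical, and rejecting non-positive values is an assumption, not something this PR states.

   ```java
   import java.util.Map;

   // Hypothetical helper: resolves maxBatchSize from an already-parsed spec map.
   final class MaxBatchSizeResolver {
     // Default stated in the PR description.
     static final int DEFAULT_MAX_BATCH_SIZE = 1_000;

     static int resolve(Map<String, Object> spec) {
       Object raw = spec.get("maxBatchSize");
       if (raw == null) {
         return DEFAULT_MAX_BATCH_SIZE; // property omitted
       }
       if (!(raw instanceof Number)) {
         throw new IllegalArgumentException("maxBatchSize must be a number, got: " + raw);
       }
       int value = ((Number) raw).intValue();
       // ASSUMPTION: a non-positive limit is treated as a configuration error.
       if (value <= 0) {
         throw new IllegalArgumentException("maxBatchSize must be positive, got: " + value);
       }
       return value;
     }
   }
   ```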
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382901358
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   Yes, I was thinking of something like
   `onRequest(context, (AsyncWaiter) context, message)`.
   It doesn't matter much, though, so feel free to keep it as is.
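   A sketch contrasting the two signatures. `Context` and `AsyncWaiter` are reduced to hypothetical single-method stand-ins, `RuntimeContext` is a hypothetical runtime object implementing both (which is what the cast in the diff relies on), and a `batchFull` flag replaces the real batching logic.

   ```java
   // Hypothetical minimal stand-ins; the real interfaces have more methods.
   interface Context {
     String self();
   }

   interface AsyncWaiter {
     void awaitAsyncOperationComplete();
   }

   // One runtime object implementing both interfaces.
   final class RuntimeContext implements Context, AsyncWaiter {
     int awaitCalls = 0;

     public String self() { return "org.foo/bar:address-1"; }

     public void awaitAsyncOperationComplete() { awaitCalls++; }
   }

   final class SignatureSketch {
     // As in the PR: cast inside onRequest, only when the batch is full.
     static void onRequestWithCast(Context context, boolean batchFull) {
       if (batchFull) {
         ((AsyncWaiter) context).awaitAsyncOperationComplete();
       }
     }

     // As suggested in review: the caller passes the same object twice,
     // so onRequest itself needs no cast.
     static void onRequest(Context context, AsyncWaiter asyncWaiter, boolean batchFull) {
       if (batchFull) {
         asyncWaiter.awaitAsyncOperationComplete();
       }
     }
   }
   ```

   With the second variant the cast moves to the call site, e.g. `onRequest(context, (AsyncWaiter) context, message)`, so it is evaluated on every invocation rather than only when the batch is full.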


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382903046
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   But of course, that is something we can add in the future, not now.
   I'm just suggesting renaming the config key in this PR, to leave us room for doing that later.
   
   WDYT?


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines,
   to clearly avoid any confusion with the accumulated data size of the HTTP request, and also to leave room for adding such a data-size-based threshold in the future.
   
   My reasoning is:
   Seeing it from the user's perspective, if I were the user I would probably set this based on the desired maximum HTTP request data size, i.e.:
   ```
   configValue = desiredMaxBatchDataSize / averageMessageSize
   ```
   
   I can see a reason to add such a data-size-based threshold in the future.
   It _might_, in some cases, be more intuitive to use than a request-count-based threshold. For example, AWS API Gateway has an upper limit on the size of HTTP request bodies.
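   The heuristic above can be made concrete with a small helper. `BatchSizeHeuristic.maxMessagesPerBatch` is hypothetical; it uses floor division so the *expected* request size stays at or below the target, and clamps the result to at least one message.

   ```java
   // Hypothetical helper for: configValue = desiredMaxBatchDataSize / averageMessageSize
   final class BatchSizeHeuristic {
     static int maxMessagesPerBatch(long desiredMaxBatchBytes, long averageMessageBytes) {
       if (desiredMaxBatchBytes <= 0 || averageMessageBytes <= 0) {
         throw new IllegalArgumentException("sizes must be positive");
       }
       // Floor division keeps the expected batch at or below the target size.
       long count = desiredMaxBatchBytes / averageMessageBytes;
       // Allow at least one message per batch and clamp to the int range.
       return (int) Math.max(1L, Math.min(count, (long) Integer.MAX_VALUE));
     }
   }
   ```

   For example, a 1 MiB target with 512-byte messages on average gives `1_048_576 / 512 = 2048` messages per batch. Because this only bounds the expected size, a batch of larger-than-average messages can still exceed the target, which is one argument for the data-size-based threshold mentioned above.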


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines,
   to clearly avoid confusion with the actual batch data size of the HTTP request, and also to leave room for adding such a data-size-based threshold in the future.
   
   My reasoning is:
   Seeing it from the user's perspective, if I were the user I would probably set this based on the desired maximum HTTP request data size, i.e.:
   ```
   configValue = desiredMaxBatchDataSize / averageExpectedMessageSize
   ```
   
   I can see a reason to add such a data-size-based threshold in the future.
   It _might_, in some cases, be more intuitive to use than a request-count-based threshold. For example, AWS API Gateway has an upper limit on the size of HTTP request bodies.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines,
   to clearly avoid confusion with the actual batch data size of the HTTP request, and also to leave room for adding such a data-size-based threshold in the future.
   
   My reasoning is:
   Seeing it from the user's perspective, if I were the user I would probably set this based on the desired maximum batch data size, i.e.:
   ```
   configValue = desiredMaxBatchDataSize / averageExpectedMessageSize
   ```
   to control the size of the HTTP requests.
   
   I can see a reason to add such a data-size-based threshold in the future.
   It _might_, in some cases, be more intuitive to use than a request-count-based threshold.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382901358
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   Yes, I was thinking of something like `onRequest(context, (AsyncWaiter) context, message)`. It doesn't matter much, though, so feel free to keep it as is.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines,
   to clearly avoid confusion with the actual batch data size of the HTTP request, and also to leave room for adding such a data-size-based threshold in the future.
   
   My reasoning is:
   Seeing it from the user's perspective, if I were the user I would probably set this based on the desired maximum batch data size, i.e.:
   ```
   configValue = desiredMaxBatchDataSize / averageExpectedMessageSize
   ```
   to control the size of the HTTP requests.
   
   I can see a reason to add such a size-based threshold in the future.
   It _might_, in some cases, be more intuitive to use than a request-count-based threshold.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382876194
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -109,9 +137,17 @@ private void onAsyncResult(
     handleInvocationResponse(context, invocationResult);
     InvocationBatchRequest.Builder nextBatch = getNextBatch();
     if (nextBatch == null) {
-      hasInFlightRpc.clear();
+      // the async request was completed, and there is nothing else in the batch
+      // so we clear the requestState.
+      requestState.clear();
       return;
     }
+    // an async request was just completed, but while it was in flight we have
+    // accumulated a batch, we now proceed with:
+    // a) clearing the batch from our own persisted state (the batch moves to the async operation
+    // state)
+    // b) sending the accumulated batch to the remote function.
+    requestState.set(0);
 
 Review comment:
   Should this actually be `requestState.set(nextBatch.size())`?
   Or rather, should we not set it to 0 at all, because at this point `requestState` already equals the accumulated batch size?
   
   My reasoning is:
   In the `onRequest` method, as I understood it from the naming, the `inflightOrBatched` counter (obtained from `requestState`) reflects the total number of "pending" records, regardless of whether they are buffered or in flight.
   Setting `requestState` to 0 here breaks that semantic.
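   To make this reading concrete, here is a hypothetical variant (not the PR's code) in which the counter tracks every pending record, whether batched or in flight, and is reset to `nextBatch.size()` when a batch is dispatched. `PendingCountVariant` is an illustrative name, plain fields replace persisted state, and backpressure is assumed to trigger once the count reaches the limit.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // HYPOTHETICAL variant: the counter includes records already in flight.
   final class PendingCountVariant {
     private final int maxPending;
     private Integer pending = null; // null: nothing in flight
     private final List<String> batch = new ArrayList<>();

     PendingCountVariant(int maxPending) { this.maxPending = maxPending; }

     // Returns true if the caller should backpressure after this message.
     boolean onRequest(String message) {
       if (pending == null) {
         pending = 1; // the caller sends this single record immediately
         return pending >= maxPending;
       }
       batch.add(message);
       pending = pending + 1;
       return pending >= maxPending; // ASSUMED threshold check
     }

     // Returns the batch to dispatch next, or null if nothing was accumulated.
     List<String> onAsyncResult() {
       if (batch.isEmpty()) {
         pending = null;
         return null;
       }
       List<String> next = new ArrayList<>(batch);
       batch.clear();
       pending = next.size(); // the suggestion: in-flight records stay counted
       return next;
     }

     Integer pending() { return pending; }
   }
   ```

   Under this reading, records already on the wire keep counting toward the limit after dispatch, so a new message can trigger backpressure sooner than if the counter were reset to 0.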


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382876852
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -109,9 +137,17 @@ private void onAsyncResult(
     handleInvocationResponse(context, invocationResult);
     InvocationBatchRequest.Builder nextBatch = getNextBatch();
     if (nextBatch == null) {
-      hasInFlightRpc.clear();
+      // the async request was completed, and there is nothing else in the batch
+      // so we clear the requestState.
+      requestState.clear();
       return;
     }
+    // an async request was just completed, but while it was in flight we have
+    // accumulated a batch, we now proceed with:
+    // a) clearing the batch from our own persisted state (the batch moves to the async operation
+    // state)
+    // b) sending the accumulated batch to the remote function.
+    requestState.set(0);
 
 Review comment:
   So, just to sum up 😄
   I'm trying to clarify the intended backpressure semantics with respect to some of the variable names here.
   Is there perhaps some inconsistency in the naming?


[GitHub] [flink-statefun] igalshilman edited a comment on issue #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman edited a comment on issue #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#issuecomment-589930328
 
 
   Now that it is hopefully clearer, I’m open to suggestions.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines,
   to clearly avoid confusion with the actual batch data size of the HTTP request, and also to leave room for adding such a data-size-based threshold in the future.
   
   My reasoning is:
   Seeing it from the user's perspective, if I were the user I would probably use this by computing `desiredMaxBatchDataSize / averageExpectedMessageSize` to control the size of the HTTP requests.
   I can see a reason to add such a size-based threshold in the future.
   It _might_, in some cases, be more intuitive to use than a request-count-based threshold.


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382895705
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   `AsyncWaiter` doesn’t extend `Context`, and we need both of them in this method.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382876533
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -109,9 +137,17 @@ private void onAsyncResult(
     handleInvocationResponse(context, invocationResult);
     InvocationBatchRequest.Builder nextBatch = getNextBatch();
     if (nextBatch == null) {
-      hasInFlightRpc.clear();
+      // the async request was completed, and there is nothing else in the batch
+      // so we clear the requestState.
+      requestState.clear();
       return;
     }
+    // an async request was just completed, but while it was in flight we have
+    // accumulated a batch, we now proceed with:
+    // a) clearing the batch from our own persisted state (the batch moves to the async operation
+    // state)
+    // b) sending the accumulated batch to the remote function.
+    requestState.set(0);
 
 Review comment:
   On the other hand, if the intended semantics is that we only backpressure when the locally accumulated batch exceeds the threshold, without accounting for the number of in-flight records (records, not requests; a batch request can contain multiple records), then `inflightOrBatched` should probably be renamed to `batchedSize`.
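   Under this second reading, the PR's transitions can be sketched with the counter renamed to `batchedSize`, as suggested. This is a simplified model rather than the PR's code: plain fields replace persisted state, the class name is hypothetical, and `isMaxBatchSizeExceeded` is assumed to mean `batchedSize >= maxBatchSize`.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Simplified model of the PR's transitions, counter renamed to batchedSize.
   final class BatchedSizeSemantics {
     private final int maxBatchSize;
     // null: idle; 0: in flight with an empty batch; n > 0: in flight with n batched
     private Integer batchedSize = null;
     private final List<String> batch = new ArrayList<>();

     BatchedSizeSemantics(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }

     // Returns true when the runtime should be told to wait for the in-flight call.
     boolean onRequest(String message) {
       if (batchedSize == null) {
         batchedSize = 0; // sent immediately; nothing batched yet
         return false;
       }
       batch.add(message);
       batchedSize = batchedSize + 1;
       return batchedSize >= maxBatchSize; // ASSUMED meaning of isMaxBatchSizeExceeded
     }

     // Returns the batch to dispatch next, or null if nothing was accumulated.
     List<String> onAsyncResult() {
       if (batch.isEmpty()) {
         batchedSize = null; // corresponds to requestState.clear()
         return null;
       }
       List<String> next = new ArrayList<>(batch);
       batch.clear();
       batchedSize = 0; // corresponds to requestState.set(0): the local batch is empty
       return next;
     }

     Integer batchedSize() { return batchedSize; }
   }
   ```

   Here the counter drops back to 0 when the accumulated batch is dispatched, so records already in flight never count toward the limit; only the local backlog does.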


[GitHub] [flink-statefun] igalshilman edited a comment on issue #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman edited a comment on issue #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#issuecomment-589930100
 
 
   I think I understand where the confusion comes from.
   `requestState` (see the field comment) technically tracks two states in a single value (to reduce the `StateHandle` count):
   1. A flag indicating whether there is something in flight
   (NULL means nothing is in flight, 0 means there is a batch in flight).
   2. The size of the currently accumulating batch (regardless of what is on the wire).
   
   The semantics of the property as defined in `module.yaml` only limit the size of the batch that a remote function can receive.
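   The single-value encoding described above can be decoded as follows. `RequestStateDecoder` is a hypothetical helper, and treating negative values as invalid is an assumption (in the diff, `-1` only appears as the `getOrDefault` fallback for an absent value).

   ```java
   // Hypothetical decoder for the single nullable counter that encodes two states.
   final class RequestStateDecoder {
     static String describe(Integer requestState) {
       if (requestState == null) {
         return "idle: nothing in flight";
       }
       // ASSUMPTION: a stored negative value is never valid.
       if (requestState < 0) {
         throw new IllegalStateException("unexpected requestState: " + requestState);
       }
       if (requestState == 0) {
         return "in flight: batch empty";
       }
       return "in flight: " + requestState + " batched";
     }
   }
   ```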
     
   


[GitHub] [flink-statefun] igalshilman commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#issuecomment-589930100
 
 
   I think I understand where the confusion comes from.
   `requestState` (see the field comment) technically tracks two states in a single value (to reduce the `StateHandle` count):
   1. A flag indicating whether there is something in flight
   (NULL means nothing is in flight, 0 means there is a batch in flight).
   2. The size of the currently accumulating batch (regardless of what is on the wire).
   


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382997615
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   Oh, I see. We can do that, but it would incur a redundant cast most of the time,
   since the cast is only needed when the maximum batch size is exceeded.


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines.
   That would clearly avoid any confusion with the actual batch data size of the HTTP request, and also leave room for us to add such a data-size-based threshold in the future.
   
   My reasoning is:
   Just seeing it from the user perspective, I would personally use this by doing: `desiredMaxBatchDataSize / averageExpectedMessageSize` to control the size of the HTTP requests.
   I can see a reason in the future to add such a size-based threshold.
   This _might_ in some cases be more intuitive to use than a request-count-based threshold.

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r383008706
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'll change while merging 👌

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382903046
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   But of course, that is something we can add in the future and not now.
   Just suggesting a config key rename here, to leave us room for doing that in the future.
   
   WDYT?

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382901358
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   Yes, so I was thinking about something like
   `onRequest(context, (AsyncWaiter) context, Any message)`.
   It doesn't matter much though; it's also fine to keep it as is.
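
A sketch of the suggested signature, where the cast happens once at the call site and `onRequest` receives both capabilities explicitly. The `Context` and `AsyncWaiter` interfaces below are hypothetical, minimal stand-ins (the real StateFun types have more methods), and `RuntimeContext` and `CallSiteCastSketch` are illustrative names only.

```java
// Hypothetical, minimal stand-ins for the interfaces discussed above.
interface Context {
  String self();
}

interface AsyncWaiter {
  void awaitAsyncOperationComplete();
}

/** The cast in the PR implies the runtime context implements both interfaces. */
final class RuntimeContext implements Context, AsyncWaiter {
  int awaitCalls = 0;

  @Override
  public String self() {
    return "org.foo/bar:id-1";
  }

  @Override
  public void awaitAsyncOperationComplete() {
    awaitCalls++; // a real runtime would stop delivering input to this address
  }
}

final class CallSiteCastSketch {
  private final int maxBatchSize;
  private int batched = 0; // simplified stand-in for the persisted requestState

  CallSiteCastSketch(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  void invoke(Context context, Object input) {
    // The cast moves to the call site and therefore runs on every invocation...
    onRequest(context, (AsyncWaiter) context, input);
  }

  // ...so the signature states both capabilities onRequest relies on.
  // (context and message would be used for sending; they are unused in this sketch.)
  private void onRequest(Context context, AsyncWaiter asyncWaiter, Object message) {
    batched++;
    if (batched >= maxBatchSize) {
      asyncWaiter.awaitAsyncOperationComplete();
    }
  }
}
```

This shows the trade-off raised in the reply above: the signature becomes more explicit, but the cast is executed on every call instead of only when the batch is at capacity.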

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382874440
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   Would it make sense to add an `AsyncWaiter` argument to the `onRequest` method, and do the casting at the call site? It would make this a little more readable.

[GitHub] [flink-statefun] tzulitai closed pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30
 
 
   

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines.
   That would clearly avoid any confusion with the actual batch data size of the HTTP request, and also leave room for us to add such a data-size-based threshold in the future.
   
   My reasoning is:
   Just seeing it from the user perspective, if I were the user I would probably use this by doing: `configValue = desiredMaxBatchDataSize / averageExpectedMessageSize` to control the size of the HTTP requests.
   I can see a reason in the future to add such a size-based threshold.
   This _might_ in some cases be more intuitive to use than a request-count-based threshold.

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382903046
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   But of course, that is something we can add in the future and not now.
   Just suggesting a config key rename here, to leave us room for doing that in the future.

[GitHub] [flink-statefun] tzulitai commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#issuecomment-589940730
 
 
   @igalshilman just one last comment, otherwise I think this is good to merge :)

[GitHub] [flink-statefun] tzulitai commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#issuecomment-589940086
 
 
   Thanks for the explanation. I think the field comment on `requestState` is pretty clear as is; I simply missed the fact that the value `0` has a special meaning in this case (which is also stated quite clearly already).
   
   So, from my understanding, the semantics are:
   - Backpressure occurs once the local buffer for a given address hits `maxBatchSize`.
   - The maximum number of pending requests would be `2 * maxBatchSize` (`maxBatchSize` accumulated in the local buffer state, and another `maxBatchSize` in flight as a single request).
   
   👌 I think that's good.
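
The `2 * maxBatchSize` bound can be checked with a small step simulation. This is a sketch under stated assumptions: `PendingBound` is hypothetical, backpressure is assumed to trigger exactly when the local buffer reaches `maxBatchSize`, and new messages are assumed to arrive as fast as the buffer allows until the in-flight request completes.

```java
/** Hypothetical simulation of the worst-case number of pending messages per address. */
final class PendingBound {
  static int peakPending(int maxBatchSize, int rounds) {
    int inFlight = 1; // the very first message is sent on its own
    int batched = 0;  // messages accumulated in local state
    int peak = 0;
    for (int r = 0; r < rounds; r++) {
      // Messages keep arriving until the local buffer hits maxBatchSize (backpressure).
      while (batched < maxBatchSize) {
        batched++;
        peak = Math.max(peak, inFlight + batched);
      }
      // The in-flight request completes; the whole buffer is sent as one request.
      inFlight = batched;
      batched = 0;
    }
    return peak;
  }
}
```

For `maxBatchSize = 10`, the first round peaks at 11 (one in flight plus ten buffered); from the second round on, ten messages are in flight while ten more accumulate, so the peak is 20, matching `2 * maxBatchSize`.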

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382903046
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   But of course, that is something we can add in the future and not now.
   Just suggesting a config key rename here, to leave us room for doing that in the future.

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines.
   That would clearly avoid any confusion with the accumulated data size of the HTTP request, and also leave room for us to add such a data-size-based threshold in the future.
   
   My reasoning is:
   Just seeing it from the user perspective, if I were the user I would probably set this based on the desired max HTTP request data size, i.e.:
   ```
   configValue = desiredMaxBatchDataSize / averageExpectedMessageSize
   ```
   
   I can see a reason in the future to add such a data-size-based threshold.
   This _might_ in some cases be more intuitive to use than a request-count-based threshold. For example, AWS API gateway has an upper limit based on the size of HTTP request bodies.
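
The reviewer's conversion from a data-size budget to a request count can be sketched as follows. `BatchSizing`, its methods, and the byte units are hypothetical; the size-based check only illustrates the kind of future threshold being discussed and is not part of this PR.

```java
/** Hypothetical helpers relating a byte budget to a request-count threshold. */
final class BatchSizing {
  /** configValue = desiredMaxBatchDataSize / averageExpectedMessageSize, at least 1. */
  static int maxNumBatchRequests(long desiredMaxBatchDataSizeBytes, long averageExpectedMessageSizeBytes) {
    if (averageExpectedMessageSizeBytes <= 0) {
      throw new IllegalArgumentException("average message size must be positive");
    }
    long count = desiredMaxBatchDataSizeBytes / averageExpectedMessageSizeBytes; // floor division
    return (int) Math.max(1, Math.min(Integer.MAX_VALUE, count)); // allow at least one message
  }

  /** A data-size-based threshold would instead compare the accumulated bytes directly. */
  static boolean exceedsByteBudget(long accumulatedBytes, long maxBatchDataSizeBytes) {
    return accumulatedBytes >= maxBatchDataSizeBytes;
  }
}
```

For example, a 1,000,000-byte budget with 2,000-byte messages on average gives a count of 500. Because the count is derived from an average, a batch of larger-than-average messages can still exceed the byte budget; a size-based threshold would bound that directly, which is the motivation for keeping the config name free.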

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines.
   That would clearly avoid any confusion with the actual batch data size of the HTTP request, and also leave room for us to add such a data-size-based threshold in the future.
   
   My reasoning is:
   Just seeing it from the user perspective, if I were the user I would probably set this based on the desired max HTTP request data size, i.e.:
   ```
   configValue = desiredMaxBatchDataSize / averageExpectedMessageSize
   ```
   
   I can see a reason in the future to add such a data-size-based threshold.
   This _might_ in some cases be more intuitive to use than a request-count-based threshold. For example, AWS API gateway has an upper limit based on the body size of HTTP requests.

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382902451
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   I'm wondering whether we should rather call this `maxNumBatchRequests` or something along those lines.
   That would clearly avoid any confusion with the accumulated data size of the HTTP request, and also leave room for us to add such a data-size-based threshold in the future.
   
   My reasoning is:
   Just seeing it from the user perspective, if I were the user I would probably set this based on the desired max HTTP request data size, i.e.:
   ```
   configValue = desiredMaxHttpBatchRequestSizeMbs / averagePerRequestSizeMbs
   ```
   
   I can see a reason in the future to add such a data-size-based threshold.
   This _might_ in some cases be more intuitive to use than a request-count-based threshold. For example, AWS API gateway has an upper limit based on the size of HTTP request bodies.

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382895705
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   `AsyncWaiter` doesn’t extend `Context`, and we need both of them in this method.
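
One way to need both types without a second parameter is a generic method bounded by both interfaces, fed by a single intersection cast (supported since Java 8). This is only a sketch of an alternative: `Context`, `AsyncWaiter`, and `RuntimeContext` are hypothetical, minimal stand-ins, and it still performs one cast per call.

```java
// Hypothetical, minimal stand-ins for the interfaces discussed above.
interface Context {
  String self();
}

interface AsyncWaiter {
  void awaitAsyncOperationComplete();
}

final class RuntimeContext implements Context, AsyncWaiter {
  int awaitCalls = 0;

  @Override
  public String self() {
    return "org.foo/bar:id-1";
  }

  @Override
  public void awaitAsyncOperationComplete() {
    awaitCalls++;
  }
}

final class IntersectionTypeSketch {
  private final int maxBatchSize;
  private int batched = 0; // simplified stand-in for the persisted requestState
  String lastAddress;      // records that the Context API was used

  IntersectionTypeSketch(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  void invoke(Context context, Object input) {
    // A single intersection cast checks both interfaces at once.
    onRequest((Context & AsyncWaiter) context, input);
  }

  // C is bounded by both interfaces, so one parameter exposes both APIs.
  private <C extends Context & AsyncWaiter> void onRequest(C context, Object message) {
    lastAddress = context.self();
    batched++;
    if (batched >= maxBatchSize) {
      context.awaitAsyncOperationComplete();
    }
  }
}
```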

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382997460
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 ##########
 @@ -48,6 +48,8 @@ private Pointers() {}
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
 
 Review comment:
   Sounds good, let's change to that!
   If you'd like, you can change it while merging, or I'll push a hotfix to this PR.
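
A sketch of how the renamed setting could be resolved with the default of 1,000 given in the PR description. It assumes the spec has already been parsed into a `Map`; `BatchLimitConfig` is hypothetical, and `maxNumBatchRequests` is the name proposed in review, which may differ from the key actually merged.

```java
import java.util.Map;

/** Hypothetical sketch: resolve the per-address batch limit from a parsed function spec. */
final class BatchLimitConfig {
  // Key name as proposed in review; the merged name may differ.
  static final String MAX_NUM_BATCH_REQUESTS = "maxNumBatchRequests";
  // Default stated in the PR description.
  static final int DEFAULT_MAX_NUM_BATCH_REQUESTS = 1_000;

  static int maxNumBatchRequests(Map<String, Object> spec) {
    Object value = spec.get(MAX_NUM_BATCH_REQUESTS);
    if (value == null) {
      return DEFAULT_MAX_NUM_BATCH_REQUESTS; // key omitted by the user
    }
    if (!(value instanceof Number)) {
      throw new IllegalArgumentException(MAX_NUM_BATCH_REQUESTS + " must be a number, got: " + value);
    }
    int limit = ((Number) value).intValue();
    if (limit <= 0) {
      throw new IllegalArgumentException(MAX_NUM_BATCH_REQUESTS + " must be positive, got: " + limit);
    }
    return limit;
  }
}
```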

[GitHub] [flink-statefun] igalshilman commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman commented on issue #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#issuecomment-589930328
 
 
   Knowing that, I’m open to suggestions.

[GitHub] [flink-statefun] tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382876277
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -109,9 +137,17 @@ private void onAsyncResult(
     handleInvocationResponse(context, invocationResult);
     InvocationBatchRequest.Builder nextBatch = getNextBatch();
     if (nextBatch == null) {
-      hasInFlightRpc.clear();
+      // the async request was completed, and there is nothing else in the batch
+      // so we clear the requestState.
+      requestState.clear();
       return;
     }
+    // an async request was just completed, but while it was in flight we have
+    // accumulated a batch, we now proceed with:
+    // a) clearing the batch from our own persisted state (the batch moves to the async operation
+    // state)
+    // b) sending the accumulated batch to the remote function.
+    requestState.set(0);
 
 Review comment:
   If that's the case, then probably `maxBatchSize` should be renamed to `maxPendingSize`.
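
The completion path in this hunk resets the counter to `0` instead of clearing it when a batch is waiting, so another full batch can accumulate while the previous one is in flight. A minimal in-memory model of that transition, as a sketch: `CompletionModel` is hypothetical, an empty list models `getNextBatch()` returning `null`, and appending to `sentBatches` stands in for sending the batch to the remote function.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the completion path (onAsyncResult) for one address. */
final class CompletionModel {
  Integer requestState; // null models requestState.clear()
  final List<String> batch = new ArrayList<>();
  final List<List<String>> sentBatches = new ArrayList<>();

  void onAsyncResult() {
    if (batch.isEmpty()) {
      requestState = null; // nothing in flight and nothing batched
      return;
    }
    // Move the accumulated messages out of local state and send them as one request.
    List<String> nextBatch = new ArrayList<>(batch);
    batch.clear();
    requestState = 0; // the batch is now in flight and the local batch is empty
    sentBatches.add(nextBatch);
  }
}
```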

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #30: [FLINK-16226] Add Backpressure to HttpFunction
URL: https://github.com/apache/flink-statefun/pull/30#discussion_r382895710
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java
 ##########
 @@ -89,12 +101,28 @@ public void invoke(Context context, Object input) {
 
   private void onRequest(Context context, Any message) {
     Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message);
-    if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) {
-      batch.append(invocationBuilder.build());
+    int inflightOrBatched = requestState.getOrDefault(-1);
+    if (inflightOrBatched < 0) {
+      // no inflight requests, and nothing in the batch.
+      // so we let this request to go through, and change state to indicate that:
+      // a) there is a request in flight.
+      // b) there is nothing in the batch.
+      requestState.set(0);
+      sendToFunction(context, invocationBuilder);
       return;
     }
-    hasInFlightRpc.set(Boolean.TRUE);
-    sendToFunction(context, invocationBuilder);
+    // there is at least one request in flight (inflightOrBatched >= 0),
+    // so we add that request to the batch.
+    batch.append(invocationBuilder.build());
+    inflightOrBatched++;
+    requestState.set(inflightOrBatched);
+    if (isMaxBatchSizeExceeded(inflightOrBatched)) {
+      // we are at capacity, can't add anything to the batch.
+      // we need to signal to the runtime that we are unable to process any new input
+      // and we must wait for our in flight asynchronous operation to complete before
+      // we are able to process more input.
+      ((AsyncWaiter) context).awaitAsyncOperationComplete();
 
 Review comment:
   `AsyncWaiter` doesn’t extend `Context`, and we need both of them in this method.
