You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/23 16:29:20 UTC

[flink-statefun] 01/03: [FLINK-16226] [remote] Add maxBatchSize to http function type

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 6dfb8af7c5d5707ee55d130ffb9d5cd397371932
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Feb 21 18:29:43 2020 +0100

    [FLINK-16226] [remote] Add maxBatchSize to http function type
    
    This commit adds a maxBatchSize property to the remote function
    spec that lets users set an upper limit on the number of
    invocations an HttpFunction can batch together and send to a
    remote function. Exceeding this limit triggers
    backpressure.
---
 .../apache/flink/statefun/flink/common/json/Selectors.java   | 11 +++++++++++
 .../flink/statefun/flink/core/httpfn/HttpFunctionSpec.java   | 12 +++++++++++-
 .../flink/statefun/flink/core/jsonmodule/JsonModule.java     |  9 ++++++++-
 .../flink/statefun/flink/core/jsonmodule/Pointers.java       |  2 ++
 4 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
index 3fbdbd4..70c6d24 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
@@ -52,6 +52,17 @@ public final class Selectors {
     return node.asInt();
   }
 
+  public static OptionalInt optionalIntegerAt(JsonNode node, JsonPointer pointer) {
+    node = node.at(pointer);
+    if (node.isMissingNode()) {
+      return OptionalInt.empty();
+    }
+    if (!node.isInt()) {
+      throw new WrongTypeException(pointer, "not an integer");
+    }
+    return OptionalInt.of(node.asInt());
+  }
+
   public static Iterable<? extends JsonNode> listAt(JsonNode node, JsonPointer pointer) {
     node = node.at(pointer);
     if (node.isMissingNode()) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 1b016ac..5126ee6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -29,13 +29,19 @@ public final class HttpFunctionSpec implements FunctionSpec {
   private final URI endpoint;
   private final List<String> states;
   private final Duration maxRequestDuration;
+  private final int maxBatchSize;
 
   public HttpFunctionSpec(
-      FunctionType functionType, URI endpoint, List<String> states, Duration maxRequestDuration) {
+      FunctionType functionType,
+      URI endpoint,
+      List<String> states,
+      Duration maxRequestDuration,
+      int maxBatchSize) {
     this.functionType = Objects.requireNonNull(functionType);
     this.endpoint = Objects.requireNonNull(endpoint);
     this.states = Objects.requireNonNull(states);
     this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration);
+    this.maxBatchSize = maxBatchSize;
   }
 
   @Override
@@ -59,4 +65,8 @@ public final class HttpFunctionSpec implements FunctionSpec {
   public Duration maxRequestDuration() {
     return maxRequestDuration;
   }
+
+  public int maxBatchSize() {
+    return maxBatchSize;
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index dd43367..442b974 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -62,6 +62,7 @@ import org.apache.flink.util.TimeUtils;
 
 final class JsonModule implements StatefulFunctionModule {
   private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+  private static final Integer DEFAULT_MAX_HTTP_BATCH_SIZE = 1000;
   private final JsonNode spec;
   private final URL moduleUrl;
 
@@ -212,7 +213,8 @@ final class JsonModule implements StatefulFunctionModule {
             functionType,
             functionUri(functionNode),
             functionStates(functionNode),
-            maxRequestDuration(functionNode));
+            maxRequestDuration(functionNode),
+            maxBatchSize(functionNode));
       case GRPC:
         return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
       default:
@@ -220,6 +222,11 @@ final class JsonModule implements StatefulFunctionModule {
     }
   }
 
+  private static int maxBatchSize(JsonNode functionNode) {
+    return Selectors.optionalIntegerAt(functionNode, Functions.FUNCTION_MAX_HTTP_BATCH_SIZE)
+        .orElse(DEFAULT_MAX_HTTP_BATCH_SIZE);
+  }
+
   private static Duration maxRequestDuration(JsonNode functionNode) {
     return Selectors.optionalTextAt(functionNode, Functions.FUNCTION_TIMEOUT)
         .map(TimeUtils::parseDuration)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
index 6d01344..60fc6be 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
@@ -48,6 +48,8 @@ public final class Pointers {
     public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
     public static final JsonPointer FUNCTION_TIMEOUT =
         JsonPointer.compile("/function/spec/timeout");
+    public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+        JsonPointer.compile("/function/spec/maxBatchSize");
   }
 
   // -------------------------------------------------------------------------------------