You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/23 16:29:20 UTC
[flink-statefun] 01/03: [FLINK-16226] [remote] Add maxBatchSize to
http function type
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 6dfb8af7c5d5707ee55d130ffb9d5cd397371932
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Feb 21 18:29:43 2020 +0100
[FLINK-16226] [remote] Add maxBatchSize to http function type
This commit adds a property to the remote function spec
that allows users to specify an upper limit on the number
of invocations an HttpFunction can batch together and send
to a remote function. Exceeding this limit triggers
backpressure.
---
.../apache/flink/statefun/flink/common/json/Selectors.java | 11 +++++++++++
.../flink/statefun/flink/core/httpfn/HttpFunctionSpec.java | 12 +++++++++++-
.../flink/statefun/flink/core/jsonmodule/JsonModule.java | 9 ++++++++-
.../flink/statefun/flink/core/jsonmodule/Pointers.java | 2 ++
4 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
index 3fbdbd4..70c6d24 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
@@ -52,6 +52,17 @@ public final class Selectors {
return node.asInt();
}
+ public static OptionalInt optionalIntegerAt(JsonNode node, JsonPointer pointer) {
+ node = node.at(pointer);
+ if (node.isMissingNode()) {
+ return OptionalInt.empty();
+ }
+ if (!node.isInt()) {
+ throw new WrongTypeException(pointer, "not an integer");
+ }
+ return OptionalInt.of(node.asInt());
+ }
+
public static Iterable<? extends JsonNode> listAt(JsonNode node, JsonPointer pointer) {
node = node.at(pointer);
if (node.isMissingNode()) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 1b016ac..5126ee6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -29,13 +29,19 @@ public final class HttpFunctionSpec implements FunctionSpec {
private final URI endpoint;
private final List<String> states;
private final Duration maxRequestDuration;
+ private final int maxBatchSize;
public HttpFunctionSpec(
- FunctionType functionType, URI endpoint, List<String> states, Duration maxRequestDuration) {
+ FunctionType functionType,
+ URI endpoint,
+ List<String> states,
+ Duration maxRequestDuration,
+ int maxBatchSize) {
this.functionType = Objects.requireNonNull(functionType);
this.endpoint = Objects.requireNonNull(endpoint);
this.states = Objects.requireNonNull(states);
this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration);
+ this.maxBatchSize = maxBatchSize;
}
@Override
@@ -59,4 +65,8 @@ public final class HttpFunctionSpec implements FunctionSpec {
public Duration maxRequestDuration() {
return maxRequestDuration;
}
+
+ public int maxBatchSize() {
+ return maxBatchSize;
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index dd43367..442b974 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -62,6 +62,7 @@ import org.apache.flink.util.TimeUtils;
final class JsonModule implements StatefulFunctionModule {
private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+ private static final Integer DEFAULT_MAX_HTTP_BATCH_SIZE = 1000;
private final JsonNode spec;
private final URL moduleUrl;
@@ -212,7 +213,8 @@ final class JsonModule implements StatefulFunctionModule {
functionType,
functionUri(functionNode),
functionStates(functionNode),
- maxRequestDuration(functionNode));
+ maxRequestDuration(functionNode),
+ maxBatchSize(functionNode));
case GRPC:
return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
default:
@@ -220,6 +222,11 @@ final class JsonModule implements StatefulFunctionModule {
}
}
+ private static int maxBatchSize(JsonNode functionNode) {
+ return Selectors.optionalIntegerAt(functionNode, Functions.FUNCTION_MAX_HTTP_BATCH_SIZE)
+ .orElse(DEFAULT_MAX_HTTP_BATCH_SIZE);
+ }
+
private static Duration maxRequestDuration(JsonNode functionNode) {
return Selectors.optionalTextAt(functionNode, Functions.FUNCTION_TIMEOUT)
.map(TimeUtils::parseDuration)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
index 6d01344..60fc6be 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
@@ -48,6 +48,8 @@ public final class Pointers {
public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
public static final JsonPointer FUNCTION_TIMEOUT =
JsonPointer.compile("/function/spec/timeout");
+ public static final JsonPointer FUNCTION_MAX_HTTP_BATCH_SIZE =
+ JsonPointer.compile("/function/spec/maxBatchSize");
}
// -------------------------------------------------------------------------------------