You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/01 10:04:44 UTC

[flink-statefun] 03/07: [FLINK-19106] [remote] Add new keys for timeouts to module.yaml

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 478365229d5dcea883ea73da229b43e4d0cbc403
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Sep 1 13:50:07 2020 +0800

    [FLINK-19106] [remote] Add new keys for timeouts to module.yaml
---
 docs/sdk/index.md                                  | 10 ++++++++++
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 23 ++++++++++++++++++----
 .../src/test/resources/module-v2_0/module.yaml     |  4 ++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/docs/sdk/index.md b/docs/sdk/index.md
index 42beaa3..5883dc9 100644
--- a/docs/sdk/index.md
+++ b/docs/sdk/index.md
@@ -71,7 +71,17 @@ A ``function`` is described via a number of properties:
     * Default: 1000
 * ``function.spec.timeout``
     * The maximum amount of time for the runtime to wait for the remote function to return before failing.
+      This spans the complete call, including connecting to the function endpoint, writing the request, function processing, and reading the response.
     * Default: 1 min
+* ``function.spec.connectTimeout``
+    * The maximum amount of time for the runtime to wait while establishing a connection to the remote function endpoint.
+    * Default: 10 sec
+* ``function.spec.readTimeout``
+    * The maximum amount of time for the runtime to wait for individual read IO operations, such as reading the invocation response.
+    * Default: 10 sec
+* ``function.spec.writeTimeout``
+    * The maximum amount of time for the runtime to wait for individual write IO operations, such as writing the invocation request.
+    * Default: 10 sec
 
 ### Full Example
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 94e4fad..2b9c866 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -67,7 +67,15 @@ final class FunctionJsonEntity implements JsonEntity {
     private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint");
     private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port");
     private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states");
+
     private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout");
+    private static final JsonPointer CONNECT_TIMEOUT =
+        JsonPointer.compile("/function/spec/connectTimeout");
+    private static final JsonPointer READ_TIMEOUT =
+        JsonPointer.compile("/function/spec/readTimeout");
+    private static final JsonPointer WRITE_TIMEOUT =
+        JsonPointer.compile("/function/spec/writeTimeout");
+
     private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
         JsonPointer.compile("/function/spec/maxNumBatchRequests");
   }
@@ -120,7 +128,14 @@ final class FunctionJsonEntity implements JsonEntity {
           specBuilder.withState(state);
         }
         optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
-        optionalMaxRequestDuration(functionNode).ifPresent(specBuilder::withMaxRequestDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.TIMEOUT)
+            .ifPresent(specBuilder::withMaxRequestDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.CONNECT_TIMEOUT)
+            .ifPresent(specBuilder::withConnectTimeoutDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.READ_TIMEOUT)
+            .ifPresent(specBuilder::withReadTimeoutDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.WRITE_TIMEOUT)
+            .ifPresent(specBuilder::withWriteTimeoutDuration);
 
         return specBuilder.build();
       case GRPC:
@@ -165,9 +180,9 @@ final class FunctionJsonEntity implements JsonEntity {
     return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
   }
 
-  private static Optional<Duration> optionalMaxRequestDuration(JsonNode functionNode) {
-    return Selectors.optionalTextAt(functionNode, SpecPointers.TIMEOUT)
-        .map(TimeUtils::parseDuration);
+  private static Optional<Duration> optionalTimeoutDuration(
+      JsonNode functionNode, JsonPointer timeoutPointer) {
+    return Selectors.optionalTextAt(functionNode, timeoutPointer).map(TimeUtils::parseDuration);
   }
 
   private static Expiration stateTtlExpiration(JsonNode stateSpecNode) {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
index ee903e1..f93b30e 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
@@ -37,6 +37,10 @@ module:
               - name: seen_count
                 expireAfter: 60000millisecond
                 expireMode: after-invoke
+            timeout: 1minutes
+            connectTimeout: 10seconds
+            readTimeout: 10second
+            writeTimeout: 10seconds
             maxNumBatchRequests: 10000
       - function:
           meta: