You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/15 09:07:47 UTC

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #110: [FLINK-17611] Unix domain socket connection between worker and function

igalshilman commented on a change in pull request #110:
URL: https://github.com/apache/flink-statefun/pull/110#discussion_r425185702



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
##########
@@ -27,18 +27,21 @@
 public final class HttpFunctionSpec implements FunctionSpec {
   private final FunctionType functionType;
   private final URI endpoint;
+  private final String unixDomainSocket;

Review comment:
       This should be marked `@Nullable`

##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
##########
@@ -58,6 +61,10 @@ public URI endpoint() {
     return endpoint;
   }
 
+  public String unixDomainSocket() {

Review comment:
       This should be `Optional<String> unixDomainSocket()`

##########
File path: statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
##########
@@ -95,6 +96,7 @@ public void testEgresses() {
 
   private static StatefulFunctionModule fromPath(String path) {
     URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path);
+    assertThat(moduleUrl, not(nullValue()));

Review comment:
       This is an independent change; can you move it into a separate commit?

##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##########
@@ -41,12 +45,32 @@ public RequestReplyFunction functionOfType(FunctionType type) {
     if (spec == null) {
       throw new IllegalArgumentException("Unsupported type " + type);
     }
-    // specific client reuses the same the connection pool and thread pool
-    // as the sharedClient.
-    OkHttpClient specificClient =
-        sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
-    RequestReplyClient httpClient =
-        new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
-    return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), httpClient);
+    return new RequestReplyFunction(
+        spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+  }
+
+  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+    // We need to build a UDS HTTP client
+    if (spec.unixDomainSocket() != null) {
+      OkHttpClient specificClient =
+          sharedClient
+              .newBuilder()
+              .socketFactory(new AFUNIXSocketFactory.FactoryArg(spec.unixDomainSocket()))
+              // Enable HTTP/2 if available (uses H2 upgrade),
+              // otherwise fallback to HTTP/1.1
+              .protocols(Collections.singletonList(Protocol.HTTP_2))
+              .callTimeout(spec.maxRequestDuration())
+              .build();
+
+      return new HttpRequestReplyClient(
+          // Only the path matters!
+          HttpUrl.get(URI.create(spec.endpoint().getPath())), specificClient);
+    } else {

Review comment:
       style: you can skip the `else {` since the `if` branch already ends with a return.

##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##########
@@ -41,12 +45,32 @@ public RequestReplyFunction functionOfType(FunctionType type) {
     if (spec == null) {
       throw new IllegalArgumentException("Unsupported type " + type);
     }
-    // specific client reuses the same the connection pool and thread pool
-    // as the sharedClient.
-    OkHttpClient specificClient =
-        sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
-    RequestReplyClient httpClient =
-        new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
-    return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), httpClient);
+    return new RequestReplyFunction(
+        spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+  }
+
+  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+    // We need to build a UDS HTTP client
+    if (spec.unixDomainSocket() != null) {
+      OkHttpClient specificClient =
+          sharedClient
+              .newBuilder()
+              .socketFactory(new AFUNIXSocketFactory.FactoryArg(spec.unixDomainSocket()))
+              // Enable HTTP/2 if available (uses H2 upgrade),
+              // otherwise fallback to HTTP/1.1
+              .protocols(Collections.singletonList(Protocol.HTTP_2))

Review comment:
       I think that this should be added to the `sharedClient`

##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
##########
@@ -41,12 +48,51 @@ public RequestReplyFunction functionOfType(FunctionType type) {
     if (spec == null) {
       throw new IllegalArgumentException("Unsupported type " + type);
     }
-    // specific client reuses the same the connection pool and thread pool
-    // as the sharedClient.
-    OkHttpClient specificClient =
-        sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
-    RequestReplyClient httpClient =
-        new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
-    return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), httpClient);
+    return new RequestReplyFunction(
+        spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+  }
+
+  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+    // We need to build a UDS HTTP client
+    if (spec.isUnixDomainSocket()) {
+
+      // We need to split the path in order to get the sock file and the path after the sock file
+      Path path = Paths.get(spec.endpoint().getPath());
+
+      int sockPath =

Review comment:
       Can you move the `.sock` / endpoint separation to a separate method,
   and add a unit test for that?
   It could be a static method annotated with `@VisibleForTesting`.

##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
##########
@@ -294,21 +295,31 @@ private static InetSocketAddress functionAddress(JsonNode functionNode) {
 
   private static URI functionUri(JsonNode functionNode) {
     String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
+    boolean hasUdsConfig =
+        Selectors.optionalTextAt(functionNode, Functions.FUNCTION_UDS).isPresent();
     URI typedUri = URI.create(uri);
     @Nullable String scheme = typedUri.getScheme();
-    if (scheme == null) {
+    if (!hasUdsConfig) {
+      if (scheme == null) {
+        throw new IllegalArgumentException(
+            "Missing scheme in function endpoint "
+                + uri
+                + "; an http or https scheme must be provided.");
+      }
+      if (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https")) {
+        return typedUri;
+      }
       throw new IllegalArgumentException(
           "Missing scheme in function endpoint "
               + uri
               + "; an http or https scheme must be provided.");
-    }
-    if (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https")) {
+    } else {
+      if (typedUri.getScheme() != null || typedUri.getAuthority() != null) {

Review comment:
       👍 nice validation




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org