You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 06:27:55 UTC

[flink-statefun] 01/03: [FLINK-17611] [core] Support UNIX domain sockets for remote function endpoints

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 95e8757cb706d703ddde073b49dec7325a679400
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu May 14 14:58:51 2020 +0200

    [FLINK-17611] [core] Support UNIX domain sockets for remote function endpoints
---
 pom.xml                                            |  6 +++
 .../statefun-e2e-tests-common/pom.xml              | 10 ++++
 statefun-flink/statefun-flink-core/pom.xml         | 11 ++--
 .../flink/core/httpfn/HttpFunctionProvider.java    | 62 ++++++++++++++++++++--
 .../flink/core/httpfn/HttpFunctionSpec.java        |  5 ++
 .../statefun/flink/core/jsonmodule/JsonModule.java |  7 ++-
 .../core/httpfn/HttpFunctionProviderTest.java      | 35 ++++++++++++
 .../flink/core/jsonmodule/JsonModuleTest.java      |  4 +-
 .../src/test/resources/bar-module/module.yaml      |  9 ++++
 .../src/main/resources/META-INF/NOTICE             |  2 +
 10 files changed, 142 insertions(+), 9 deletions(-)

diff --git a/pom.xml b/pom.xml
index 93483b4..6f40a25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@ under the License.
         <spotless-maven-plugin.version>1.20.0</spotless-maven-plugin.version>
         <auto-service.version>1.0-rc6</auto-service.version>
         <protobuf.version>3.7.1</protobuf.version>
+        <unixsocket.version>2.3.2</unixsocket.version>
         <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
         <flink.version>1.10.1</flink.version>
     </properties>
@@ -170,6 +171,11 @@ under the License.
                             </configuration>
                         </execution>
                     </executions>
+                    <configuration>
+                        <includeDirectories>
+                            <include>/usr/include/</include>
+                        </includeDirectories>
+                    </configuration>
                 </plugin>
             </plugins>
         </pluginManagement>
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
index 9ce09ec..3989414 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
@@ -63,9 +63,19 @@ under the License.
                     <groupId>org.apache.commons</groupId>
                     <artifactId>commons-compress</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.kohlschutter.junixsocket</groupId>
+                    <artifactId>junixsocket-common</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>com.kohlschutter.junixsocket</groupId>
+            <artifactId>junixsocket-common</artifactId>
+            <version>${unixsocket.version}</version>
+        </dependency>
+
         <!-- Flink Config -->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml
index ff7ffb0..e99077e 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -19,7 +19,7 @@ under the License.
          xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
-   
+
     <parent>
         <groupId>org.apache.flink</groupId>
         <artifactId>statefun-flink</artifactId>
@@ -28,11 +28,11 @@ under the License.
     </parent>
 
     <artifactId>statefun-flink-core</artifactId>
-    
+
     <properties>
         <okhttp.version>3.14.6</okhttp.version>
     </properties>
-    
+
     <dependencies>
 
         <!-- sdk -->
@@ -68,6 +68,11 @@ under the License.
             <artifactId>okhttp</artifactId>
             <version>${okhttp.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.kohlschutter.junixsocket</groupId>
+            <artifactId>junixsocket-core</artifactId>
+            <version>${unixsocket.version}</version>
+        </dependency>
 
         <!-- tests -->
         <dependency>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 24a9114..e0c9cb5 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -18,13 +18,22 @@
 
 package org.apache.flink.statefun.flink.core.httpfn;
 
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.AbstractMap;
+import java.util.Collections;
 import java.util.Map;
+import java.util.stream.IntStream;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.newsclub.net.unix.AFUNIXSocketFactory;
 
 public class HttpFunctionProvider implements StatefulFunctionProvider {
   private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
@@ -41,12 +50,59 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
     if (spec == null) {
       throw new IllegalArgumentException("Unsupported type " + type);
     }
+    return new RequestReplyFunction(
+        spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+  }
+
+  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+    // We need to build a UDS HTTP client
+    if (spec.isUnixDomainSocket()) {
+
+      // We need to split the path in order to get the sock file and the path after the sock file
+      Map.Entry<String, String> splittedFilePathAndEndpoint =
+          splitFilePathAndEndpointForUDS(spec.endpoint());
+
+      OkHttpClient specificClient =
+          sharedClient
+              .newBuilder()
+              .socketFactory(
+                  new AFUNIXSocketFactory.FactoryArg(splittedFilePathAndEndpoint.getKey()))
+              // Enable HTTP/2 if available (uses H2 upgrade),
+              // otherwise fallback to HTTP/1.1
+              .protocols(Collections.singletonList(Protocol.HTTP_2))
+              .callTimeout(spec.maxRequestDuration())
+              .build();
+
+      return new HttpRequestReplyClient(
+          // Only the path matters!
+          HttpUrl.get(URI.create(splittedFilePathAndEndpoint.getValue())), specificClient);
+    }
     // specific client reuses the same the connection pool and thread pool
     // as the sharedClient.
     OkHttpClient specificClient =
         sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
-    RequestReplyClient httpClient =
-        new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
-    return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), httpClient);
+    return new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
+  }
+
+  @VisibleForTesting
+  static Map.Entry<String, String> splitFilePathAndEndpointForUDS(URI input) {
+    // We need to split the path in order to get the sock file and the path after the sock file
+    Path path = Paths.get(input.getPath());
+
+    int sockPath =
+        IntStream.rangeClosed(0, path.getNameCount() - 1)
+            .filter(i -> path.getName(i).toString().endsWith(".sock"))
+            .findFirst()
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        "Unix Domain Socket path should contain a .sock file"));
+
+    String filePath = "/" + path.subpath(0, sockPath + 1).toString();
+    String endpoint = "/";
+    if (sockPath != path.getNameCount() - 1) {
+      endpoint = "/" + path.subpath(sockPath + 1, path.getNameCount()).toString();
+    }
+    return new AbstractMap.SimpleImmutableEntry<>(filePath, endpoint);
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 0f38760..61945f4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -58,6 +58,11 @@ public final class HttpFunctionSpec implements FunctionSpec {
     return endpoint;
   }
 
+  public boolean isUnixDomainSocket() {
+    String scheme = endpoint.getScheme();
+    return "http+unix".equalsIgnoreCase(scheme) || "https+unix".equalsIgnoreCase(scheme);
+  }
+
   public List<String> states() {
     return states;
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 0d167c1..bd96211 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -302,13 +302,16 @@ final class JsonModule implements StatefulFunctionModule {
               + uri
               + "; an http or https scheme must be provided.");
     }
-    if (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https")) {
+    if (scheme.equalsIgnoreCase("http")
+        || scheme.equalsIgnoreCase("https")
+        || scheme.equalsIgnoreCase("http+unix")
+        || scheme.equalsIgnoreCase("https+unix")) {
       return typedUri;
     }
     throw new IllegalArgumentException(
         "Missing scheme in function endpoint "
             + uri
-            + "; an http or https scheme must be provided.");
+            + "; an http or https or http+unix or https+unix scheme must be provided.");
   }
 
   private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
new file mode 100644
index 0000000..21bbccc
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
@@ -0,0 +1,35 @@
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Map;
+import org.junit.Test;
+
+public class HttpFunctionProviderTest {
+
+  @Test
+  public void splitOnlyWithFile() {
+    Map.Entry<String, String> out =
+        HttpFunctionProvider.splitFilePathAndEndpointForUDS(
+            URI.create("http+unix:///some/path.sock"));
+
+    assertEquals("/some/path.sock", out.getKey());
+    assertEquals("/", out.getValue());
+  }
+
+  @Test
+  public void splitOnlyWithFileAndEndpoint() {
+    Map.Entry<String, String> out =
+        HttpFunctionProvider.splitFilePathAndEndpointForUDS(
+            URI.create("http+unix:///some/path.sock/hello"));
+
+    assertEquals("/some/path.sock", out.getKey());
+    assertEquals("/hello", out.getValue());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void missingSockFile() {
+    HttpFunctionProvider.splitFilePathAndEndpointForUDS(URI.create("http+unix:///some/path/hello"));
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index a261ea4..f1f3aa1 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -55,7 +55,8 @@ public class JsonModuleTest {
         universe.functions(),
         allOf(
             hasKey(new FunctionType("com.example", "hello")),
-            hasKey(new FunctionType("com.foo", "world"))));
+            hasKey(new FunctionType("com.foo", "world")),
+            hasKey(new FunctionType("com.bar", "world"))));
   }
 
   @Test
@@ -95,6 +96,7 @@ public class JsonModuleTest {
 
   private static StatefulFunctionModule fromPath(String path) {
     URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path);
+    assertThat(moduleUrl, not(nullValue()));
     ObjectMapper mapper = JsonServiceLoader.mapper();
     final JsonNode json;
     try {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
index 61aa52f..0afafc6 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
@@ -36,6 +36,15 @@ module:
             states:
               - seen_count
             maxNumBatchRequests: 10000
+      - function:
+          meta:
+            kind: http
+            type: com.bar/world
+          spec:
+            endpoint: http+unix:///hello/world.sock/statefun
+            states:
+              - seen_count
+            maxNumBatchRequests: 10000
     routers:
       - router:
           meta:
diff --git a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
index 79b470d..1cd43cc 100644
--- a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
+++ b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
+- com.kohlschutter.junixsocket:junixsocket-core:2.3.2
+- com.kohlschutter.junixsocket:junixsocket-common:2.3.2
 - commons-codec:commons-codec:1.10
 - commons-lang:commons-lang:2.6
 - commons-logging:commons-logging:1.1.3