You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 06:27:55 UTC
[flink-statefun] 01/03: [FLINK-17611] [core] Support UNIX domain
sockets for remote function endpoints
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 95e8757cb706d703ddde073b49dec7325a679400
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu May 14 14:58:51 2020 +0200
[FLINK-17611] [core] Support UNIX domain sockets for remote function endpoints
---
pom.xml | 6 +++
.../statefun-e2e-tests-common/pom.xml | 10 ++++
statefun-flink/statefun-flink-core/pom.xml | 11 ++--
.../flink/core/httpfn/HttpFunctionProvider.java | 62 ++++++++++++++++++++--
.../flink/core/httpfn/HttpFunctionSpec.java | 5 ++
.../statefun/flink/core/jsonmodule/JsonModule.java | 7 ++-
.../core/httpfn/HttpFunctionProviderTest.java | 35 ++++++++++++
.../flink/core/jsonmodule/JsonModuleTest.java | 4 +-
.../src/test/resources/bar-module/module.yaml | 9 ++++
.../src/main/resources/META-INF/NOTICE | 2 +
10 files changed, 142 insertions(+), 9 deletions(-)
diff --git a/pom.xml b/pom.xml
index 93483b4..6f40a25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@ under the License.
<spotless-maven-plugin.version>1.20.0</spotless-maven-plugin.version>
<auto-service.version>1.0-rc6</auto-service.version>
<protobuf.version>3.7.1</protobuf.version>
+ <unixsocket.version>2.3.2</unixsocket.version>
<protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
<flink.version>1.10.1</flink.version>
</properties>
@@ -170,6 +171,11 @@ under the License.
</configuration>
</execution>
</executions>
+ <configuration>
+ <includeDirectories>
+ <include>/usr/include/</include>
+ </includeDirectories>
+ </configuration>
</plugin>
</plugins>
</pluginManagement>
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
index 9ce09ec..3989414 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
@@ -63,9 +63,19 @@ under the License.
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.kohlschutter.junixsocket</groupId>
+ <artifactId>junixsocket-common</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.kohlschutter.junixsocket</groupId>
+ <artifactId>junixsocket-common</artifactId>
+ <version>${unixsocket.version}</version>
+ </dependency>
+
<!-- Flink Config -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml
index ff7ffb0..e99077e 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -19,7 +19,7 @@ under the License.
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
+
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink</artifactId>
@@ -28,11 +28,11 @@ under the License.
</parent>
<artifactId>statefun-flink-core</artifactId>
-
+
<properties>
<okhttp.version>3.14.6</okhttp.version>
</properties>
-
+
<dependencies>
<!-- sdk -->
@@ -68,6 +68,11 @@ under the License.
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.kohlschutter.junixsocket</groupId>
+ <artifactId>junixsocket-core</artifactId>
+ <version>${unixsocket.version}</version>
+ </dependency>
<!-- tests -->
<dependency>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 24a9114..e0c9cb5 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -18,13 +18,22 @@
package org.apache.flink.statefun.flink.core.httpfn;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.AbstractMap;
+import java.util.Collections;
import java.util.Map;
+import java.util.stream.IntStream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.newsclub.net.unix.AFUNIXSocketFactory;
public class HttpFunctionProvider implements StatefulFunctionProvider {
private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
@@ -41,12 +50,59 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
if (spec == null) {
throw new IllegalArgumentException("Unsupported type " + type);
}
+ return new RequestReplyFunction(
+ spec.states(), spec.maxNumBatchRequests(), buildHttpClient(spec));
+ }
+
+ private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+ // We need to build a UDS HTTP client
+ if (spec.isUnixDomainSocket()) {
+
+ // We need to split the path in order to get the sock file and the path after the sock file
+ Map.Entry<String, String> splittedFilePathAndEndpoint =
+ splitFilePathAndEndpointForUDS(spec.endpoint());
+
+ OkHttpClient specificClient =
+ sharedClient
+ .newBuilder()
+ .socketFactory(
+ new AFUNIXSocketFactory.FactoryArg(splittedFilePathAndEndpoint.getKey()))
+ // Enable HTTP/2 if available (uses H2 upgrade),
+ // otherwise fallback to HTTP/1.1
+ .protocols(Collections.singletonList(Protocol.HTTP_2))
+ .callTimeout(spec.maxRequestDuration())
+ .build();
+
+ return new HttpRequestReplyClient(
+ // Only the path matters!
+ HttpUrl.get(URI.create(splittedFilePathAndEndpoint.getValue())), specificClient);
+ }
// specific client reuses the same the connection pool and thread pool
// as the sharedClient.
OkHttpClient specificClient =
sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
- RequestReplyClient httpClient =
- new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
- return new RequestReplyFunction(spec.states(), spec.maxNumBatchRequests(), httpClient);
+ return new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
+ }
+
+ @VisibleForTesting
+ static Map.Entry<String, String> splitFilePathAndEndpointForUDS(URI input) {
+ // We need to split the path in order to get the sock file and the path after the sock file
+ Path path = Paths.get(input.getPath());
+
+ int sockPath =
+ IntStream.rangeClosed(0, path.getNameCount() - 1)
+ .filter(i -> path.getName(i).toString().endsWith(".sock"))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Unix Domain Socket path should contain a .sock file"));
+
+ String filePath = "/" + path.subpath(0, sockPath + 1).toString();
+ String endpoint = "/";
+ if (sockPath != path.getNameCount() - 1) {
+ endpoint = "/" + path.subpath(sockPath + 1, path.getNameCount()).toString();
+ }
+ return new AbstractMap.SimpleImmutableEntry<>(filePath, endpoint);
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 0f38760..61945f4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -58,6 +58,11 @@ public final class HttpFunctionSpec implements FunctionSpec {
return endpoint;
}
+ public boolean isUnixDomainSocket() {
+ String scheme = endpoint.getScheme();
+ return "http+unix".equalsIgnoreCase(scheme) || "https+unix".equalsIgnoreCase(scheme);
+ }
+
public List<String> states() {
return states;
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 0d167c1..bd96211 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -302,13 +302,16 @@ final class JsonModule implements StatefulFunctionModule {
+ uri
+ "; an http or https scheme must be provided.");
}
- if (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https")) {
+ if (scheme.equalsIgnoreCase("http")
+ || scheme.equalsIgnoreCase("https")
+ || scheme.equalsIgnoreCase("http+unix")
+ || scheme.equalsIgnoreCase("https+unix")) {
return typedUri;
}
throw new IllegalArgumentException(
"Missing scheme in function endpoint "
+ uri
- + "; an http or https scheme must be provided.");
+ + "; an http or https or http+unix or https+unix scheme must be provided.");
}
private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
new file mode 100644
index 0000000..21bbccc
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
@@ -0,0 +1,35 @@
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Map;
+import org.junit.Test;
+
+public class HttpFunctionProviderTest {
+
+ @Test
+ public void splitOnlyWithFile() {
+ Map.Entry<String, String> out =
+ HttpFunctionProvider.splitFilePathAndEndpointForUDS(
+ URI.create("http+unix:///some/path.sock"));
+
+ assertEquals("/some/path.sock", out.getKey());
+ assertEquals("/", out.getValue());
+ }
+
+ @Test
+ public void splitOnlyWithFileAndEndpoint() {
+ Map.Entry<String, String> out =
+ HttpFunctionProvider.splitFilePathAndEndpointForUDS(
+ URI.create("http+unix:///some/path.sock/hello"));
+
+ assertEquals("/some/path.sock", out.getKey());
+ assertEquals("/hello", out.getValue());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void missingSockFile() {
+ HttpFunctionProvider.splitFilePathAndEndpointForUDS(URI.create("http+unix:///some/path/hello"));
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index a261ea4..f1f3aa1 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -55,7 +55,8 @@ public class JsonModuleTest {
universe.functions(),
allOf(
hasKey(new FunctionType("com.example", "hello")),
- hasKey(new FunctionType("com.foo", "world"))));
+ hasKey(new FunctionType("com.foo", "world")),
+ hasKey(new FunctionType("com.bar", "world"))));
}
@Test
@@ -95,6 +96,7 @@ public class JsonModuleTest {
private static StatefulFunctionModule fromPath(String path) {
URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path);
+ assertThat(moduleUrl, not(nullValue()));
ObjectMapper mapper = JsonServiceLoader.mapper();
final JsonNode json;
try {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
index 61aa52f..0afafc6 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
@@ -36,6 +36,15 @@ module:
states:
- seen_count
maxNumBatchRequests: 10000
+ - function:
+ meta:
+ kind: http
+ type: com.bar/world
+ spec:
+ endpoint: http+unix:///hello/world.sock/statefun
+ states:
+ - seen_count
+ maxNumBatchRequests: 10000
routers:
- router:
meta:
diff --git a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
index 79b470d..1cd43cc 100644
--- a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
+++ b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- com.kohlschutter.junixsocket:junixsocket-core:2.3.2
+- com.kohlschutter.junixsocket:junixsocket-common:2.3.2
- commons-codec:commons-codec:1.10
- commons-lang:commons-lang:2.6
- commons-logging:commons-logging:1.1.3