You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 06:27:56 UTC
[flink-statefun] 02/03: [FLINK-17611] [core] Bridge OkHttp and UNIX
domain sockets
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 9c43dc9c7fe57f69668163502d231a0ed63ecde7
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 19 20:19:23 2020 +0200
[FLINK-17611] [core] Bridge OkHttp and UNIX domain sockets
---
pom.xml | 5 -
.../flink/core/httpfn/HttpFunctionProvider.java | 69 +++-------
.../flink/core/httpfn/OkHttpUnixSocketBridge.java | 153 +++++++++++++++++++++
.../flink/core/httpfn/UnixDomainHttpEndpoint.java | 60 ++++++++
.../core/httpfn/HttpFunctionProviderTest.java | 35 -----
.../core/httpfn/UnixDomainHttpEndpointTest.java | 32 +++++
6 files changed, 261 insertions(+), 93 deletions(-)
diff --git a/pom.xml b/pom.xml
index 6f40a25..e1f3668 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,11 +171,6 @@ under the License.
</configuration>
</execution>
</executions>
- <configuration>
- <includeDirectories>
- <include>/usr/include/</include>
- </includeDirectories>
- </configuration>
</plugin>
</plugins>
</pluginManagement>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index e0c9cb5..03159b1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -18,22 +18,15 @@
package org.apache.flink.statefun.flink.core.httpfn;
-import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.AbstractMap;
-import java.util.Collections;
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
+
import java.util.Map;
-import java.util.stream.IntStream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
-import okhttp3.Protocol;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-import org.newsclub.net.unix.AFUNIXSocketFactory;
public class HttpFunctionProvider implements StatefulFunctionProvider {
private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
@@ -55,54 +48,24 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
}
private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
- // We need to build a UDS HTTP client
- if (spec.isUnixDomainSocket()) {
+ OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
+ clientBuilder.callTimeout(spec.maxRequestDuration());
- // We need to split the path in order to get the sock file and the path after the sock file
- Map.Entry<String, String> splittedFilePathAndEndpoint =
- splitFilePathAndEndpointForUDS(spec.endpoint());
+ final HttpUrl url;
+ if (spec.isUnixDomainSocket()) {
+ UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(spec.endpoint());
- OkHttpClient specificClient =
- sharedClient
- .newBuilder()
- .socketFactory(
- new AFUNIXSocketFactory.FactoryArg(splittedFilePathAndEndpoint.getKey()))
- // Enable HTTP/2 if available (uses H2 upgrade),
- // otherwise fallback to HTTP/1.1
- .protocols(Collections.singletonList(Protocol.HTTP_2))
- .callTimeout(spec.maxRequestDuration())
+ url =
+ new HttpUrl.Builder()
+ .scheme("http")
+ .host("unused")
+ .addPathSegment(endpoint.pathSegment)
.build();
- return new HttpRequestReplyClient(
- // Only the path matters!
- HttpUrl.get(URI.create(splittedFilePathAndEndpoint.getValue())), specificClient);
- }
- // specific client reuses the same the connection pool and thread pool
- // as the sharedClient.
- OkHttpClient specificClient =
- sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
- return new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
- }
-
- @VisibleForTesting
- static Map.Entry<String, String> splitFilePathAndEndpointForUDS(URI input) {
- // We need to split the path in order to get the sock file and the path after the sock file
- Path path = Paths.get(input.getPath());
-
- int sockPath =
- IntStream.rangeClosed(0, path.getNameCount() - 1)
- .filter(i -> path.getName(i).toString().endsWith(".sock"))
- .findFirst()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Unix Domain Socket path should contain a .sock file"));
-
- String filePath = "/" + path.subpath(0, sockPath + 1).toString();
- String endpoint = "/";
- if (sockPath != path.getNameCount() - 1) {
- endpoint = "/" + path.subpath(sockPath + 1, path.getNameCount()).toString();
+ configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
+ } else {
+ url = HttpUrl.get(spec.endpoint());
}
- return new AbstractMap.SimpleImmutableEntry<>(filePath, endpoint);
+ return new HttpRequestReplyClient(url, clientBuilder.build());
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUnixSocketBridge.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUnixSocketBridge.java
new file mode 100644
index 0000000..b658ced
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUnixSocketBridge.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import javax.net.SocketFactory;
+import okhttp3.Dns;
+import okhttp3.OkHttpClient;
+import org.apache.flink.util.IOUtils;
+import org.newsclub.net.unix.AFUNIXSocket;
+import org.newsclub.net.unix.AFUNIXSocketAddress;
+
+/**
+ * Utilities needed to bridge unix domain sockets and the okhttp client.
+ *
+ * <p>The bridge has two parts: a {@code SocketFactory} whose sockets always connect to one fixed
+ * socket file, and a {@code Dns} implementation that answers every lookup with a constant
+ * address.
+ */
+final class OkHttpUnixSocketBridge {
+ private OkHttpUnixSocketBridge() {} // holds static helpers only; never instantiated
+
+ /**
+ * Configures the {@link OkHttpClient} builder to connect over a unix domain socket.
+ *
+ * <p>Installs a socket factory bound to {@code unixSocketFile} and replaces host name resolution
+ * with a constant lookup.
+ *
+ * @param builder the client builder to configure.
+ * @param unixSocketFile the unix domain socket file to connect to; must not be {@code null}.
+ */
+ static void configureUnixDomainSocket(OkHttpClient.Builder builder, File unixSocketFile) {
+ builder.socketFactory(new UnixSocketFactory(unixSocketFile)).dns(ConstantDnsLookup.INSTANCE);
+ }
+
+ /**
+ * Resolves every host name to the IPv4 address 0.0.0.0, labelled with the requested host name.
+ *
+ * <p>NOTE(review): presumably the returned address is never dialed, because the socket's
+ * {@code connect} ignores the endpoint it is given and always connects to the socket file.
+ */
+ private enum ConstantDnsLookup implements Dns {
+ INSTANCE;
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public List<InetAddress> lookup(String hostname) throws UnknownHostException {
+ InetAddress address = InetAddress.getByAddress(hostname, new byte[] {0, 0, 0, 0});
+ return Collections.singletonList(address);
+ }
+ }
+
+ /**
+ * A {@code SocketFactory} that is bound to a specific path, and would return a {@code UnixSocket}
+ * for that path.
+ *
+ * <p>Only the no-argument {@code createSocket()} is supported; every overload that takes a host
+ * or an address throws {@code UnsupportedOperationException}.
+ */
+ private static final class UnixSocketFactory extends SocketFactory {
+ private final File unixSocketFile;
+
+ public UnixSocketFactory(File unixSocketFile) {
+ this.unixSocketFile = Objects.requireNonNull(unixSocketFile);
+ }
+
+ @Override
+ public Socket createSocket() {
+ return new UnixSocket(unixSocketFile); // unconnected; the caller is expected to call connect()
+ }
+
+ @Override
+ public Socket createSocket(String s, int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * A {@code Socket} that is bound to a specific unix socket file, and delegates the relevant
+ * operations to {@link AFUNIXSocket}.
+ *
+ * <p>The delegate is created in {@code connect} and cleared in {@code close}, so it is
+ * {@code null} before the former and after the latter.
+ *
+ * <p>NOTE(review): {@code bind}, {@code getOutputStream}, {@code getInputStream} and
+ * {@code isClosed} dereference the delegate without a null check, so calling any of them before
+ * {@code connect} or after {@code close} throws {@code NullPointerException}.
+ */
+ private static final class UnixSocket extends Socket {
+ private final File unixSocketFile;
+ private AFUNIXSocket delegate; // null until connect() succeeds in creating it, and again after close()
+
+ UnixSocket(File unixSocketFile) {
+ this.unixSocketFile = Objects.requireNonNull(unixSocketFile);
+ }
+
+ @Override
+ public void connect(SocketAddress endpoint, int timeout) throws IOException {
+ delegate = AFUNIXSocket.newInstance();
+ delegate.connect(new AFUNIXSocketAddress(unixSocketFile), timeout); // 'endpoint' is ignored; the target is always the socket file
+ delegate.setSoTimeout(timeout); // the connect timeout value is reused as the delegate's SO_TIMEOUT
+ }
+
+ @Override
+ public void bind(SocketAddress bindpoint) throws IOException {
+ delegate.bind(bindpoint); // NOTE(review): NullPointerException if called before connect()
+ }
+
+ @Override
+ public boolean isConnected() {
+ return delegate != null && delegate.isConnected();
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return delegate.getOutputStream(); // NOTE(review): NullPointerException when delegate is null
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return delegate.getInputStream(); // NOTE(review): NullPointerException when delegate is null
+ }
+
+ @Override
+ public synchronized void close() {
+ IOUtils.closeSocket(delegate); // presumably a quiet, null-tolerant close; verify against Flink's IOUtils
+ delegate = null;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return delegate.isClosed(); // NOTE(review): NullPointerException before connect() and after close()
+ }
+
+ @Override
+ public synchronized void setSoTimeout(int timeout) {
+ // noop: the timeout passed here is ignored.
+ // the delegate's timeout is set once, in connect(), right after connecting
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
new file mode 100644
index 0000000..c628fdb
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+/**
+ * Represents a Unix domain socket file path together with the HTTP path to request over it.
+ *
+ * <p>Instances are created only through {@code parseFrom}; both fields are non-null.
+ */
+final class UnixDomainHttpEndpoint {
+
+ /**
+ * Parses a URI of the form {@code http+unix://<file system path>.sock/<http endpoint>}.
+ *
+ * <p>Only the path component of the URI is inspected. The path is split at the first element
+ * whose name ends with {@code .sock}: everything up to and including that element becomes the
+ * socket file, and whatever follows becomes the HTTP path ({@code "/"} when nothing follows).
+ * For example {@code http+unix:///some/path.sock/hello} yields the file {@code /some/path.sock}
+ * and the path {@code /hello}.
+ *
+ * @param endpoint the URI to parse.
+ * @return the socket file and HTTP path extracted from {@code endpoint}.
+ * @throws IllegalStateException if no path element ends with {@code .sock}.
+ */
+ static UnixDomainHttpEndpoint parseFrom(URI endpoint) {
+ final Path path = Paths.get(endpoint.getPath());
+ final int sockPathIndex = indexOfSockFile(path);
+ final String filePath = "/" + path.subpath(0, sockPathIndex + 1).toString(); // subpath() drops the root, so it is re-added
+ final File unixDomainFile = new File(filePath);
+
+ if (sockPathIndex == path.getNameCount() - 1) { // the socket file is the last element: no HTTP path was given
+ return new UnixDomainHttpEndpoint(unixDomainFile, "/");
+ }
+ String pathSegment = "/" + path.subpath(sockPathIndex + 1, path.getNameCount()).toString();
+ return new UnixDomainHttpEndpoint(unixDomainFile, pathSegment);
+ }
+
+ private static int indexOfSockFile(Path path) { // returns the index of the first name element ending in ".sock"
+ for (int i = 0; i < path.getNameCount(); i++) {
+ if (path.getName(i).toString().endsWith(".sock")) {
+ return i;
+ }
+ }
+ throw new IllegalStateException("Unix Domain Socket path should contain a .sock file");
+ }
+
+ final File unixDomainFile; // the socket file; its path always starts with "/"
+ final String pathSegment; // the HTTP path after the socket file, starting with "/"; may span several path elements despite the name
+
+ private UnixDomainHttpEndpoint(File unixDomainFile, String endpoint) {
+ this.unixDomainFile = Objects.requireNonNull(unixDomainFile);
+ this.pathSegment = Objects.requireNonNull(endpoint);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
deleted file mode 100644
index 21bbccc..0000000
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
-import java.util.Map;
-import org.junit.Test;
-
-public class HttpFunctionProviderTest {
-
- @Test
- public void splitOnlyWithFile() {
- Map.Entry<String, String> out =
- HttpFunctionProvider.splitFilePathAndEndpointForUDS(
- URI.create("http+unix:///some/path.sock"));
-
- assertEquals("/some/path.sock", out.getKey());
- assertEquals("/", out.getValue());
- }
-
- @Test
- public void splitOnlyWithFileAndEndpoint() {
- Map.Entry<String, String> out =
- HttpFunctionProvider.splitFilePathAndEndpointForUDS(
- URI.create("http+unix:///some/path.sock/hello"));
-
- assertEquals("/some/path.sock", out.getKey());
- assertEquals("/hello", out.getValue());
- }
-
- @Test(expected = IllegalStateException.class)
- public void missingSockFile() {
- HttpFunctionProvider.splitFilePathAndEndpointForUDS(URI.create("http+unix:///some/path/hello"));
- }
-}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
new file mode 100644
index 0000000..f96ac3e
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
@@ -0,0 +1,32 @@
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import org.junit.Test;
+
+public class UnixDomainHttpEndpointTest { // exercises UnixDomainHttpEndpoint.parseFrom
+
+ @Test // the URI ends at the socket file, so the HTTP path is expected to be "/"
+ public void splitOnlyWithFile() {
+ UnixDomainHttpEndpoint out =
+ UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path.sock"));
+
+ assertEquals("/some/path.sock", out.unixDomainFile.toString());
+ assertEquals("/", out.pathSegment);
+ }
+
+ @Test // the part after the socket file is expected to become the HTTP path
+ public void splitOnlyWithFileAndEndpoint() {
+ UnixDomainHttpEndpoint out =
+ UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path.sock/hello"));
+
+ assertEquals("/some/path.sock", out.unixDomainFile.toString());
+ assertEquals("/hello", out.pathSegment);
+ }
+
+ @Test(expected = IllegalStateException.class) // no path element ends with ".sock"
+ public void missingSockFile() {
+ UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path/hello"));
+ }
+}