You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 06:27:56 UTC

[flink-statefun] 02/03: [FLINK-17611] [core] Bridge OkHttp and UNIX domain sockets

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 9c43dc9c7fe57f69668163502d231a0ed63ecde7
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 19 20:19:23 2020 +0200

    [FLINK-17611] [core] Bridge OkHttp and UNIX domain sockets
---
 pom.xml                                            |   5 -
 .../flink/core/httpfn/HttpFunctionProvider.java    |  69 +++-------
 .../flink/core/httpfn/OkHttpUnixSocketBridge.java  | 153 +++++++++++++++++++++
 .../flink/core/httpfn/UnixDomainHttpEndpoint.java  |  60 ++++++++
 .../core/httpfn/HttpFunctionProviderTest.java      |  35 -----
 .../core/httpfn/UnixDomainHttpEndpointTest.java    |  32 +++++
 6 files changed, 261 insertions(+), 93 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6f40a25..e1f3668 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,11 +171,6 @@ under the License.
                             </configuration>
                         </execution>
                     </executions>
-                    <configuration>
-                        <includeDirectories>
-                            <include>/usr/include/</include>
-                        </includeDirectories>
-                    </configuration>
                 </plugin>
             </plugins>
         </pluginManagement>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index e0c9cb5..03159b1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -18,22 +18,15 @@
 
 package org.apache.flink.statefun.flink.core.httpfn;
 
-import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.AbstractMap;
-import java.util.Collections;
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
+
 import java.util.Map;
-import java.util.stream.IntStream;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
-import okhttp3.Protocol;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-import org.newsclub.net.unix.AFUNIXSocketFactory;
 
 public class HttpFunctionProvider implements StatefulFunctionProvider {
   private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
@@ -55,54 +48,24 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
   }
 
   private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
-    // We need to build a UDS HTTP client
-    if (spec.isUnixDomainSocket()) {
+    OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
+    clientBuilder.callTimeout(spec.maxRequestDuration());
 
-      // We need to split the path in order to get the sock file and the path after the sock file
-      Map.Entry<String, String> splittedFilePathAndEndpoint =
-          splitFilePathAndEndpointForUDS(spec.endpoint());
+    final HttpUrl url;
+    if (spec.isUnixDomainSocket()) {
+      UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(spec.endpoint());
 
-      OkHttpClient specificClient =
-          sharedClient
-              .newBuilder()
-              .socketFactory(
-                  new AFUNIXSocketFactory.FactoryArg(splittedFilePathAndEndpoint.getKey()))
-              // Enable HTTP/2 if available (uses H2 upgrade),
-              // otherwise fallback to HTTP/1.1
-              .protocols(Collections.singletonList(Protocol.HTTP_2))
-              .callTimeout(spec.maxRequestDuration())
+      url =
+          new HttpUrl.Builder()
+              .scheme("http")
+              .host("unused")
+              .addPathSegment(endpoint.pathSegment)
               .build();
 
-      return new HttpRequestReplyClient(
-          // Only the path matters!
-          HttpUrl.get(URI.create(splittedFilePathAndEndpoint.getValue())), specificClient);
-    }
-    // specific client reuses the same the connection pool and thread pool
-    // as the sharedClient.
-    OkHttpClient specificClient =
-        sharedClient.newBuilder().callTimeout(spec.maxRequestDuration()).build();
-    return new HttpRequestReplyClient(HttpUrl.get(spec.endpoint()), specificClient);
-  }
-
-  @VisibleForTesting
-  static Map.Entry<String, String> splitFilePathAndEndpointForUDS(URI input) {
-    // We need to split the path in order to get the sock file and the path after the sock file
-    Path path = Paths.get(input.getPath());
-
-    int sockPath =
-        IntStream.rangeClosed(0, path.getNameCount() - 1)
-            .filter(i -> path.getName(i).toString().endsWith(".sock"))
-            .findFirst()
-            .orElseThrow(
-                () ->
-                    new IllegalStateException(
-                        "Unix Domain Socket path should contain a .sock file"));
-
-    String filePath = "/" + path.subpath(0, sockPath + 1).toString();
-    String endpoint = "/";
-    if (sockPath != path.getNameCount() - 1) {
-      endpoint = "/" + path.subpath(sockPath + 1, path.getNameCount()).toString();
+      configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
+    } else {
+      url = HttpUrl.get(spec.endpoint());
     }
-    return new AbstractMap.SimpleImmutableEntry<>(filePath, endpoint);
+    return new HttpRequestReplyClient(url, clientBuilder.build());
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUnixSocketBridge.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUnixSocketBridge.java
new file mode 100644
index 0000000..b658ced
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUnixSocketBridge.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import javax.net.SocketFactory;
+import okhttp3.Dns;
+import okhttp3.OkHttpClient;
+import org.apache.flink.util.IOUtils;
+import org.newsclub.net.unix.AFUNIXSocket;
+import org.newsclub.net.unix.AFUNIXSocketAddress;
+
+/** The following class holds utilities needed to bridge unix domain sockets and okhttp client. */
+final class OkHttpUnixSocketBridge {
+  private OkHttpUnixSocketBridge() {}
+
+  /** Configures the {@link OkHttpClient} builder to connect over a unix domain socket. */
+  static void configureUnixDomainSocket(OkHttpClient.Builder builder, File unixSocketFile) {
+    builder.socketFactory(new UnixSocketFactory(unixSocketFile)).dns(ConstantDnsLookup.INSTANCE);
+  }
+
+  /** resolve all host names to Ipv4 0.0.0.0 and port 0. */
+  private enum ConstantDnsLookup implements Dns {
+    INSTANCE;
+
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public List<InetAddress> lookup(String hostname) throws UnknownHostException {
+      InetAddress address = InetAddress.getByAddress(hostname, new byte[] {0, 0, 0, 0});
+      return Collections.singletonList(address);
+    }
+  }
+
+  /**
+   * A {@code SocketFactory} that is bound to a specific path, and would return a {@code UnixSocket}
+   * for that path.
+   */
+  private static final class UnixSocketFactory extends SocketFactory {
+    private final File unixSocketFile;
+
+    public UnixSocketFactory(File unixSocketFile) {
+      this.unixSocketFile = Objects.requireNonNull(unixSocketFile);
+    }
+
+    @Override
+    public Socket createSocket() {
+      return new UnixSocket(unixSocketFile);
+    }
+
+    @Override
+    public Socket createSocket(String s, int i) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Socket createSocket(InetAddress inetAddress, int i) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * A {@code Socket} that is bound to a specific unix socket file, and delegates the relevant
+   * operations to {@link AFUNIXSocket}.
+   */
+  private static final class UnixSocket extends Socket {
+    private final File unixSocketFile;
+    private AFUNIXSocket delegate;
+
+    UnixSocket(File unixSocketFile) {
+      this.unixSocketFile = Objects.requireNonNull(unixSocketFile);
+    }
+
+    @Override
+    public void connect(SocketAddress endpoint, int timeout) throws IOException {
+      delegate = AFUNIXSocket.newInstance();
+      delegate.connect(new AFUNIXSocketAddress(unixSocketFile), timeout);
+      delegate.setSoTimeout(timeout);
+    }
+
+    @Override
+    public void bind(SocketAddress bindpoint) throws IOException {
+      delegate.bind(bindpoint);
+    }
+
+    @Override
+    public boolean isConnected() {
+      return delegate != null && delegate.isConnected();
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      return delegate.getOutputStream();
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      return delegate.getInputStream();
+    }
+
+    @Override
+    public synchronized void close() {
+      IOUtils.closeSocket(delegate);
+      delegate = null;
+    }
+
+    @Override
+    public boolean isClosed() {
+      return delegate.isClosed();
+    }
+
+    @Override
+    public synchronized void setSoTimeout(int timeout) {
+      // noop.
+      // we set the timeout after connecting
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
new file mode 100644
index 0000000..c628fdb
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpoint.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+/** Represents a Unix domain file path and an http endpoint */
+final class UnixDomainHttpEndpoint {
+
+  /** Parses a URI of the form {@code http+unix://<file system path>.sock/<http endpoint>}. */
+  static UnixDomainHttpEndpoint parseFrom(URI endpoint) {
+    final Path path = Paths.get(endpoint.getPath());
+    final int sockPathIndex = indexOfSockFile(path);
+    final String filePath = "/" + path.subpath(0, sockPathIndex + 1).toString();
+    final File unixDomainFile = new File(filePath);
+
+    if (sockPathIndex == path.getNameCount() - 1) {
+      return new UnixDomainHttpEndpoint(unixDomainFile, "/");
+    }
+    String pathSegment = "/" + path.subpath(sockPathIndex + 1, path.getNameCount()).toString();
+    return new UnixDomainHttpEndpoint(unixDomainFile, pathSegment);
+  }
+
+  private static int indexOfSockFile(Path path) {
+    for (int i = 0; i < path.getNameCount(); i++) {
+      if (path.getName(i).toString().endsWith(".sock")) {
+        return i;
+      }
+    }
+    throw new IllegalStateException("Unix Domain Socket path should contain a .sock file");
+  }
+
+  final File unixDomainFile;
+  final String pathSegment;
+
+  private UnixDomainHttpEndpoint(File unixDomainFile, String endpoint) {
+    this.unixDomainFile = Objects.requireNonNull(unixDomainFile);
+    this.pathSegment = Objects.requireNonNull(endpoint);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
deleted file mode 100644
index 21bbccc..0000000
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProviderTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
-import java.util.Map;
-import org.junit.Test;
-
-public class HttpFunctionProviderTest {
-
-  @Test
-  public void splitOnlyWithFile() {
-    Map.Entry<String, String> out =
-        HttpFunctionProvider.splitFilePathAndEndpointForUDS(
-            URI.create("http+unix:///some/path.sock"));
-
-    assertEquals("/some/path.sock", out.getKey());
-    assertEquals("/", out.getValue());
-  }
-
-  @Test
-  public void splitOnlyWithFileAndEndpoint() {
-    Map.Entry<String, String> out =
-        HttpFunctionProvider.splitFilePathAndEndpointForUDS(
-            URI.create("http+unix:///some/path.sock/hello"));
-
-    assertEquals("/some/path.sock", out.getKey());
-    assertEquals("/hello", out.getValue());
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void missingSockFile() {
-    HttpFunctionProvider.splitFilePathAndEndpointForUDS(URI.create("http+unix:///some/path/hello"));
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
new file mode 100644
index 0000000..f96ac3e
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainHttpEndpointTest.java
@@ -0,0 +1,32 @@
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import org.junit.Test;
+
+public class UnixDomainHttpEndpointTest {
+
+  @Test
+  public void splitOnlyWithFile() {
+    UnixDomainHttpEndpoint out =
+        UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path.sock"));
+
+    assertEquals("/some/path.sock", out.unixDomainFile.toString());
+    assertEquals("/", out.pathSegment);
+  }
+
+  @Test
+  public void splitOnlyWithFileAndEndpoint() {
+    UnixDomainHttpEndpoint out =
+        UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path.sock/hello"));
+
+    assertEquals("/some/path.sock", out.unixDomainFile.toString());
+    assertEquals("/hello", out.pathSegment);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void missingSockFile() {
+    UnixDomainHttpEndpoint.parseFrom(URI.create("http+unix:///some/path/hello"));
+  }
+}