You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 06:27:57 UTC

[flink-statefun] 03/03: [FLINK-17611] [core] Add an ITCase for Unix domain sockets

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1c018e19d96f44334fe1b24280337679dc2a7be8
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed May 20 20:52:27 2020 +0200

    [FLINK-17611] [core] Add an ITCase for Unix domain sockets
    
    Co-authored-by: slinkydeveloper <fr...@gmail.com>
    
    This closes #114.
---
 statefun-flink/statefun-flink-core/pom.xml         |  6 ++
 .../flink/core/httpfn/UnixDomainSocketITCase.java  | 99 ++++++++++++++++++++++
 2 files changed, 105 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml
index e99077e..d140f9b 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -100,6 +100,12 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- benchmarks -->
         <dependency>
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainSocketITCase.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainSocketITCase.java
new file mode 100644
index 0000000..8ba733c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainSocketITCase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import javax.net.ServerSocketFactory;
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.junit.Test;
+import org.newsclub.net.unix.AFUNIXServerSocket;
+import org.newsclub.net.unix.AFUNIXSocketAddress;
+
+public class UnixDomainSocketITCase {
+
+  @Test(timeout = 10 * 1_000)
+  public void unixDomainSocket() throws IOException {
+    final File sockFile = new File("/tmp/uds-" + System.nanoTime() + ".sock");
+    sockFile.deleteOnExit();
+
+    try (MockWebServer server = new MockWebServer()) {
+      server.setServerSocketFactory(udsSocketFactory(sockFile));
+      server.enqueue(new MockResponse().setBody("hi"));
+      server.start();
+
+      OkHttpClient client = udsSocketClient(sockFile);
+
+      Response response = request(client);
+
+      assertTrue(response.isSuccessful());
+      assertThat(response.body(), is(notNullValue()));
+      assertThat(response.body().string(), is("hi"));
+    }
+  }
+
+  private static Response request(OkHttpClient client) throws IOException {
+    Request request = new Request.Builder().url("http://unused/").build();
+    return client.newCall(request).execute();
+  }
+
+  /** returns an {@link OkHttpClient} that connects trough the provided socket file. */
+  private static OkHttpClient udsSocketClient(File sockFile) {
+    Builder sharedClient = OkHttpUtils.newClient().newBuilder();
+    OkHttpUnixSocketBridge.configureUnixDomainSocket(sharedClient, sockFile);
+    return sharedClient.build();
+  }
+
+  private static ServerSocketFactory udsSocketFactory(File sockFile) {
+    return new ServerSocketFactory() {
+      @Override
+      public ServerSocket createServerSocket() throws IOException {
+        return AFUNIXServerSocket.forceBindOn(new AFUNIXSocketAddress(sockFile));
+      }
+
+      @Override
+      public ServerSocket createServerSocket(int i) throws IOException {
+        return createServerSocket();
+      }
+
+      @Override
+      public ServerSocket createServerSocket(int i, int i1) throws IOException {
+        return createServerSocket();
+      }
+
+      @Override
+      public ServerSocket createServerSocket(int i, int i1, InetAddress inetAddress)
+          throws IOException {
+        return createServerSocket();
+      }
+    };
+  }
+}