You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/21 06:27:57 UTC
[flink-statefun] 03/03: [FLINK-17611] [core] Add an ITCase for Unix
domain sockets
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 1c018e19d96f44334fe1b24280337679dc2a7be8
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Wed May 20 20:52:27 2020 +0200
[FLINK-17611] [core] Add an ITCase for Unix domain sockets
Co-authored-by: slinkydeveloper <fr...@gmail.com>
This closes #114.
---
statefun-flink/statefun-flink-core/pom.xml | 6 ++
.../flink/core/httpfn/UnixDomainSocketITCase.java | 99 ++++++++++++++++++++++
2 files changed, 105 insertions(+)
diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml
index e99077e..d140f9b 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -100,6 +100,12 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- benchmarks -->
<dependency>
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainSocketITCase.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainSocketITCase.java
new file mode 100644
index 0000000..8ba733c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/UnixDomainSocketITCase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import javax.net.ServerSocketFactory;
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.junit.Test;
+import org.newsclub.net.unix.AFUNIXServerSocket;
+import org.newsclub.net.unix.AFUNIXSocketAddress;
+
+public class UnixDomainSocketITCase {
+
+ @Test(timeout = 10 * 1_000)
+ public void unixDomainSocket() throws IOException {
+ final File sockFile = new File("/tmp/uds-" + System.nanoTime() + ".sock");
+ sockFile.deleteOnExit();
+
+ try (MockWebServer server = new MockWebServer()) {
+ server.setServerSocketFactory(udsSocketFactory(sockFile));
+ server.enqueue(new MockResponse().setBody("hi"));
+ server.start();
+
+ OkHttpClient client = udsSocketClient(sockFile);
+
+ Response response = request(client);
+
+ assertTrue(response.isSuccessful());
+ assertThat(response.body(), is(notNullValue()));
+ assertThat(response.body().string(), is("hi"));
+ }
+ }
+
+ private static Response request(OkHttpClient client) throws IOException {
+ Request request = new Request.Builder().url("http://unused/").build();
+ return client.newCall(request).execute();
+ }
+
+ /** returns an {@link OkHttpClient} that connects trough the provided socket file. */
+ private static OkHttpClient udsSocketClient(File sockFile) {
+ Builder sharedClient = OkHttpUtils.newClient().newBuilder();
+ OkHttpUnixSocketBridge.configureUnixDomainSocket(sharedClient, sockFile);
+ return sharedClient.build();
+ }
+
+ private static ServerSocketFactory udsSocketFactory(File sockFile) {
+ return new ServerSocketFactory() {
+ @Override
+ public ServerSocket createServerSocket() throws IOException {
+ return AFUNIXServerSocket.forceBindOn(new AFUNIXSocketAddress(sockFile));
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int i) throws IOException {
+ return createServerSocket();
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int i, int i1) throws IOException {
+ return createServerSocket();
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int i, int i1, InetAddress inetAddress)
+ throws IOException {
+ return createServerSocket();
+ }
+ };
+ }
+}