Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/03 13:14:00 UTC

[jira] [Commented] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration

    [ https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16636925#comment-16636925 ] 

ASF GitHub Bot commented on FLINK-10371:
----------------------------------------

asfgit closed pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/_includes/generated/security_configuration.html b/docs/_includes/generated/security_configuration.html
index 19d0287df99..680c1c02434 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -62,6 +62,11 @@
             <td style="word-wrap: break-word;">"TLSv1.2"</td>
             <td>The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.</td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.rest.authentication-enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Turns on mutual SSL authentication for external communication via the REST endpoints.</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.rest.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index ed5f4d771bc..6ea686203ee 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -43,11 +43,11 @@ Internal connectivity includes:
 
   - Control messages: RPC between JobManager / TaskManager / Dispatcher / ResourceManager
   - The data plane: The connections between TaskManagers to exchange data during shuffles, broadcasts, redistribution, etc.
-  - The Blob Service (distribution of libraries and other artifacts). 
+  - The Blob Service (distribution of libraries and other artifacts).
 
 All internal connections are SSL authenticated and encrypted. The connections use **mutual authentication**, meaning both server
 and client side of each connection need to present the certificate to each other. The certificate acts effectively as a shared
-secret. 
+secret.
 
 A common setup is to generate a dedicated certificate (may be self-signed) for a Flink deployment. The certificate for internal communication
 is not needed by any other party to interact with Flink, and can be simply added to the container images, or attached to the YARN deployment.
@@ -61,15 +61,14 @@ All external connectivity is exposed via an HTTP/REST endpoint, used for example
   - Communication with the *Dispatcher* to submit jobs (session clusters)
   - Communication with the *JobManager* to inspect and modify a running job/application
 
-The REST endpoints can be configured to require SSL connections. The server will, however, accept connections from any client, meaning the REST endpoint does not authenticate the client.
+The REST endpoints can be configured to require SSL connections. The server will, however, accept connections from any client by default, meaning the REST endpoint does not authenticate the client.
 
-If authentication of connections to the REST endpoint is required, we recommend to deploy a "side car proxy":
+Simple mutual authentication may be enabled by configuration if authentication of connections to the REST endpoint is required, but we recommend to deploy a "side car proxy":
 Bind the REST endpoint to the loopback interface (or the pod-local interface in Kubernetes) and start a REST proxy that authenticates and forwards the requests to Flink.
 Examples for proxies that Flink users have deployed are [Envoy Proxy](https://www.envoyproxy.io/) or
 [NGINX with MOD_AUTH](http://nginx.org/en/docs/http/ngx_http_auth_request_module.html).
 
-The rationale behind delegating authentication to a proxy is that such proxies offer many more authentication options than the Flink project could reasonably implement itself,
-and thus offer better integration into existing infrastructures.
+The rationale behind delegating authentication to a proxy is that such proxies offer a wide variety of authentication options and thus better integration into existing infrastructures.
 
 
 #### Queryable State
@@ -115,10 +114,12 @@ security.ssl.internal.truststore-password: truststore_password
 
 **REST Endpoints (external connectivity)**
 
-For REST endpoints, the keystore is used by the server endpoint, and the truststore is used by the REST clients (including the CLI client)
+For REST endpoints, by default the keystore is used by the server endpoint, and the truststore is used by the REST clients (including the CLI client)
 to accept the server's certificate. In the case where the REST keystore has a self-signed certificate, the truststore must trust that certificate directly.
 If the REST endpoint uses a certificate that is signed through a proper certification hierarchy, the roots of that hierarchy should
-be in the trust store. 
+be in the trust store.
+
+If mutual authentication is enabled, the keystore and the truststore are used by both, the server endpoint and the REST clients as with internal connectivity.
 
 {% highlight yaml %}
 security.ssl.rest.keystore: /path/to/file.keystore
@@ -126,6 +127,7 @@ security.ssl.rest.keystore-password: keystore_password
 security.ssl.rest.key-password: key_password
 security.ssl.rest.truststore: /path/to/file.truststore
 security.ssl.rest.truststore-password: truststore_password
+security.ssl.rest.authentication-enabled: false
 {% endhighlight %}
 
 **IMPORTANT**
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 750170c80bb..be413cb7264 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -120,6 +120,14 @@
 			.defaultValue(false)
 			.withDescription("Turns on SSL for external communication via the REST endpoints.");
 
+	/**
+	 * Enables mutual SSL authentication for external REST endpoints.
+	 */
+	public static final ConfigOption<Boolean> SSL_REST_AUTHENTICATION_ENABLED =
+		key("security.ssl.rest.authentication-enabled")
+			.defaultValue(false)
+			.withDescription("Turns on mutual SSL authentication for external communication via the REST endpoints.");
+
 	// ----------------- certificates (internal + external) -------------------
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index d209f5fd5bf..e0b208d4b8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -26,10 +26,12 @@
 import javax.annotation.Nullable;
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
+import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
 import java.io.File;
@@ -65,6 +67,15 @@ public static boolean isRestSSLEnabled(Configuration sslConfig) {
 		return sslConfig.getBoolean(SecurityOptions.SSL_REST_ENABLED, fallbackFlag);
 	}
 
+	/**
+	 * Checks whether mutual SSL authentication for the external REST endpoint is enabled.
+	 */
+	public static boolean isRestSSLAuthenticationEnabled(Configuration sslConfig) {
+		checkNotNull(sslConfig, "sslConfig");
+		return isRestSSLEnabled(sslConfig) &&
+			sslConfig.getBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED);
+	}
+
 	/**
 	 * Creates a factory for SSL Server Sockets from the given configuration.
 	 * SSL Server Sockets are always part of internal communication.
@@ -145,7 +156,7 @@ public static SSLEngineFactory createRestServerSSLEngineFactory(final Configurat
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
 				false,
-				false);
+				isRestSSLAuthenticationEnabled(config));
 	}
 
 	/**
@@ -164,7 +175,7 @@ public static SSLEngineFactory createRestClientSSLEngineFactory(final Configurat
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
 				true,
-				false);
+				isRestSSLAuthenticationEnabled(config));
 	}
 
 	private static String[] getEnabledProtocols(final Configuration config) {
@@ -228,73 +239,100 @@ public static SSLContext createInternalSSLContext(Configuration config) throws E
 		return sslContext;
 	}
 
+	private enum RestSSLContextConfigMode {
+		CLIENT,
+		SERVER,
+		MUTUAL
+	}
+
 	/**
-	 * Creates an SSL context for the external REST endpoint server.
+	 * Creates an SSL context for the external REST SSL.
+	 * If mutual authentication is configured the client and the server side configuration are identical.
 	 */
 	@Nullable
-	public static SSLContext createRestServerSSLContext(Configuration config) throws Exception {
+	private static SSLContext createRestSSLContext(Configuration config, RestSSLContextConfigMode configMode) throws Exception {
 		checkNotNull(config, "config");
 
 		if (!isRestSSLEnabled(config)) {
 			return null;
 		}
 
-		String keystoreFilePath = getAndCheckOption(
+		KeyManager[] keyManagers = null;
+		if (configMode == RestSSLContextConfigMode.SERVER || configMode == RestSSLContextConfigMode.MUTUAL) {
+			String keystoreFilePath = getAndCheckOption(
 				config, SecurityOptions.SSL_REST_KEYSTORE, SecurityOptions.SSL_KEYSTORE);
 
-		String keystorePassword = getAndCheckOption(
+			String keystorePassword = getAndCheckOption(
 				config, SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, SecurityOptions.SSL_KEYSTORE_PASSWORD);
 
-		String certPassword = getAndCheckOption(
+			String certPassword = getAndCheckOption(
 				config, SecurityOptions.SSL_REST_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD);
 
-		String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
+			KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+			try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) {
+				keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+			}
 
-		KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
-		try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) {
-			keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+			KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+			kmf.init(keyStore, certPassword.toCharArray());
+
+			keyManagers = kmf.getKeyManagers();
 		}
 
-		KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-		kmf.init(keyStore, certPassword.toCharArray());
+		TrustManager[] trustManagers = null;
+		if (configMode == RestSSLContextConfigMode.CLIENT || configMode == RestSSLContextConfigMode.MUTUAL) {
+			String trustStoreFilePath = getAndCheckOption(
+				config, SecurityOptions.SSL_REST_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE);
+
+			String trustStorePassword = getAndCheckOption(
+				config, SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
 
+			KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+			try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) {
+				trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
+			}
+
+			TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+			tmf.init(trustStore);
+
+			trustManagers = tmf.getTrustManagers();
+		}
+
+		String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
 		SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
-		sslContext.init(kmf.getKeyManagers(), null, null);
+		sslContext.init(keyManagers, trustManagers, null);
 
 		return sslContext;
 	}
 
 	/**
-	 * Creates an SSL context for clients against the external REST endpoint.
+	 * Creates an SSL context for the external REST endpoint server.
 	 */
 	@Nullable
-	public static SSLContext createRestClientSSLContext(Configuration config) throws Exception {
-		checkNotNull(config, "config");
-
-		if (!isRestSSLEnabled(config)) {
-			return null;
+	public static SSLContext createRestServerSSLContext(Configuration config) throws Exception {
+		final RestSSLContextConfigMode configMode;
+		if (isRestSSLAuthenticationEnabled(config)) {
+			configMode = RestSSLContextConfigMode.MUTUAL;
+		} else {
+			configMode = RestSSLContextConfigMode.SERVER;
 		}
 
-		String trustStoreFilePath = getAndCheckOption(
-				config, SecurityOptions.SSL_REST_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE);
-
-		String trustStorePassword = getAndCheckOption(
-				config, SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
-
-		String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
+		return createRestSSLContext(config, configMode);
+	}
 
-		KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
-		try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) {
-			trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
+	/**
+	 * Creates an SSL context for clients against the external REST endpoint.
+	 */
+	@Nullable
+	public static SSLContext createRestClientSSLContext(Configuration config) throws Exception {
+		final RestSSLContextConfigMode configMode;
+		if (isRestSSLAuthenticationEnabled(config)) {
+			configMode = RestSSLContextConfigMode.MUTUAL;
+		} else {
+			configMode = RestSSLContextConfigMode.CLIENT;
 		}
 
-		TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-		tmf.init(trustStore);
-
-		SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
-		sslContext.init(null, tmf.getTrustManagers(), null);
-
-		return sslContext;
+		return createRestSSLContext(config, configMode);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index ca0be6b5bfd..9610e98b101 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -78,6 +78,28 @@ public void checkEnableSSL() {
 		assertFalse(SSLUtils.isRestSSLEnabled(precedence));
 	}
 
+	/**
+	 * Tests whether activation of REST mutual SSL authentication evaluates the config flags correctly.
+	 */
+	@Test
+	public void checkEnableRestSSLAuthentication() {
+		// SSL has to be enabled
+		Configuration noSSLOptions = new Configuration();
+		noSSLOptions.setBoolean(SecurityOptions.SSL_REST_ENABLED, false);
+		noSSLOptions.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+		assertFalse(SSLUtils.isRestSSLAuthenticationEnabled(noSSLOptions));
+
+		// authentication is disabled by default
+		Configuration defaultOptions = new Configuration();
+		defaultOptions.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+		assertFalse(SSLUtils.isRestSSLAuthenticationEnabled(defaultOptions));
+
+		// authentication is enabled only if both flags are set
+		Configuration options = new Configuration();
+		options.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+		options.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+		assertTrue(SSLUtils.isRestSSLAuthenticationEnabled(options));
+	}
+
 	@Test
 	public void testSocketFactoriesWhenSslDisabled() throws Exception {
 		Configuration config = new Configuration();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index b017610aa3a..962619160e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -71,7 +71,9 @@
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nonnull;
+import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -126,6 +128,7 @@
 
 	private final Configuration config;
 	private SSLContext defaultSSLContext;
+	private SSLSocketFactory defaultSSLSocketFactory;
 
 	public RestServerEndpointITCase(final Configuration config) {
 		this.config = requireNonNull(config);
@@ -151,8 +154,11 @@ public RestServerEndpointITCase(final Configuration config) {
 		sslConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
 		sslConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
 
+		final Configuration sslRestAuthConfig = new Configuration(sslConfig);
+		sslRestAuthConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+
 		return Arrays.asList(new Object[][]{
-			{config}, {sslConfig}
+			{config}, {sslConfig}, {sslRestAuthConfig}
 		});
 	}
 
@@ -170,9 +176,11 @@ public void setup() throws Exception {
 		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
 
 		defaultSSLContext = SSLContext.getDefault();
+		defaultSSLSocketFactory = HttpsURLConnection.getDefaultSSLSocketFactory();
 		final SSLContext sslClientContext = SSLUtils.createRestClientSSLContext(config);
 		if (sslClientContext != null) {
 			SSLContext.setDefault(sslClientContext);
+			HttpsURLConnection.setDefaultSSLSocketFactory(sslClientContext.getSocketFactory());
 		}
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
@@ -235,6 +243,7 @@ public void setup() throws Exception {
 	public void teardown() throws Exception {
 		if (defaultSSLContext != null) {
 			SSLContext.setDefault(defaultSSLContext);
+			HttpsURLConnection.setDefaultSSLSocketFactory(defaultSSLSocketFactory);
 		}
 
 		if (restClient != null) {
@@ -541,7 +550,7 @@ private static String createStringOfSize(int size) {
 		return sb.toString();
 	}
 
-	private static class TestRestServerEndpoint extends RestServerEndpoint {
+	static class TestRestServerEndpoint extends RestServerEndpoint {
 
 		private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
 
@@ -602,7 +611,7 @@ protected void startInternal() {}
 		}
 	}
 
-	private static class TestRestClient extends RestClient {
+	static class TestRestClient extends RestClient {
 
 		TestRestClient(RestClientConfiguration configuration) {
 			super(configuration, TestingUtils.defaultExecutor());
@@ -803,9 +812,9 @@ private TestUploadHandler(
 		}
 	}
 
-	private static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+	static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
-		private TestVersionHandler(
+		TestVersionHandler(
 			final CompletableFuture<String> localRestAddress,
 			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			final Time timeout) {
@@ -818,7 +827,7 @@ private TestVersionHandler(
 		}
 	}
 
-	private enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+	enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 		INSTANCE;
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
new file mode 100644
index 00000000000..5d75a300673
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test validates that connections are failing when mutual auth is enabled but untrusted
+ * keys are used.
+ */
+public class RestServerSSLAuthITCase extends TestLogger {
+
+	private static final String KEY_STORE_FILE = RestServerSSLAuthITCase.class.getResource("/local127.keystore").getFile();
+	private static final String TRUST_STORE_FILE = RestServerSSLAuthITCase.class.getResource("/local127.truststore").getFile();
+	private static final String UNTRUSTED_KEY_STORE_FILE = RestServerSSLAuthITCase.class.getResource("/untrusted.keystore").getFile();
+
+	private static final Time timeout = Time.seconds(10L);
+
+	private RestfulGateway restfulGateway;
+
+	@Test
+	public void testConnectFailure() throws Exception {
+		RestClient restClient = null;
+		RestServerEndpoint serverEndpoint = null;
+
+		try {
+			final Configuration baseConfig = new Configuration();
+			baseConfig.setInteger(RestOptions.PORT, 0);
+			baseConfig.setString(RestOptions.ADDRESS, "localhost");
+			baseConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+			baseConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+			baseConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA");
+
+			Configuration serverConfig = new Configuration(baseConfig);
+			serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, TRUST_STORE_FILE);
+			serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
+			serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+			serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+			serverConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+			Configuration clientConfig = new Configuration(baseConfig);
+			clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, UNTRUSTED_KEY_STORE_FILE);
+			clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
+			clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+			clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+			clientConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+			RestServerEndpointConfiguration restServerConfig = RestServerEndpointConfiguration.fromConfiguration(serverConfig);
+			RestClientConfiguration restClientConfig = RestClientConfiguration.fromConfiguration(clientConfig);
+
+			RestfulGateway restfulGateway = TestingRestfulGateway.newBuilder().build();
+			RestServerEndpointITCase.TestVersionHandler testVersionHandler = new RestServerEndpointITCase.TestVersionHandler(
+				CompletableFuture.completedFuture("http://localhost:1234"),
+				() -> CompletableFuture.completedFuture(restfulGateway),
+				RpcUtils.INF_TIMEOUT);
+
+			serverEndpoint = new RestServerEndpointITCase.TestRestServerEndpoint(
+				restServerConfig,
+				Arrays.asList(Tuple2.of(testVersionHandler.getMessageHeaders(), testVersionHandler)));
+			restClient = new RestServerEndpointITCase.TestRestClient(restClientConfig);
+			serverEndpoint.start();
+
+			CompletableFuture<EmptyResponseBody> response = restClient.sendRequest(
+				serverEndpoint.getServerAddress().getHostName(),
+				serverEndpoint.getServerAddress().getPort(),
+				RestServerEndpointITCase.TestVersionHeaders.INSTANCE,
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				Collections.emptyList()
+			);
+			response.get(60, TimeUnit.SECONDS);
+
+			fail("should never complete normally");
+		} catch (ExecutionException exception) {
+			// that is what we want
+			assertTrue(ExceptionUtils.findThrowable(exception, SSLHandshakeException.class).isPresent());
+		} finally {
+			if (restClient != null) {
+				restClient.shutdown(timeout);
+			}
+
+			if (serverEndpoint != null) {
+				serverEndpoint.close();
+			}
+		}
+	}
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Allow to enable SSL mutual authentication on REST endpoints by configuration
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-10371
>                 URL: https://issues.apache.org/jira/browse/FLINK-10371
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client, REST, Security
>    Affects Versions: 1.6.0, 1.7.0
>            Reporter: Johannes Dillmann
>            Assignee: Johannes Dillmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6, SSL mutual authentication was introduced for internal connectivity in FLINK-9312.
>  SSL support for external connectivity was also introduced, covering encryption of the connection and verification of the Flink REST endpoint from the client side.
> But _mutual authentication between the REST endpoint and clients is not supported yet_.
>  The [documentation suggests|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html] using a side car proxy to enable SSL mutual auth on the REST endpoint and points out the advantages of using a feature-rich proxy.
> While this is a good rationale, there are still important use cases for supporting simple mutual authentication directly in Flink, mainly the use of standard images in a containerized environment.
> There are tools that set up Flink jobs (for example on Kubernetes clusters) and act as gateways to the Flink REST endpoint and the Flink web interface. To prevent unauthorised access to Flink, the connectivity has to be secured. As such a tool acts as a gateway, it is easy to create a shared keystore and truststore for mutual authentication and pass them to the configuration of the Flink instances.
> To enable SSL mutual authentication on REST endpoints, I suggest adding the configuration parameter `security.ssl.rest.authentication-enabled`, which defaults to `false`.
>  If it is set to `true`, the `SSLUtils` factories for creating the REST server endpoint and the REST clients should set authentication to required and share `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to set up mutually authenticated SSL connections.
>  
> I have a working prototype which I would gladly submit as a PR to get further feedback. The changes to Flink are minimal and the default behaviour won't change.
>  
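
For readers unfamiliar with JSSE, the idea described above (one SSL context that carries both a keystore and a truststore, plus a server side that requires a client certificate) can be sketched with the standard javax.net.ssl API alone. This is a minimal illustration under stated assumptions, not the Flink implementation: the class name MutualTlsSketch, its method names, the hard-coded "TLSv1.2" protocol and the empty in-memory stores are all hypothetical stand-ins for the configured keystore and truststore files.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical sketch, not Flink code: one SSLContext holding key and trust material. */
class MutualTlsSketch {

    /** Creates an empty in-memory store; a real setup would load the configured files instead. */
    public static KeyStore emptyStore() {
        try {
            KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
            store.load(null, null); // a null stream initialises an empty store
            return store;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds a context with key managers (own identity) and trust managers (accepted peers). */
    public static SSLContext mutualContext(KeyStore keyStore, char[] keyPassword, KeyStore trustStore) {
        try {
            KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(keyStore, keyPassword);

            TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);

            // Assumption: protocol is fixed here; the patch reads it from configuration.
            SSLContext context = SSLContext.getInstance("TLSv1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            return context;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Creates a server-side engine that demands a client certificate when clientAuth is true. */
    public static SSLEngine serverEngine(SSLContext context, boolean clientAuth) {
        SSLEngine engine = context.createSSLEngine();
        engine.setUseClientMode(false);
        engine.setNeedClientAuth(clientAuth);
        return engine;
    }

    public static void main(String[] args) {
        SSLContext context = mutualContext(emptyStore(), "password".toCharArray(), emptyStore());
        System.out.println(serverEngine(context, true).getNeedClientAuth());
    }
}
```

With empty stores the context initialises, but any real handshake would fail because no certificate is trusted. In an actual deployment both sides would load the shared keystore and truststore named by security.ssl.rest.keystore and security.ssl.rest.truststore.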



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)