You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/20 21:53:17 UTC

[flink] branch release-1.5 updated: [FLINK-9878][network][ssl] add more low-level ssl options

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 9e421a4  [FLINK-9878][network][ssl] add more low-level ssl options
9e421a4 is described below

commit 9e421a438dd830c6be72e5f13f855e68a82aef21
Author: Nico Kruber <ni...@gmail.com>
AuthorDate: Mon Aug 20 23:53:12 2018 +0200

    [FLINK-9878][network][ssl] add more low-level ssl options
    
    This is mostly to tackle bugs like https://github.com/netty/netty/issues/832
    (JDK issue during garbage collection when the SSL session cache is not limited).
    We add the following low-level configuration options for the user to fine-tune
    their system:
    
    - SSL session cache size via 'security.ssl.session-cache-size'
    - SSL session timeout via 'security.ssl.session-timeout'
    - SSL handshake timeout via 'security.ssl.handshake-timeout'
    - SSL close notify flush timeout via 'security.ssl.close-notify-flush-timeout'
    
    This closes #6355.
---
 .../generated/security_configuration.html          |  20 +++
 docs/ops/security-ssl.md                           |   4 +
 .../flink/configuration/SecurityOptions.java       |  37 ++++++
 .../flink/mesos/util/MesosArtifactServer.java      |   7 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java      |   5 +-
 .../runtime/webmonitor/history/HistoryServer.java  |   5 +-
 .../webmonitor/utils/WebFrontendBootstrap.java     |   9 +-
 .../org/apache/flink/runtime/blob/BlobClient.java  |   5 +-
 .../org/apache/flink/runtime/blob/BlobServer.java  |   5 +-
 .../runtime/io/network/netty/NettyClient.java      |  19 ++-
 .../runtime/io/network/netty/NettyConfig.java      |  27 ++--
 .../runtime/io/network/netty/NettyServer.java      |  17 ++-
 .../org/apache/flink/runtime/net/SSLUtils.java     | 139 ++++++++++++++-------
 .../io/network/netty/NettyClientServerSslTest.java | 112 +++++++++++------
 .../org/apache/flink/runtime/net/SSLUtilsTest.java |  32 +++--
 .../runtime/rest/RestServerEndpointITCase.java     |   4 +-
 16 files changed, 299 insertions(+), 148 deletions(-)

diff --git a/docs/_includes/generated/security_configuration.html b/docs/_includes/generated/security_configuration.html
index cd682ec..3576294 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -13,11 +13,21 @@
             <td>The comma separated list of standard SSL algorithms to be supported. Read more &#60;a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites"&#62;here&#60;/a&#62;.</td>
         </tr>
         <tr>
+            <td><h5>security.ssl.close-notify-flush-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default)</td>
+        </tr>
+        <tr>
             <td><h5>security.ssl.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
         </tr>
         <tr>
+            <td><h5>security.ssl.handshake-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) during SSL handshake. (-1 = use system default)</td>
+        </tr>
+        <tr>
             <td><h5>security.ssl.key-password</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The secret to decrypt the server key in the keystore.</td>
@@ -38,6 +48,16 @@
             <td>The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.</td>
         </tr>
         <tr>
+            <td><h5>security.ssl.session-cache-size</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The size of the cache used for storing SSL session objects. According to https://github.com/netty/netty/issues/832, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.session-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for the cached SSL session objects. (-1 = use system default)</td>
+        </tr>
+        <tr>
             <td><h5>security.ssl.truststore</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The truststore file containing the public CA certificates to be used by flink endpoints to verify the peer’s certificate.</td>
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index c2ba7df..a805238 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -33,6 +33,10 @@ SSL can be enabled for all network communication between Flink components. SSL k
 * **akka.ssl.enabled**: SSL flag for akka based control connection between the Flink client, jobmanager and taskmanager 
 * **jobmanager.web.ssl.enabled**: Flag to enable https access to the jobmanager's web frontend
 
+### Complete List of SSL Options
+
+{% include generated/security_configuration.html %}
+
 ## Deploying Keystores and Truststores
 
 You need to have a Java Keystore generated and copied to each node in the Flink cluster. The common name or subject alternative names in the certificate should match the node's hostname and IP address. Keystores and truststores can be generated using the [keytool utility](https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html). All Flink components should have read access to the keystore and truststore files.
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 0f25c6c..60a9764 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -160,4 +160,41 @@ public class SecurityOptions {
 		key("security.ssl.verify-hostname")
 			.defaultValue(true)
 			.withDescription("Flag to enable peer’s hostname verification during ssl handshake.");
+
+	/**
+	 * SSL session cache size.
+	 */
+	public static final ConfigOption<Integer> SSL_SESSION_CACHE_SIZE =
+		key("security.ssl.session-cache-size")
+			.defaultValue(-1)
+			.withDescription("The size of the cache used for storing SSL session objects. "
+				+ "According to https://github.com/netty/netty/issues/832, you should always set "
+				+ "this to an appropriate number to not run into a bug with stalling IO threads "
+				+ "during garbage collection. (-1 = use system default).");
+
+	/**
+	 * SSL session timeout.
+	 */
+	public static final ConfigOption<Integer> SSL_SESSION_TIMEOUT =
+		key("security.ssl.session-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) for the cached SSL session objects. (-1 = use system default)");
+
+	/**
+	 * SSL session timeout during handshakes.
+	 */
+	public static final ConfigOption<Integer> SSL_HANDSHAKE_TIMEOUT =
+		key("security.ssl.handshake-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) during SSL handshake. (-1 = use system default)");
+
+	/**
+	 * SSL session timeout after flushing the <tt>close_notify</tt> message.
+	 */
+	public static final ConfigOption<Integer> SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT =
+		key("security.ssl.close-notify-flush-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) for flushing the `close_notify` that was triggered by closing a " +
+				"channel. If the `close_notify` was not flushed in the given timeout the channel will be closed " +
+				"forcibly. (-1 = use system default)");
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 57f4718..30c4edf 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -58,7 +58,7 @@ import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -104,7 +104,8 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 	private final Map<Path, URL> paths = new HashMap<>();
 
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 
 	public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
 		throws Exception {
@@ -139,7 +140,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLEngine sslEngine = serverSSLContext.getSslContext().createSSLEngine();
 					SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
 					sslEngine.setUseClientMode(false);
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f27ae00..4323ad0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -92,7 +92,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -130,7 +130,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 	/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
 	private final LeaderGatewayRetriever<JobManagerGateway> retriever;
 
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 
 	private final CompletableFuture<String> localRestAddress = new CompletableFuture<>();
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 108f5c4..0484afd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -43,7 +43,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -90,7 +90,8 @@ public class HistoryServer {
 
 	private final HistoryServerArchiveFetcher archiveFetcher;
 
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 	private WebFrontendBootstrap netty;
 
 	private final Object startupShutdownLock = new Object();
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 740beae..c3148b7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -40,7 +40,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandle
 
 import org.slf4j.Logger;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -55,7 +55,8 @@ public class WebFrontendBootstrap {
 	private final Router router;
 	private final Logger log;
 	private final File uploadDir;
-	private final SSLContext serverSSLContext;
+	@Nullable
+	private final SSLUtils.SSLContext serverSSLContext;
 	private final ServerBootstrap bootstrap;
 	private final Channel serverChannel;
 	private final String restAddress;
@@ -64,7 +65,7 @@ public class WebFrontendBootstrap {
 			Router router,
 			Logger log,
 			File directory,
-			SSLContext sslContext,
+			@Nullable SSLUtils.SSLContext sslContext,
 			String configuredAddress,
 			int configuredPort,
 			final Configuration config) throws InterruptedException, UnknownHostException {
@@ -81,7 +82,7 @@ public class WebFrontendBootstrap {
 
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLEngine sslEngine = serverSSLContext.getSslContext().createSSLEngine();
 					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
 					sslEngine.setUseClientMode(false);
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 8e6b328..4bf7177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLSocket;
 
@@ -91,7 +90,7 @@ public final class BlobClient implements Closeable {
 
 		try {
 			// Check if ssl is enabled
-			SSLContext clientSSLContext = null;
+			SSLUtils.SSLContext clientSSLContext = null;
 			if (clientConfig != null &&
 				clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 
@@ -102,7 +101,7 @@ public final class BlobClient implements Closeable {
 
 				LOG.info("Using ssl connection to the blob server");
 
-				SSLSocket sslSocket = (SSLSocket) clientSSLContext.getSocketFactory().createSocket(
+				SSLSocket sslSocket = (SSLSocket) clientSSLContext.getSslContext().getSocketFactory().createSocket(
 					serverAddress.getAddress(),
 					serverAddress.getPort());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index dd0155c..1a1b0da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -79,7 +78,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 	private final ServerSocket serverSocket;
 
 	/** The SSL server context if ssl is enabled for the connections. */
-	private final SSLContext serverSSLContext;
+	private final SSLUtils.SSLContext serverSSLContext;
 
 	/** Blob Server configuration. */
 	private final Configuration blobServiceConfiguration;
@@ -196,7 +195,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 					return new ServerSocket(port, finalBacklog);
 				} else {
 					LOG.info("Enabling ssl for the blob server");
-					return serverSSLContext.getServerSocketFactory().createServerSocket(port, finalBacklog);
+					return serverSSLContext.getSslContext().getServerSocketFactory().createServerSocket(port, finalBacklog);
 				}
 			}
 		});
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 5fb083d..44561b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLUtils;
+
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -34,9 +36,10 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -52,7 +55,8 @@ class NettyClient {
 
 	private Bootstrap bootstrap;
 
-	private SSLContext clientSSLContext = null;
+	@Nullable
+	private SSLUtils.SSLContext clientSSLContext = null;
 
 	NettyClient(NettyConfig config) {
 		this.config = config;
@@ -178,7 +182,7 @@ class NettyClient {
 
 				// SSL handler should be added first in the pipeline
 				if (clientSSLContext != null) {
-					SSLEngine sslEngine = clientSSLContext.createSSLEngine(
+					SSLEngine sslEngine = clientSSLContext.getSslContext().createSSLEngine(
 						serverSocketAddress.getAddress().getCanonicalHostName(),
 						serverSocketAddress.getPort());
 					sslEngine.setUseClientMode(true);
@@ -190,7 +194,14 @@ class NettyClient {
 						sslEngine.setSSLParameters(newSSLParameters);
 					}
 
-					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					SslHandler sslHandler = new SslHandler(sslEngine);
+					if (clientSSLContext.getHandshakeTimeoutMs() >= 0) {
+						sslHandler.setHandshakeTimeoutMillis(clientSSLContext.getHandshakeTimeoutMs());
+					}
+					if (clientSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+						sslHandler.setCloseNotifyTimeoutMillis(clientSSLContext.getCloseNotifyFlushTimeoutMs());
+					}
+					channel.pipeline().addLast("ssl", sslHandler);
 				}
 				channel.pipeline().addLast(protocol.getClientChannelHandlers());
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 18527c4..9b32ebb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -23,12 +23,14 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.net.SSLUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+
 import java.net.InetAddress;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -189,26 +191,13 @@ public class NettyConfig {
 		}
 	}
 
-	public SSLContext createClientSSLContext() throws Exception {
-
-		// Create SSL Context from config
-		SSLContext clientSSLContext = null;
-		if (getSSLEnabled()) {
-			clientSSLContext = SSLUtils.createSSLClientContext(config);
-		}
-
-		return clientSSLContext;
+	@Nullable
+	public SSLUtils.SSLContext createClientSSLContext() throws Exception {
+		return SSLUtils.createSSLClientContext(config);
 	}
 
-	public SSLContext createServerSSLContext() throws Exception {
-
-		// Create SSL Context from config
-		SSLContext serverSSLContext = null;
-		if (getSSLEnabled()) {
-			serverSSLContext = SSLUtils.createSSLServerContext(config);
-		}
-
-		return serverSSLContext;
+	public SSLUtils.SSLContext createServerSSLContext() throws Exception {
+		return SSLUtils.createSSLServerContext(config);
 	}
 
 	public boolean getSSLEnabled() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index c6d09d0..f919ded 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -36,7 +37,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
@@ -61,7 +62,8 @@ class NettyServer {
 
 	private ChannelFuture bindFuture;
 
-	private SSLContext serverSSLContext = null;
+	@Nullable
+	private SSLUtils.SSLContext serverSSLContext = null;
 
 	private InetSocketAddress localAddress;
 
@@ -152,10 +154,17 @@ class NettyServer {
 			@Override
 			public void initChannel(SocketChannel channel) throws Exception {
 				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLEngine sslEngine = serverSSLContext.getSslContext().createSSLEngine();
 					config.setSSLVerAndCipherSuites(sslEngine);
 					sslEngine.setUseClientMode(false);
-					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					SslHandler sslHandler = new SslHandler(sslEngine);
+					if (serverSSLContext.getHandshakeTimeoutMs() >= 0) {
+						sslHandler.setHandshakeTimeoutMillis(serverSSLContext.getHandshakeTimeoutMs());
+					}
+					if (serverSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+						sslHandler.setCloseNotifyTimeoutMillis(serverSSLContext.getCloseNotifyFlushTimeoutMs());
+					}
+					channel.pipeline().addLast("ssl", sslHandler);
 				}
 
 				channel.pipeline().addLast(protocol.getServerChannelHandlers());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index b574d30..69da666 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -113,7 +113,7 @@ public class SSLUtils {
 		checkState(sslContext != null, "%s it not enabled", SecurityOptions.SSL_ENABLED.key());
 
 		return new SSLEngineFactory(
-			sslContext,
+			sslContext.getSslContext(),
 			getEnabledProtocols(config),
 			getEnabledCipherSuites(config),
 			clientMode);
@@ -176,39 +176,43 @@ public class SSLUtils {
 	public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception {
 
 		Preconditions.checkNotNull(sslConfig);
-		SSLContext clientSSLContext = null;
 
-		if (getSSLEnabled(sslConfig)) {
-			LOG.debug("Creating client SSL context from configuration");
-
-			String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
-			String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
-			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		if (!getSSLEnabled(sslConfig)) {
+			return null;
+		}
 
-			Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
-			Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
+		LOG.debug("Creating client SSL context from configuration");
 
-			KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+		String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
+		String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+		String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		int sessionCacheSize = sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+		int sessionTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+		int handshakeTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+		int closeNotifyFlushTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
 
-			FileInputStream trustStoreFile = null;
-			try {
-				trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
-				trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
-			} finally {
-				if (trustStoreFile != null) {
-					trustStoreFile.close();
-				}
-			}
+		Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
+		Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
-			TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
-				TrustManagerFactory.getDefaultAlgorithm());
-			trustManagerFactory.init(trustStore);
+		KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
 
-			clientSSLContext = SSLContext.getInstance(sslProtocolVersion);
-			clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
+		try (FileInputStream trustStoreFile = new FileInputStream(new File(trustStoreFilePath))) {
+			trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
 		}
 
-		return clientSSLContext;
+		TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
+			TrustManagerFactory.getDefaultAlgorithm());
+		trustManagerFactory.init(trustStore);
+
+		javax.net.ssl.SSLContext clientSSLContext = javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+		clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
+		if (sessionCacheSize >= 0) {
+			clientSSLContext.getClientSessionContext().setSessionCacheSize(sessionCacheSize);
+		}
+		if (sessionTimeoutMs >= 0) {
+			clientSSLContext.getClientSessionContext().setSessionTimeout(sessionTimeoutMs / 1000);
+		}
+		return new SSLContext(clientSSLContext, handshakeTimeoutMs, closeNotifyFlushTimeoutMs);
 	}
 
 	/**
@@ -225,38 +229,77 @@ public class SSLUtils {
 	public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception {
 
 		Preconditions.checkNotNull(sslConfig);
-		SSLContext serverSSLContext = null;
 
-		if (getSSLEnabled(sslConfig)) {
-			LOG.debug("Creating server SSL context from configuration");
+		if (!getSSLEnabled(sslConfig)) {
+			return null;
+		}
 
-			String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+		LOG.debug("Creating server SSL context from configuration");
 
-			String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+		String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+		String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+		String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+		String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		int sessionCacheSize = sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+		int sessionTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+		int handshakeTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+		int closeNotifyFlushTimeoutMs = sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
 
-			String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+		Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
+		Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
+		Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
 
-			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+		try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) {
+			ks.load(keyStoreFile, keystorePassword.toCharArray());
+		}
 
-			Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
-			Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
-			Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
+		// Set up key manager factory to use the server key store
+		KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+			KeyManagerFactory.getDefaultAlgorithm());
+		kmf.init(ks, certPassword.toCharArray());
 
-			KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-			try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) {
-				ks.load(keyStoreFile, keystorePassword.toCharArray());
-			}
+		// Initialize the SSLContext
+		javax.net.ssl.SSLContext serverSSLContext = javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+		serverSSLContext.init(kmf.getKeyManagers(), null, null);
+		if (sessionCacheSize >= 0) {
+			serverSSLContext.getServerSessionContext().setSessionCacheSize(sessionCacheSize);
+		}
+		if (sessionTimeoutMs >= 0) {
+			serverSSLContext.getServerSessionContext().setSessionTimeout(sessionTimeoutMs / 1000);
+		}
 
-			// Set up key manager factory to use the server key store
-			KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-					KeyManagerFactory.getDefaultAlgorithm());
-			kmf.init(ks, certPassword.toCharArray());
+		return new SSLContext(serverSSLContext, handshakeTimeoutMs, closeNotifyFlushTimeoutMs);
+	}
 
-			// Initialize the SSLContext
-			serverSSLContext = SSLContext.getInstance(sslProtocolVersion);
-			serverSSLContext.init(kmf.getKeyManagers(), null, null);
+	/**
+	 * Wrapper around javax.net.ssl.SSLContext, adding SSL handshake and close notify timeouts
+	 * which cannot be set on the SSL context directly.
+	 */
+	public static class SSLContext {
+		private final javax.net.ssl.SSLContext sslContext;
+		private final int handshakeTimeoutMs;
+		private final int closeNotifyFlushTimeoutMs;
+
+		public SSLContext(
+				javax.net.ssl.SSLContext sslContext,
+				int handshakeTimeoutMs,
+				int closeNotifyFlushTimeoutMs) {
+			this.sslContext = sslContext;
+			this.handshakeTimeoutMs = handshakeTimeoutMs;
+			this.closeNotifyFlushTimeoutMs = closeNotifyFlushTimeoutMs;
 		}
 
-		return serverSSLContext;
+		public javax.net.ssl.SSLContext getSslContext() {
+			return sslContext;
+		}
+
+		public int getHandshakeTimeoutMs() {
+			return handshakeTimeoutMs;
+		}
+
+		public int getCloseNotifyFlushTimeoutMs() {
+			return closeNotifyFlushTimeoutMs;
+		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 33e004e..e7113ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.util.NetUtils;
@@ -26,15 +27,27 @@ import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.net.ssl.SSLSessionContext;
+
 import java.net.InetAddress;
 
+import static org.apache.flink.configuration.SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT;
+import static org.apache.flink.configuration.SecurityOptions.SSL_HANDSHAKE_TIMEOUT;
+import static org.apache.flink.configuration.SecurityOptions.SSL_SESSION_CACHE_SIZE;
+import static org.apache.flink.configuration.SecurityOptions.SSL_SESSION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for communication between {@link NettyServer} and {@link NettyClient} via SSL.
+ */
 public class NettyClientServerSslTest {
 
 	/**
@@ -42,52 +55,76 @@ public class NettyClientServerSslTest {
 	 */
 	@Test
 	public void testValidSslConnection() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
+		testValidSslConnection(createSslConfig());
+	}
 
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+	/**
+	 * Verify valid (advanced) ssl configuration and connection.
+	 */
+	@Test
+	public void testValidSslConnectionAdvanced() throws Exception {
+		Configuration sslConfig = createSslConfig();
+		sslConfig.setInteger(SSL_SESSION_CACHE_SIZE, 1);
+		sslConfig.setInteger(SSL_SESSION_TIMEOUT, 1_000);
+		sslConfig.setInteger(SSL_HANDSHAKE_TIMEOUT, 1_000);
+		sslConfig.setInteger(SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1_000);
+
+		testValidSslConnection(sslConfig);
+	}
+
+	private void testValidSslConnection(Configuration sslConfig) throws Exception {
+		NettyProtocol protocol = getEmptyNettyProtocol();
 
 		NettyConfig nettyConfig = new NettyConfig(
 			InetAddress.getLoopbackAddress(),
 			NetUtils.getAvailablePort(),
 			NettyTestUtil.DEFAULT_SEGMENT_SIZE,
 			1,
-			createSslConfig());
+			sslConfig);
 
 		NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
 		Channel ch = NettyTestUtil.connect(serverAndClient);
 
+		SslHandler sslHandler = (SslHandler) ch.pipeline().get("ssl");
+		assertEqualsOrDefault(sslConfig, SSL_HANDSHAKE_TIMEOUT, sslHandler.getHandshakeTimeoutMillis());
+		assertEqualsOrDefault(sslConfig, SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyTimeoutMillis());
+
 		// should be able to send text data
 		ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
 		assertTrue(ch.writeAndFlush("test").await().isSuccess());
 
+		// session context is only be available after a session was setup -> this should be true after data was sent
+		SSLSessionContext sessionContext = sslHandler.engine().getSession().getSessionContext();
+		assertNotNull("bug in unit test setup: session context not available", sessionContext);
+		assertEqualsOrDefault(sslConfig, SSL_SESSION_CACHE_SIZE, sessionContext.getSessionCacheSize());
+		int sessionTimeout = sslConfig.getInteger(SSL_SESSION_TIMEOUT);
+		if (sessionTimeout != -1) {
+			// session timeout config is in milliseconds but the context returns it in seconds
+			assertEquals(sessionTimeout / 1000, sessionContext.getSessionTimeout());
+		} else {
+			assertTrue("default value (-1) should not be propagated", sessionContext.getSessionTimeout() >= 0);
+		}
+
 		NettyTestUtil.shutdown(serverAndClient);
 	}
 
+	private static void assertEqualsOrDefault(Configuration sslConfig, ConfigOption<Integer> option, long actual) {
+		long expected = sslConfig.getInteger(option);
+		if (expected != option.defaultValue()) {
+			assertEquals(expected, actual);
+		} else {
+			assertTrue("default value (" + option.defaultValue() + ") should not be propagated",
+				actual >= 0);
+		}
+	}
+
 	/**
 	 * Verify failure on invalid ssl configuration.
 	 */
 	@Test
-	public void testInvalidSslConfiguration() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+	public void testInvalidSslConfiguration() {
+		NettyProtocol protocol = getEmptyNettyProtocol();
 
 		Configuration config = createSslConfig();
 		// Modify the keystore password to an incorrect one
@@ -116,17 +153,7 @@ public class NettyClientServerSslTest {
 	 */
 	@Test
 	public void testSslHandshakeError() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+		NettyProtocol protocol = getEmptyNettyProtocol();
 
 		Configuration config = createSslConfig();
 
@@ -151,8 +178,7 @@ public class NettyClientServerSslTest {
 		NettyTestUtil.shutdown(serverAndClient);
 	}
 
-	private Configuration createSslConfig() throws Exception {
-
+	private Configuration createSslConfig() {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
 		flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
@@ -162,4 +188,18 @@ public class NettyClientServerSslTest {
 		flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 		return flinkConfig;
 	}
+
+	private static NettyProtocol getEmptyNettyProtocol() {
+		return new NettyProtocol(null, null, true) {
+			@Override
+			public ChannelHandler[] getServerChannelHandlers() {
+				return new ChannelHandler[0];
+			}
+
+			@Override
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new ChannelHandler[0];
+			}
+		};
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 38c8cee..a5db40f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLServerSocket;
 
@@ -33,6 +32,7 @@ import java.util.Arrays;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -58,7 +58,7 @@ public class SSLUtilsTest {
 		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
 		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 
-		SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
+		SSLUtils.SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 		Assert.assertNotNull(clientContext);
 	}
 
@@ -71,7 +71,7 @@ public class SSLUtilsTest {
 		Configuration clientConfig = new Configuration();
 		clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
-		SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
+		SSLUtils.SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 		Assert.assertNull(clientContext);
 	}
 
@@ -87,7 +87,7 @@ public class SSLUtilsTest {
 		clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "badpassword");
 
 		try {
-			SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
+			SSLUtils.SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig);
 			Assert.fail("SSL client context created even with bad SSL configuration ");
 		} catch (Exception e) {
 			// Exception here is valid
@@ -106,7 +106,7 @@ public class SSLUtilsTest {
 		serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
 		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		Assert.assertNotNull(serverContext);
 	}
 
@@ -119,7 +119,7 @@ public class SSLUtilsTest {
 		Configuration serverConfig = new Configuration();
 		serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 		Assert.assertNull(serverContext);
 	}
 
@@ -136,7 +136,7 @@ public class SSLUtilsTest {
 		serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "badpassword");
 
 		try {
-			SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+			SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 			Assert.fail("SSL server context created even with bad SSL configuration ");
 		} catch (Exception e) {
 			// Exception here is valid
@@ -157,7 +157,7 @@ public class SSLUtilsTest {
 		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1,TLSv1.2");
 
 		try {
-			SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+			SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
 			Assert.fail("SSL server context created even with multiple protocols set ");
 		} catch (Exception e) {
 			// Exception here is valid
@@ -178,10 +178,9 @@ public class SSLUtilsTest {
 		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1");
 		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
-		ServerSocket socket = null;
-		try {
-			socket = serverContext.getServerSocketFactory().createServerSocket(0);
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		assertNotNull(serverContext);
+		try (ServerSocket socket = serverContext.getSslContext().getServerSocketFactory().createServerSocket(0)) {
 
 			String[] protocols = ((SSLServerSocket) socket).getEnabledProtocols();
 			String[] algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites();
@@ -198,10 +197,6 @@ public class SSLUtilsTest {
 			Assert.assertEquals(2, algorithms.length);
 			Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
 			Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
-		} finally {
-			if (socket != null) {
-				socket.close();
-			}
 		}
 	}
 
@@ -219,8 +214,9 @@ public class SSLUtilsTest {
 		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1");
 		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
 
-		SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
-		SSLEngine engine = serverContext.createSSLEngine();
+		SSLUtils.SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+		assertNotNull(serverContext);
+		SSLEngine engine = serverContext.getSslContext().createSSLEngine();
 
 		String[] protocols = engine.getEnabledProtocols();
 		String[] algorithms = engine.getEnabledCipherSuites();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 93dbb5d..59db163 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -164,9 +164,9 @@ public class RestServerEndpointITCase extends TestLogger {
 		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
 
 		defaultSSLContext = SSLContext.getDefault();
-		final SSLContext sslClientContext = SSLUtils.createSSLClientContext(config);
+		final SSLUtils.SSLContext sslClientContext = SSLUtils.createSSLClientContext(config);
 		if (sslClientContext != null) {
-			SSLContext.setDefault(sslClientContext);
+			SSLContext.setDefault(sslClientContext.getSslContext());
 		}
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);