Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/23 08:48:49 UTC

[GitHub] NicoK closed pull request #6838: [FLINK-9878][network][ssl] add more low-level ssl options

URL: https://github.com/apache/flink/pull/6838
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/_includes/generated/security_configuration.html b/docs/_includes/generated/security_configuration.html
index 680c1c02434..8999336926f 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -12,11 +12,21 @@
             <td style="word-wrap: break-word;">"TLS_RSA_WITH_AES_128_CBC_SHA"</td>
             <td>The comma separated list of standard SSL algorithms to be supported. Read more <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites">here</a></td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.internal.close-notify-flush-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default)</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.internal.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).</td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.internal.handshake-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) during SSL handshake. (-1 = use system default)</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.internal.key-password</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -32,6 +42,16 @@
             <td style="word-wrap: break-word;">(none)</td>
             <td>The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server).</td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.internal.session-cache-size</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The size of the cache used for storing SSL session objects. According to https://github.com/netty/netty/issues/832, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.internal.session-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for the cached SSL session objects. (-1 = use system default)</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.internal.truststore</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index 6ea686203ee..4e3716218d2 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -22,6 +22,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* ToC
+{:toc}
+
 This page provides instructions on how to enable TLS/SSL authentication and encryption for network communication with and between Flink processes.
 
 ## Internal and External Connectivity
@@ -37,7 +40,7 @@ For more flexibility, security for internal and external connectivity can be ena
   <img src="{{ site.baseurl }}/fig/ssl_internal_external.svg" alt="Internal and External Connectivity" style="width:75%; padding-top:10px; padding-bottom:10px;" />
 </div>
 
-#### Internal Connectivity
+### Internal Connectivity
 
 Internal connectivity includes:
 
@@ -54,7 +57,7 @@ is not needed by any other party to interact with Flink, and can be simply added
 
 *Note: Because internal connections are mutually authenticated with shared certificates, Flink can skip hostname verification. This makes container-based setups easier.*
 
-#### External / REST Connectivity
+### External / REST Connectivity
 
 All external connectivity is exposed via an HTTP/REST endpoint, used for example by the web UI and the CLI:
 
@@ -71,7 +74,7 @@ Examples for proxies that Flink users have deployed are [Envoy Proxy](https://ww
 The rationale behind delegating authentication to a proxy is that such proxies offer a wide variety of authentication options and thus better integration into existing infrastructures.
 
 
-#### Queryable State
+### Queryable State
 
 Connections to the queryable state endpoints is currently not authenticated or encrypted.
 
@@ -92,13 +95,13 @@ When `security.ssl.internal.enabled` is set to `true`, you can set the following
   - `blob.service.ssl.enabled`: Transport of BLOBs from JobManager to TaskManager
   - `akka.ssl.enabled`: Akka-based RPC connections between JobManager / TaskManager / ResourceManager
 
-#### Keystores and Truststores
+### Keystores and Truststores
 
 The SSL configuration requires to configure a **keystore** and a **truststore**. The *keystore* contains the public certificate
 (public key) and the private key, while the truststore contains the trusted certificates or the trusted authorities. Both stores
 need to be set up such that the truststore trusts the keystore's certificate.
 
-**Internal Connectivity**
+#### Internal Connectivity
 
 Because internal communication is mutually authenticated, keystore and truststore typically contain the same dedicated certificate.
 The certificate can use wild card hostnames or addresses, because the certificate is expected to be a shared secret and host
@@ -112,7 +115,7 @@ security.ssl.internal.truststore: /path/to/file.truststore
 security.ssl.internal.truststore-password: truststore_password
 {% endhighlight %}
 
-**REST Endpoints (external connectivity)**
+#### REST Endpoints (external connectivity)
 
 For REST endpoints, by default the keystore is used by the server endpoint, and the truststore is used by the REST clients (including the CLI client)
 to accept the server's certificate. In the case where the REST keystore has a self-signed certificate, the truststore must trust that certificate directly.
@@ -130,8 +133,9 @@ security.ssl.rest.truststore-password: truststore_password
 security.ssl.rest.authentication-enabled: false
 {% endhighlight %}
 
-**IMPORTANT**
+### Cipher suites
 
+<span class="label label-danger">IMPORTANT</span>
 The [IETF RFC 7525](https://tools.ietf.org/html/rfc7525) recommends to use a specific set of cipher suites for strong security.
 Because these cipher suites were not available on many setups out of the box, Flink's default value is set to a slightly
 weaker but more compatible cipher suite.
@@ -143,6 +147,9 @@ security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_
 
 If these cipher suites are not supported on your setup, you will see that Flink processes will not be able to connect to each other.
 
+### Complete List of SSL Options
+
+{% include generated/security_configuration.html %}
 
 ## Creating and Deploying Keystores and Truststores
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index be413cb7264..87ab1713215 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -298,4 +298,47 @@
 		key("security.ssl.verify-hostname")
 			.defaultValue(true)
 			.withDescription("Flag to enable peer’s hostname verification during ssl handshake.");
+
+	// ------------------------ ssl parameters --------------------------------
+
+	/**
+	 * SSL session cache size.
+	 */
+	public static final ConfigOption<Integer> SSL_INTERNAL_SESSION_CACHE_SIZE =
+		key("security.ssl.internal.session-cache-size")
+			.defaultValue(-1)
+			.withDescription("The size of the cache used for storing SSL session objects. "
+				+ "According to https://github.com/netty/netty/issues/832, you should always set "
+				+ "this to an appropriate number to not run into a bug with stalling IO threads "
+				+ "during garbage collection. (-1 = use system default).")
+		.withDeprecatedKeys("security.ssl.session-cache-size");
+
+	/**
+	 * SSL session timeout.
+	 */
+	public static final ConfigOption<Integer> SSL_INTERNAL_SESSION_TIMEOUT =
+		key("security.ssl.internal.session-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) for the cached SSL session objects. (-1 = use system default)")
+			.withDeprecatedKeys("security.ssl.session-timeout");
+
+	/**
+	 * SSL session timeout during handshakes.
+	 */
+	public static final ConfigOption<Integer> SSL_INTERNAL_HANDSHAKE_TIMEOUT =
+		key("security.ssl.internal.handshake-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) during SSL handshake. (-1 = use system default)")
+			.withDeprecatedKeys("security.ssl.handshake-timeout");
+
+	/**
+	 * SSL session timeout after flushing the <tt>close_notify</tt> message.
+	 */
+	public static final ConfigOption<Integer> SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT =
+		key("security.ssl.internal.close-notify-flush-timeout")
+			.defaultValue(-1)
+			.withDescription("The timeout (in ms) for flushing the `close_notify` that was triggered by closing a " +
+				"channel. If the `close_notify` was not flushed in the given timeout the channel will be closed " +
+				"forcibly. (-1 = use system default)")
+			.withDeprecatedKeys("security.ssl.close-notify-flush-timeout");
 }
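Note on the options added above: each new `security.ssl.internal.*` key declares an older `security.ssl.*` key via `withDeprecatedKeys`. The following is a minimal, self-contained sketch of what such a fallback lookup could look like. It is not Flink's `Configuration` implementation; the class `DeprecatedKeySketch` and the method `resolveInt` are hypothetical names used only for illustration, and the lookup order (current key first, then deprecated keys) is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class DeprecatedKeySketch {

    /**
     * Hypothetical helper: returns the value stored under {@code key}; if that key is
     * absent, tries each deprecated key in order; if none is set, returns the default.
     */
    static int resolveInt(
            Map<String, String> conf,
            int defaultValue,
            String key,
            String... deprecatedKeys) {
        String raw = conf.get(key);
        for (int i = 0; raw == null && i < deprecatedKeys.length; i++) {
            raw = conf.get(deprecatedKeys[i]);
        }
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Only the old (deprecated) key is present in this configuration.
        conf.put("security.ssl.session-timeout", "60000");

        int timeoutMs = resolveInt(
            conf,
            -1, // same default as in the diff: -1 = use system default
            "security.ssl.internal.session-timeout",
            "security.ssl.session-timeout");
        System.out.println(timeoutMs);
    }
}
```

With this shape, an existing configuration that still uses the old key keeps working, while a value under the new key takes precedence once it is set.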
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 1fa2cd0ec5c..89a32f19caf 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -24,7 +24,7 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.mesos.configuration.MesosOptions;
-import org.apache.flink.runtime.net.SSLEngineFactory;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.router.Router;
@@ -51,7 +51,6 @@
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedStream;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
@@ -59,8 +58,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLEngine;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -115,7 +112,7 @@ public MesosArtifactServer(String prefix, String serverHostname, int configuredP
 				MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
 				SSLUtils.isRestSSLEnabled(config);
 
-		final SSLEngineFactory sslFactory;
+		final SSLHandlerFactory sslFactory;
 		if (enableSSL) {
 			LOG.info("Enabling ssl for the artifact server");
 			try {
@@ -138,8 +135,7 @@ protected void initChannel(SocketChannel ch) {
 
 				// SSL should be the first handler in the pipeline
 				if (sslFactory != null) {
-					SSLEngine sslEngine = sslFactory.createSSLEngine();
-					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					ch.pipeline().addLast("ssl", sslFactory.createNettySSLHandler());
 				}
 
 				ch.pipeline()
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 39c8a3c0d03..0f067c0e2c3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -23,10 +23,10 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.WebHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -231,7 +231,7 @@ public WebRuntimeMonitor(
 		// --------------------------------------------------------------------
 
 		// Config to enable https access to the web-ui
-		final SSLEngineFactory sslFactory;
+		final SSLHandlerFactory sslFactory;
 		final boolean enableSSL = SSLUtils.isRestSSLEnabled(config) && config.getBoolean(WebOptions.SSL_ENABLED);
 		if (enableSSL) {
 			LOG.info("Enabling ssl for the web frontend");
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 08914265591..5d65d64afa8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -26,7 +26,7 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
-import org.apache.flink.runtime.net.SSLEngineFactory;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.router.Router;
@@ -43,6 +43,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -88,7 +90,8 @@
 
 	private final HistoryServerArchiveFetcher archiveFetcher;
 
-	private final SSLEngineFactory serverSSLFactory;
+	@Nullable
+	private final SSLHandlerFactory serverSSLFactory;
 	private WebFrontendBootstrap netty;
 
 	private final Object startupShutdownLock = new Object();
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 672fddb3eec..cf30fac156c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -20,7 +20,7 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.net.SSLEngineFactory;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
 import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
@@ -35,13 +35,11 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLEngine;
 
 import java.io.File;
 import java.net.InetAddress;
@@ -64,7 +62,7 @@ public WebFrontendBootstrap(
 			Router router,
 			Logger log,
 			File directory,
-			@Nullable SSLEngineFactory serverSSLFactory,
+			@Nullable SSLHandlerFactory serverSSLFactory,
 			String configuredAddress,
 			int configuredPort,
 			final Configuration config) throws InterruptedException, UnknownHostException {
@@ -81,8 +79,7 @@ protected void initChannel(SocketChannel ch) {
 
 				// SSL should be the first handler in the pipeline
 				if (serverSSLFactory != null) {
-					SSLEngine sslEngine = serverSSLFactory.createSSLEngine();
-					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					ch.pipeline().addLast("ssl", serverSSLFactory.createNettySSLHandler());
 				}
 
 				ch.pipeline()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index ab999d4de8d..d7cad7fcd8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.runtime.net.SSLEngineFactory;
-
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -37,7 +35,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -55,7 +52,7 @@
 	private Bootstrap bootstrap;
 
 	@Nullable
-	private SSLEngineFactory clientSSLFactory;
+	private SSLHandlerFactory clientSSLFactory;
 
 	NettyClient(NettyConfig config) {
 		this.config = config;
@@ -181,11 +178,10 @@ public void initChannel(SocketChannel channel) throws Exception {
 
 				// SSL handler should be added first in the pipeline
 				if (clientSSLFactory != null) {
-					SSLEngine sslEngine = clientSSLFactory.createSSLEngine(
-						serverSocketAddress.getAddress().getCanonicalHostName(),
-						serverSocketAddress.getPort());
-
-					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					SslHandler sslHandler = clientSSLFactory.createNettySSLHandler(
+							serverSocketAddress.getAddress().getCanonicalHostName(),
+							serverSocketAddress.getPort());
+					channel.pipeline().addLast("ssl", sslHandler);
 				}
 				channel.pipeline().addLast(protocol.getClientChannelHandlers());
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 59971e1c5c1..4694c69385c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -22,7 +22,6 @@
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 
 import org.slf4j.Logger;
@@ -191,14 +190,14 @@ public TransportType getTransportType() {
 	}
 
 	@Nullable
-	public SSLEngineFactory createClientSSLEngineFactory() throws Exception {
+	public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
 		return getSSLEnabled() ?
 				SSLUtils.createInternalClientSSLEngineFactory(config) :
 				null;
 	}
 
 	@Nullable
-	public SSLEngineFactory createServerSSLEngineFactory() throws Exception {
+	public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
 		return getSSLEnabled() ?
 				SSLUtils.createInternalServerSSLEngineFactory(config) :
 				null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index cc260c6ed9b..f818ff64016 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,13 +31,10 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLEngine;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ThreadFactory;
@@ -136,9 +132,9 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws
 		}
 
 		// SSL related configuration
-		final SSLEngineFactory sslEngineFactory;
+		final SSLHandlerFactory sslHandlerFactory;
 		try {
-			sslEngineFactory = config.createServerSSLEngineFactory();
+			sslHandlerFactory = config.createServerSSLEngineFactory();
 		} catch (Exception e) {
 			throw new IOException("Failed to initialize SSL Context for the Netty Server", e);
 		}
@@ -150,9 +146,8 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws
 		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
 			@Override
 			public void initChannel(SocketChannel channel) throws Exception {
-				if (sslEngineFactory != null) {
-					SSLEngine sslEngine = sslEngineFactory.createSSLEngine();
-					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
+				if (sslHandlerFactory != null) {
+					channel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
 				}
 
 				channel.pipeline().addLast(protocol.getServerChannelHandlers());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SSLHandlerFactory.java
similarity index 57%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SSLHandlerFactory.java
index 68f60b81ace..66d9466122f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SSLHandlerFactory.java
@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.net;
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -24,9 +26,9 @@
 import static java.util.Objects.requireNonNull;
 
 /**
- * Creates and configures {@link SSLEngine} instances.
+ * Creates and configures {@link SslHandler} instances.
  */
-public class SSLEngineFactory {
+public class SSLHandlerFactory {
 
 	private final SSLContext sslContext;
 
@@ -36,29 +38,66 @@
 
 	private final boolean clientMode;
 
-	final boolean clientAuthentication;
+	private final boolean clientAuthentication;
+
+	private final int handshakeTimeoutMs;
+
+	private final int closeNotifyFlushTimeoutMs;
 
-	public SSLEngineFactory(
+	/**
+	 * Create a new SSLEngine factory.
+	 *
+	 * @param handshakeTimeoutMs
+	 * 		SSL session timeout during handshakes (-1 = use system default)
+	 * @param closeNotifyFlushTimeoutMs
+	 * 		SSL session timeout after flushing the <tt>close_notify</tt> message (-1 = use system
+	 * 		default)
+	 */
+	public SSLHandlerFactory(
 			final SSLContext sslContext,
 			final String[] enabledProtocols,
 			final String[] enabledCipherSuites,
 			final boolean clientMode,
-			final boolean clientAuthentication) {
+			final boolean clientAuthentication,
+			final int handshakeTimeoutMs,
+			final int closeNotifyFlushTimeoutMs) {
 
 		this.sslContext = requireNonNull(sslContext, "sslContext must not be null");
 		this.enabledProtocols = requireNonNull(enabledProtocols, "enabledProtocols must not be null");
 		this.enabledCipherSuites = requireNonNull(enabledCipherSuites, "cipherSuites must not be null");
 		this.clientMode = clientMode;
 		this.clientAuthentication = clientAuthentication;
+		this.handshakeTimeoutMs = handshakeTimeoutMs;
+		this.closeNotifyFlushTimeoutMs = closeNotifyFlushTimeoutMs;
+	}
+
+	public SslHandler createNettySSLHandler() {
+		return createNettySSLHandler(createSSLEngine());
+	}
+
+	public SslHandler createNettySSLHandler(String hostname, int port) {
+		return createNettySSLHandler(createSSLEngine(hostname, port));
+	}
+
+	private SslHandler createNettySSLHandler(SSLEngine sslEngine) {
+		SslHandler sslHandler = new SslHandler(sslEngine);
+		if (handshakeTimeoutMs >= 0) {
+			sslHandler.setHandshakeTimeoutMillis(handshakeTimeoutMs);
+		}
+		if (closeNotifyFlushTimeoutMs >= 0) {
+			sslHandler.setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMs);
+		}
+
+		return sslHandler;
 	}
 
-	public SSLEngine createSSLEngine() {
+	private SSLEngine createSSLEngine() {
 		final SSLEngine sslEngine = sslContext.createSSLEngine();
 		configureSSLEngine(sslEngine);
 		return sslEngine;
 	}
 
-	public SSLEngine createSSLEngine(String hostname, int port) {
+	private SSLEngine createSSLEngine(String hostname, int port) {
 		final SSLEngine sslEngine = sslContext.createSSLEngine(hostname, port);
 		configureSSLEngine(sslEngine);
 		return sslEngine;
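Note on `createNettySSLHandler(SSLEngine)` above: a timeout is only forwarded to the handler when the configured value is `>= 0`, so `-1` leaves the handler's own default in place while `0` is passed through as an explicit value. The sketch below isolates that sentinel check without depending on Netty. The class `TimeoutSentinelSketch`, the method `applyIfConfigured`, and the stand-in default of 10000 ms are hypothetical and chosen only for illustration.

```java
import java.util.function.LongConsumer;

public class TimeoutSentinelSketch {

    /**
     * Hypothetical helper: forwards {@code timeoutMs} to {@code setter} only if it is
     * non-negative. Returns true if the setter was called.
     */
    static boolean applyIfConfigured(int timeoutMs, LongConsumer setter) {
        if (timeoutMs >= 0) {
            setter.accept(timeoutMs);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for a handler field that already carries a library default.
        long[] handshakeTimeoutMs = {10_000L};

        // -1 means "use system default": the setter is never invoked.
        applyIfConfigured(-1, v -> handshakeTimeoutMs[0] = v);
        System.out.println(handshakeTimeoutMs[0]);

        // 0 is not a sentinel here: it is forwarded like any other explicit value.
        applyIfConfigured(0, v -> handshakeTimeoutMs[0] = v);
        System.out.println(handshakeTimeoutMs[0]);
    }
}
```

Keeping the check in one place makes it harder for the handshake and `close_notify` flush timeouts to drift apart in how they interpret negative values.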
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index e0b208d4b8d..c756fabbd0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 
 import javax.annotation.Nullable;
 import javax.net.ServerSocketFactory;
@@ -109,73 +110,81 @@ public static SocketFactory createSSLClientSocketFactory(Configuration config) t
 	/**
 	 * Creates a SSLEngineFactory to be used by internal communication server endpoints.
 	 */
-	public static SSLEngineFactory createInternalServerSSLEngineFactory(final Configuration config) throws Exception {
+	public static SSLHandlerFactory createInternalServerSSLEngineFactory(final Configuration config) throws Exception {
 		SSLContext sslContext = createInternalSSLContext(config);
 		if (sslContext == null) {
 			throw new IllegalConfigurationException("SSL is not enabled for internal communication.");
 		}
 
-		return new SSLEngineFactory(
+		return new SSLHandlerFactory(
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
 				false,
-				true);
+				true,
+				config.getInteger(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT),
+				config.getInteger(SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT));
 	}
 
 	/**
 	 * Creates a SSLEngineFactory to be used by internal communication client endpoints.
 	 */
-	public static SSLEngineFactory createInternalClientSSLEngineFactory(final Configuration config) throws Exception {
+	public static SSLHandlerFactory createInternalClientSSLEngineFactory(final Configuration config) throws Exception {
 		SSLContext sslContext = createInternalSSLContext(config);
 		if (sslContext == null) {
 			throw new IllegalConfigurationException("SSL is not enabled for internal communication.");
 		}
 
-		return new SSLEngineFactory(
+		return new SSLHandlerFactory(
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
 				true,
-				true);
+				true,
+				config.getInteger(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT),
+				config.getInteger(SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT));
 	}
 
 	/**
-	 * Creates a {@link SSLEngineFactory} to be used by the REST Servers.
+	 * Creates a {@link SSLHandlerFactory} to be used by the REST Servers.
 	 *
 	 * @param config The application configuration.
 	 */
-	public static SSLEngineFactory createRestServerSSLEngineFactory(final Configuration config) throws Exception {
+	public static SSLHandlerFactory createRestServerSSLEngineFactory(final Configuration config) throws Exception {
 		SSLContext sslContext = createRestServerSSLContext(config);
 		if (sslContext == null) {
 			throw new IllegalConfigurationException("SSL is not enabled for REST endpoints.");
 		}
 
-		return new SSLEngineFactory(
+		return new SSLHandlerFactory(
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
 				false,
-				isRestSSLAuthenticationEnabled(config));
+				isRestSSLAuthenticationEnabled(config),
+				-1,
+				-1);
 	}
 
 	/**
-	 * Creates a {@link SSLEngineFactory} to be used by the REST Clients.
+	 * Creates a {@link SSLHandlerFactory} to be used by the REST Clients.
 	 *
 	 * @param config The application configuration.
 	 */
-	public static SSLEngineFactory createRestClientSSLEngineFactory(final Configuration config) throws Exception {
+	public static SSLHandlerFactory createRestClientSSLEngineFactory(final Configuration config) throws Exception {
 		SSLContext sslContext = createRestClientSSLContext(config);
 		if (sslContext == null) {
 			throw new IllegalConfigurationException("SSL is not enabled for REST endpoints.");
 		}
 
-		return new SSLEngineFactory(
+		return new SSLHandlerFactory(
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
 				true,
-				isRestSSLAuthenticationEnabled(config));
+				isRestSSLAuthenticationEnabled(config),
+				-1,
+				-1);
 	}
 
 	private static String[] getEnabledProtocols(final Configuration config) {
@@ -194,7 +203,7 @@ public static SSLEngineFactory createRestClientSSLEngineFactory(final Configurat
 	 * of mutual authentication.
 	 */
 	@Nullable
-	public static SSLContext createInternalSSLContext(Configuration config) throws Exception {
+	private static SSLContext createInternalSSLContext(Configuration config) throws Exception {
 		checkNotNull(config, "config");
 
 		if (!isInternalSSLEnabled(config)) {
@@ -216,6 +225,8 @@ public static SSLContext createInternalSSLContext(Configuration config) throws E
 				config, SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
 
 		String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
+		int sessionCacheSize = config.getInteger(SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE);
+		int sessionTimeoutMs = config.getInteger(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT);
 
 		KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
 		try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) {
@@ -235,6 +246,12 @@ public static SSLContext createInternalSSLContext(Configuration config) throws E
 
 		SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
 		sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+		if (sessionCacheSize >= 0) {
+			sslContext.getClientSessionContext().setSessionCacheSize(sessionCacheSize);
+		}
+		if (sessionTimeoutMs >= 0) {
+			sslContext.getClientSessionContext().setSessionTimeout(sessionTimeoutMs / 1000);
+		}
 
 		return sslContext;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 6aef080a348..c7614ba2fdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -22,7 +22,7 @@
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.net.SSLEngineFactory;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -68,7 +68,6 @@
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
 import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
@@ -114,14 +113,14 @@ public RestClient(RestClientConfiguration configuration, Executor executor) {
 		this.executor = Preconditions.checkNotNull(executor);
 		this.terminationFuture = new CompletableFuture<>();
 
-		final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory();
+		final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 			@Override
 			protected void initChannel(SocketChannel socketChannel) {
 				try {
 					// SSL should be the first handler in the pipeline
-					if (sslEngineFactory != null) {
-						socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
+					if (sslHandlerFactory != null) {
+						socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
 					}
 
 					socketChannel.pipeline()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index b70b1f81ab3..dbddc069c2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -20,7 +20,7 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.net.SSLEngineFactory;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
@@ -36,7 +36,7 @@
 public final class RestClientConfiguration {
 
 	@Nullable
-	private final SSLEngineFactory sslEngineFactory;
+	private final SSLHandlerFactory sslHandlerFactory;
 
 	private final long connectionTimeout;
 
@@ -45,12 +45,12 @@
 	private final int maxContentLength;
 
 	private RestClientConfiguration(
-			@Nullable final SSLEngineFactory sslEngineFactory,
+			@Nullable final SSLHandlerFactory sslHandlerFactory,
 			final long connectionTimeout,
 			final long idlenessTimeout,
 			final int maxContentLength) {
 		checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
-		this.sslEngineFactory = sslEngineFactory;
+		this.sslHandlerFactory = sslHandlerFactory;
 		this.connectionTimeout = connectionTimeout;
 		this.idlenessTimeout = idlenessTimeout;
 		this.maxContentLength = maxContentLength;
@@ -62,8 +62,8 @@ private RestClientConfiguration(
 	 * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
 	 */
 	@Nullable
-	public SSLEngineFactory getSslEngineFactory() {
-		return sslEngineFactory;
+	public SSLHandlerFactory getSslHandlerFactory() {
+		return sslHandlerFactory;
 	}
 
 	/**
@@ -100,15 +100,15 @@ public int getMaxContentLength() {
 	public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
 		Preconditions.checkNotNull(config);
 
-		final SSLEngineFactory sslEngineFactory;
+		final SSLHandlerFactory sslHandlerFactory;
 		if (SSLUtils.isRestSSLEnabled(config)) {
 			try {
-				sslEngineFactory = SSLUtils.createRestClientSSLEngineFactory(config);
+				sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
 			}
 		} else {
-			sslEngineFactory = null;
+			sslHandlerFactory = null;
 		}
 
 		final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
@@ -117,6 +117,6 @@ public static RestClientConfiguration fromConfiguration(Configuration config) th
 
 		int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
 
-		return new RestClientConfiguration(sslEngineFactory, connectionTimeout, idlenessTimeout, maxContentLength);
+		return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index ab8278512c7..9a699a04fb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -22,7 +22,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.net.SSLEngineFactory;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.router.Router;
@@ -41,7 +41,6 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
 
@@ -76,7 +75,7 @@
 	private final String restBindAddress;
 	private final int restBindPort;
 	@Nullable
-	private final SSLEngineFactory sslEngineFactory;
+	private final SSLHandlerFactory sslHandlerFactory;
 	private final int maxContentLength;
 
 	protected final Path uploadDir;
@@ -96,7 +95,7 @@ public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws
 		this.restAddress = configuration.getRestAddress();
 		this.restBindAddress = configuration.getRestBindAddress();
 		this.restBindPort = configuration.getRestBindPort();
-		this.sslEngineFactory = configuration.getSslEngineFactory();
+		this.sslHandlerFactory = configuration.getSslHandlerFactory();
 
 		this.uploadDir = configuration.getUploadDir();
 		createUploadDir(uploadDir, log);
@@ -155,8 +154,8 @@ protected void initChannel(SocketChannel ch) {
 					RouterHandler handler = new RouterHandler(router, responseHeaders);
 
 					// SSL should be the first handler in the pipeline
-					if (sslEngineFactory != null) {
-						ch.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
+					if (sslHandlerFactory != null) {
+						ch.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
 					}
 
 					ch.pipeline()
@@ -200,7 +199,7 @@ protected void initChannel(SocketChannel ch) {
 
 			final String protocol;
 
-			if (sslEngineFactory != null) {
+			if (sslHandlerFactory != null) {
 				protocol = "https://";
 			} else {
 				protocol = "http://";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 561891f9276..5a54d206c99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -21,7 +21,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.net.SSLEngineFactory;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
@@ -51,7 +51,7 @@
 	private final int restBindPort;
 
 	@Nullable
-	private final SSLEngineFactory sslEngineFactory;
+	private final SSLHandlerFactory sslHandlerFactory;
 
 	private final Path uploadDir;
 
@@ -63,7 +63,7 @@ private RestServerEndpointConfiguration(
 			final String restAddress,
 			@Nullable String restBindAddress,
 			int restBindPort,
-			@Nullable SSLEngineFactory sslEngineFactory,
+			@Nullable SSLHandlerFactory sslHandlerFactory,
 			final Path uploadDir,
 			final int maxContentLength,
 			final Map<String, String> responseHeaders) {
@@ -74,7 +74,7 @@ private RestServerEndpointConfiguration(
 		this.restAddress = requireNonNull(restAddress);
 		this.restBindAddress = restBindAddress;
 		this.restBindPort = restBindPort;
-		this.sslEngineFactory = sslEngineFactory;
+		this.sslHandlerFactory = sslHandlerFactory;
 		this.uploadDir = requireNonNull(uploadDir);
 		this.maxContentLength = maxContentLength;
 		this.responseHeaders = Collections.unmodifiableMap(requireNonNull(responseHeaders));
@@ -111,8 +111,8 @@ public int getRestBindPort() {
 	 * @return SSLEngine that the REST server endpoint should use, or null if SSL was disabled
 	 */
 	@Nullable
-	public SSLEngineFactory getSslEngineFactory() {
-		return sslEngineFactory;
+	public SSLHandlerFactory getSslHandlerFactory() {
+		return sslHandlerFactory;
 	}
 
 	/**
@@ -155,15 +155,15 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co
 		final String restBindAddress = config.getString(RestOptions.BIND_ADDRESS);
 		final int port = config.getInteger(RestOptions.PORT);
 
-		final SSLEngineFactory sslEngineFactory;
+		final SSLHandlerFactory sslHandlerFactory;
 		if (SSLUtils.isRestSSLEnabled(config)) {
 			try {
-				sslEngineFactory = SSLUtils.createRestServerSSLEngineFactory(config);
+				sslHandlerFactory = SSLUtils.createRestServerSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new ConfigurationException("Failed to initialize SSLEngineFactory for REST server endpoint.", e);
 			}
 		} else {
-			sslEngineFactory = null;
+			sslHandlerFactory = null;
 		}
 
 		final Path uploadDir = Paths.get(
@@ -180,7 +180,7 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co
 			restAddress,
 			restBindAddress,
 			port,
-			sslEngineFactory,
+			sslHandlerFactory,
 			uploadDir,
 			maxContentLength,
 			responseHeaders);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index c1fef146a9d..6ec7a235c3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
@@ -29,13 +30,22 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.net.ssl.SSLSessionContext;
+
 import java.net.InetAddress;
 
+import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT;
+import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT;
+import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE;
+import static org.apache.flink.configuration.SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -49,21 +59,65 @@
 	 */
 	@Test
 	public void testValidSslConnection() throws Exception {
+		testValidSslConnection(createSslConfig());
+	}
+
+	/**
+	 * Verify valid (advanced) ssl configuration and connection.
+	 */
+	@Test
+	public void testValidSslConnectionAdvanced() throws Exception {
+		Configuration sslConfig = createSslConfig();
+		sslConfig.setInteger(SSL_INTERNAL_SESSION_CACHE_SIZE, 1);
+		sslConfig.setInteger(SSL_INTERNAL_SESSION_TIMEOUT, 1_000);
+		sslConfig.setInteger(SSL_INTERNAL_HANDSHAKE_TIMEOUT, 1_000);
+		sslConfig.setInteger(SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1_000);
+
+		testValidSslConnection(sslConfig);
+	}
+
+	private void testValidSslConnection(Configuration sslConfig) throws Exception {
 		NettyProtocol protocol = new NoOpProtocol();
 
-		NettyConfig nettyConfig = createNettyConfig(createSslConfig());
+		NettyConfig nettyConfig = createNettyConfig(sslConfig);
 
 		NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
 		Channel ch = NettyTestUtil.connect(serverAndClient);
 
+		SslHandler sslHandler = (SslHandler) ch.pipeline().get("ssl");
+		assertEqualsOrDefault(sslConfig, SSL_INTERNAL_HANDSHAKE_TIMEOUT, sslHandler.getHandshakeTimeoutMillis());
+		assertEqualsOrDefault(sslConfig, SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyFlushTimeoutMillis());
+
 		// should be able to send text data
 		ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
 		assertTrue(ch.writeAndFlush("test").await().isSuccess());
 
+		// session context is only available after a session was set up -> this should be true after data was sent
+		SSLSessionContext sessionContext = sslHandler.engine().getSession().getSessionContext();
+		assertNotNull("bug in unit test setup: session context not available", sessionContext);
+		assertEqualsOrDefault(sslConfig, SSL_INTERNAL_SESSION_CACHE_SIZE, sessionContext.getSessionCacheSize());
+		int sessionTimeout = sslConfig.getInteger(SSL_INTERNAL_SESSION_TIMEOUT);
+		if (sessionTimeout != -1) {
+			// session timeout config is in milliseconds but the context returns it in seconds
+			assertEquals(sessionTimeout / 1000, sessionContext.getSessionTimeout());
+		} else {
+			assertTrue("default value (-1) should not be propagated", sessionContext.getSessionTimeout() >= 0);
+		}
+
 		NettyTestUtil.shutdown(serverAndClient);
 	}
 
+	private static void assertEqualsOrDefault(Configuration sslConfig, ConfigOption<Integer> option, long actual) {
+		long expected = sslConfig.getInteger(option);
+		if (expected != option.defaultValue()) {
+			assertEquals(expected, actual);
+		} else {
+			assertTrue("default value (" + option.defaultValue() + ") should not be propagated",
+				actual >= 0);
+		}
+	}
+
 	/**
 	 * Verify failure on invalid ssl configuration.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 9610e98b101..5031accb1a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -21,11 +21,13 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
 import org.junit.Test;
 
-import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLServerSocket;
 
 import java.net.ServerSocket;
@@ -124,7 +126,7 @@ public void testSocketFactoriesWhenSslDisabled() throws Exception {
 	public void testRESTClientSSL() throws Exception {
 		Configuration clientConfig = createRestSslConfigWithTrustStore();
 
-		SSLEngineFactory ssl = SSLUtils.createRestClientSSLEngineFactory(clientConfig);
+		SSLHandlerFactory ssl = SSLUtils.createRestClientSSLEngineFactory(clientConfig);
 		assertNotNull(ssl);
 	}
 
@@ -195,7 +197,7 @@ public void testRESTClientSSLWrongPassword() throws Exception {
 	public void testRESTServerSSL() throws Exception {
 		Configuration serverConfig = createRestSslConfigWithKeyStore();
 
-		SSLEngineFactory ssl = SSLUtils.createRestServerSSLEngineFactory(serverConfig);
+		SSLHandlerFactory ssl = SSLUtils.createRestServerSSLEngineFactory(serverConfig);
 		assertNotNull(ssl);
 	}
 
@@ -373,7 +375,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio
 	}
 
 	/**
-	 * Tests that {@link SSLEngineFactory} is created correctly.
+	 * Tests that {@link SSLHandlerFactory} is created correctly.
 	 */
 	@Test
 	public void testCreateSSLEngineFactory() throws Exception {
@@ -383,14 +385,14 @@ public void testCreateSSLEngineFactory() throws Exception {
 		serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1");
 		serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
 
-		final SSLEngineFactory serverSSLEngineFactory = SSLUtils.createInternalServerSSLEngineFactory(serverConfig);
-		final SSLEngine sslEngine = serverSSLEngineFactory.createSSLEngine();
+		final SSLHandlerFactory serverSSLHandlerFactory = SSLUtils.createInternalServerSSLEngineFactory(serverConfig);
+		final SslHandler sslHandler = serverSSLHandlerFactory.createNettySSLHandler();
 
-		assertEquals(1, sslEngine.getEnabledProtocols().length);
-		assertEquals("TLSv1", sslEngine.getEnabledProtocols()[0]);
+		assertEquals(1, sslHandler.engine().getEnabledProtocols().length);
+		assertEquals("TLSv1", sslHandler.engine().getEnabledProtocols()[0]);
 
-		assertEquals(2, sslEngine.getEnabledCipherSuites().length);
-		assertThat(sslEngine.getEnabledCipherSuites(), arrayContainingInAnyOrder(
+		assertEquals(2, sslHandler.engine().getEnabledCipherSuites().length);
+		assertThat(sslHandler.engine().getEnabledCipherSuites(), arrayContainingInAnyOrder(
 				"TLS_DHE_RSA_WITH_AES_128_CBC_SHA", "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
 	}
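
Note on the session settings in the diff above: createInternalSSLContext() only
applies the cache size and timeout when the configured value is >= 0, and it
converts the timeout from milliseconds to seconds with an integer division
before handing it to the JDK. The following is a minimal standalone sketch of
that behaviour using only javax.net.ssl. The class SessionSettingsSketch and its
methods newContext() and apply() are hypothetical names made up for this
illustration and are not part of Flink. The sketch assumes the default JSSE
provider supplies a "TLS" context.

```java
import java.security.GeneralSecurityException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSessionContext;

/** Hypothetical illustration only; not part of Flink. */
class SessionSettingsSketch {

    /** Creates a TLS context with the JDK's default key and trust managers. */
    static SSLContext newContext() {
        try {
            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(null, null, null);
            return ctx;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("could not create TLS context", e);
        }
    }

    /**
     * Mirrors the guard used in the diff: negative values leave the JDK
     * defaults untouched, and the timeout is given in milliseconds.
     */
    static void apply(SSLContext ctx, int sessionCacheSize, int sessionTimeoutMs) {
        SSLSessionContext client = ctx.getClientSessionContext();
        if (sessionCacheSize >= 0) {
            client.setSessionCacheSize(sessionCacheSize);
        }
        if (sessionTimeoutMs >= 0) {
            // The JDK expects seconds; integer division truncates,
            // so anything below 1000 ms becomes 0.
            client.setSessionTimeout(sessionTimeoutMs / 1000);
        }
    }

    public static void main(String[] args) {
        SSLContext ctx = newContext();
        SSLSessionContext client = ctx.getClientSessionContext();

        apply(ctx, -1, -1); // keeps whatever the JDK chose
        System.out.println("default timeout (s): " + client.getSessionTimeout());

        apply(ctx, 16, 5_000);
        System.out.println("cache size: " + client.getSessionCacheSize());
        System.out.println("timeout (s): " + client.getSessionTimeout());
    }
}
```

Two things may be worth keeping in mind when choosing values. The JDK documents
a session timeout of zero as "no limit", so a configured timeout between 0 and
999 ms ends up disabling the timeout rather than shortening it. Also, as in the
diff, the sketch only touches the client session context; the server session
context keeps its defaults.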
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services