You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 07:03:09 UTC

[4/8] flink git commit: [FLINK-9313] [security] (part 2) Split SSL configuration into internal (rpc, data transport, blob server) and external (REST)

[FLINK-9313] [security] (part 2) Split SSL configuration into internal (rpc, data transport, blob server) and external (REST)

This also uses SSLEngineFactory for all SSLEngine creations.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c4bc84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c4bc84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c4bc84

Branch: refs/heads/master
Commit: 30c4bc847ade8cf0ae5c3ef6a6f8debdf72ddd61
Parents: 4db63c0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 12 17:20:30 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 08:10:46 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/common_section.html    |   9 +-
 .../generated/security_configuration.html       |  59 ++-
 .../generated/task_manager_configuration.html   |   2 +-
 .../configuration/HistoryServerOptions.java     |   2 +-
 .../flink/configuration/SecurityOptions.java    | 130 +++++-
 .../flink/configuration/TaskManagerOptions.java |   2 +-
 .../flink/mesos/util/MesosArtifactServer.java   |  21 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  15 +-
 .../webmonitor/history/HistoryServer.java       |  13 +-
 .../webmonitor/utils/WebFrontendBootstrap.java  |  17 +-
 .../apache/flink/runtime/blob/BlobClient.java   |   3 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   3 +-
 .../HighAvailabilityServicesUtils.java          |   4 +-
 .../runtime/io/network/netty/NettyClient.java   |  25 +-
 .../runtime/io/network/netty/NettyConfig.java   |  51 +--
 .../runtime/io/network/netty/NettyServer.java   |  13 +-
 .../flink/runtime/net/SSLEngineFactory.java     |  12 +-
 .../org/apache/flink/runtime/net/SSLUtils.java  | 298 +++++++------
 .../runtime/rest/RestClientConfiguration.java   |   6 +-
 .../rest/RestServerEndpointConfiguration.java   |   6 +-
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   |   2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  22 +-
 .../flink/runtime/blob/BlobClientSslTest.java   |  33 +-
 .../network/netty/NettyClientServerSslTest.java |  15 +-
 .../apache/flink/runtime/net/SSLUtilsTest.java  | 427 +++++++++++++------
 .../runtime/rest/RestServerEndpointITCase.java  |  18 +-
 .../flink/runtime/akka/AkkaSslITCase.scala      |  10 +-
 27 files changed, 791 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/docs/_includes/generated/common_section.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html
index 29245c9..ea881e0 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -53,9 +53,14 @@
             <td>File system path (URI) where Flink persists metadata in high-availability setups.</td>
         </tr>
         <tr>
-            <td><h5>security.ssl.enabled</h5></td>
+            <td><h5>security.ssl.internal.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
-            <td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
+            <td>Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.rest.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Turns on SSL for external communication via the REST endpoints.</td>
         </tr>
     </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/docs/_includes/generated/security_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/security_configuration.html b/docs/_includes/generated/security_configuration.html
index cd682ec..5042cf3 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -13,9 +13,34 @@
             <td>The comma separated list of standard SSL algorithms to be supported. Read more &#60;a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites"&#62;here&#60;/a&#62;.</td>
         </tr>
         <tr>
-            <td><h5>security.ssl.enabled</h5></td>
+            <td><h5>security.ssl.internal.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
-            <td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
+            <td>Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.internal.key-password</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.internal.keystore</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The Java keystore file with SSL Key and Certificate, to be used by Flink's internal endpoints (rpc, data transport, blob server).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.internal.keystore-password</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The secret to decrypt the keystore file for Flink's internal endpoints (rpc, data transport, blob server).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.internal.truststore</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.internal.truststore-password</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).</td>
         </tr>
         <tr>
             <td><h5>security.ssl.key-password</h5></td>
@@ -38,6 +63,36 @@
             <td>The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.</td>
         </tr>
         <tr>
+            <td><h5>security.ssl.rest.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Turns on SSL for external communication via the REST endpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.rest.key-password</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The secret to decrypt the key in the keystore for Flink's external REST endpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.rest.keystore</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The Java keystore file with SSL Key and Certificate, to be used by Flink's external REST endpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.rest.keystore-password</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The secret to decrypt the keystore file for Flink's external REST endpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.rest.truststore</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.rest.truststore-password</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The password to decrypt the truststore for Flink's external REST endpoints.</td>
+        </tr>
+        <tr>
             <td><h5>security.ssl.truststore</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The truststore file containing the public CA certificates to be used by flink endpoints to verify the peer’s certificate.</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html
index e780f6e..c18c5d4 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -35,7 +35,7 @@
         <tr>
             <td><h5>taskmanager.data.ssl.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
-            <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true</td>
+            <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
         </tr>
         <tr>
             <td><h5>taskmanager.debug.memory.log</h5></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 13cdc1e..12e7f24 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -80,7 +80,7 @@ public class HistoryServerOptions {
 
 	/**
 	 * Enables/Disables SSL support for the HistoryServer web-frontend. Only relevant if
-	 * {@link SecurityOptions#SSL_ENABLED} is enabled.
+	 * {@link SecurityOptions#SSL_REST_ENABLED} is enabled.
 	 */
 	public static final ConfigOption<Boolean> HISTORY_SERVER_WEB_SSL_ENABLED =
 		key("historyserver.web.ssl.enabled")

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index fc7e391..03ee4f2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -85,14 +85,40 @@ public class SecurityOptions {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Enable SSL support.
+	 * Enable SSL for internal (rpc, data transport, blob server) and external (HTTP/REST) communication.
+	 *
+	 * @deprecated Use {@link #SSL_INTERNAL_ENABLED} and {@link #SSL_REST_ENABLED} instead.
 	 */
-	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
+	@Deprecated
 	public static final ConfigOption<Boolean> SSL_ENABLED =
 		key("security.ssl.enabled")
 			.defaultValue(false)
-			.withDescription("Turns on SSL for internal network communication. This can be optionally overridden by" +
-				" flags defined in different transport modules.");
+			.withDescription("Turns on SSL for internal and external network communication. " +
+					"This can be overridden by 'security.ssl.internal.enabled' and 'security.ssl.rest.enabled'. " +
+					"Specific internal components (rpc, data transport, blob server) may optionally override " +
+					"this through their own settings.");
+
+	/**
+	 * Enable SSL for internal communication (akka rpc, netty data transport, blob server).
+	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
+	public static final ConfigOption<Boolean> SSL_INTERNAL_ENABLED =
+			key("security.ssl.internal.enabled")
+			.defaultValue(false)
+			.withDescription("Turns on SSL for internal network communication. " +
+					"Optionally, specific components may override this through their own settings " +
+					"(rpc, data transport, REST, etc).");
+
+	/**
+	 * Enable SSL for external REST endpoints.
+	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
+	public static final ConfigOption<Boolean> SSL_REST_ENABLED =
+			key("security.ssl.rest.enabled")
+			.defaultValue(false)
+			.withDescription("Turns on SSL for external communication via the REST endpoints.");
+
+	// ----------------- certificates (internal + external) -------------------
 
 	/**
 	 * The Java keystore file containing the flink endpoint key and certificate.
@@ -135,6 +161,102 @@ public class SecurityOptions {
 			.noDefaultValue()
 			.withDescription("The secret to decrypt the truststore.");
 
+	// ----------------------- certificates (internal) ------------------------
+
+	/**
+	 * For internal SSL, the Java keystore file containing the private key and certificate.
+	 */
+	public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE =
+			key("security.ssl.internal.keystore")
+					.noDefaultValue()
+					.withDescription("The Java keystore file with SSL Key and Certificate, " +
+							"to be used by Flink's internal endpoints (rpc, data transport, blob server).");
+
+	/**
+	 * For internal SSL, the password to decrypt the keystore file containing the certificate.
+	 */
+	public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE_PASSWORD =
+			key("security.ssl.internal.keystore-password")
+					.noDefaultValue()
+					.withDescription("The secret to decrypt the keystore file " +
+							"for Flink's internal endpoints (rpc, data transport, blob server).");
+
+	/**
+	 * For internal SSL, the password to decrypt the private key.
+	 */
+	public static final ConfigOption<String> SSL_INTERNAL_KEY_PASSWORD =
+			key("security.ssl.internal.key-password")
+					.noDefaultValue()
+					.withDescription("The secret to decrypt the key in the keystore " +
+							"for Flink's internal endpoints (rpc, data transport, blob server).");
+
+	/**
+	 * For internal SSL, the truststore file containing the public CA certificates to verify the ssl peers.
+	 */
+	public static final ConfigOption<String> SSL_INTERNAL_TRUSTSTORE =
+			key("security.ssl.internal.truststore")
+					.noDefaultValue()
+					.withDescription("The truststore file containing the public CA certificates to verify the peer " +
+							"for Flink's internal endpoints (rpc, data transport, blob server).");
+
+	/**
+	 * For internal SSL, the secret to decrypt the truststore.
+	 */
+	public static final ConfigOption<String> SSL_INTERNAL_TRUSTSTORE_PASSWORD =
+			key("security.ssl.internal.truststore-password")
+					.noDefaultValue()
+					.withDescription("The password to decrypt the truststore " +
+							"for Flink's internal endpoints (rpc, data transport, blob server).");
+
+	// ----------------------- certificates (external) ------------------------
+
+	/**
+	 * For external (REST) SSL, the Java keystore file containing the private key and certificate.
+	 */
+	public static final ConfigOption<String> SSL_REST_KEYSTORE =
+			key("security.ssl.rest.keystore")
+					.noDefaultValue()
+					.withDescription("The Java keystore file with SSL Key and Certificate, " +
+							"to be used by Flink's external REST endpoints.");
+
+	/**
+	 * For external (REST) SSL, the password to decrypt the keystore file containing the certificate.
+	 */
+	public static final ConfigOption<String> SSL_REST_KEYSTORE_PASSWORD =
+			key("security.ssl.rest.keystore-password")
+					.noDefaultValue()
+					.withDescription("The secret to decrypt the keystore file " +
+							"for Flink's external REST endpoints.");
+
+	/**
+	 * For external (REST) SSL, the password to decrypt the private key.
+	 */
+	public static final ConfigOption<String> SSL_REST_KEY_PASSWORD =
+			key("security.ssl.rest.key-password")
+					.noDefaultValue()
+					.withDescription("The secret to decrypt the key in the keystore " +
+							"for Flink's external REST endpoints.");
+
+	/**
+	 * For external (REST) SSL, the truststore file containing the public CA certificates to verify the ssl peers.
+	 */
+	public static final ConfigOption<String> SSL_REST_TRUSTSTORE =
+			key("security.ssl.rest.truststore")
+					.noDefaultValue()
+					.withDescription("The truststore file containing the public CA certificates to verify the peer " +
+							"for Flink's external REST endpoints.");
+
+	/**
+	 * For external (REST) SSL, the secret to decrypt the truststore.
+	 */
+	public static final ConfigOption<String> SSL_REST_TRUSTSTORE_PASSWORD =
+			key("security.ssl.rest.truststore-password")
+					.noDefaultValue()
+					.withDescription("The password to decrypt the truststore " +
+							"for Flink's external REST endpoints.");
+
+	// ------------------------ ssl parameters --------------------------------
+
 	/**
 	 * SSL protocol version to be supported.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 2907f6b..be76cf8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -116,7 +116,7 @@ public class TaskManagerOptions {
 		key("taskmanager.data.ssl.enabled")
 			.defaultValue(true)
 			.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
-				" global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true");
+				" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
 
 	/**
 	 * The initial registration backoff between two consecutive registration attempts. The backoff

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index d0d41e2..1fa2cd0 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.router.Router;
@@ -58,7 +59,6 @@ import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -104,8 +104,6 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 	private final Map<Path, URL> paths = new HashMap<>();
 
-	private final SSLContext serverSSLContext;
-
 	public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
 		throws Exception {
 		if (configuredPort < 0 || configuredPort > 0xFFFF) {
@@ -113,19 +111,20 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 		}
 
 		// Config to enable https access to the artifact server
-		boolean enableSSL = config.getBoolean(
+		final boolean enableSSL = config.getBoolean(
 				MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
-				SSLUtils.getSSLEnabled(config);
+				SSLUtils.isRestSSLEnabled(config);
 
+		final SSLEngineFactory sslFactory;
 		if (enableSSL) {
 			LOG.info("Enabling ssl for the artifact server");
 			try {
-				serverSSLContext = SSLUtils.createSSLServerContext(config);
+				sslFactory = SSLUtils.createRestServerSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new IOException("Failed to initialize SSLContext for the artifact server", e);
 			}
 		} else {
-			serverSSLContext = null;
+			sslFactory = null;
 		}
 
 		router = new Router();
@@ -138,10 +137,8 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 				RouterHandler handler = new RouterHandler(router, new HashMap<>());
 
 				// SSL should be the first handler in the pipeline
-				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
-					SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
-					sslEngine.setUseClientMode(false);
+				if (sslFactory != null) {
+					SSLEngine sslEngine = sslFactory.createSSLEngine();
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}
 
@@ -169,7 +166,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 		String address = bindAddress.getAddress().getHostAddress();
 		int port = bindAddress.getPort();
 
-		String httpProtocol = (serverSSLContext != null) ? "https" : "http";
+		String httpProtocol = (sslFactory != null) ? "https" : "http";
 
 		baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 976b080..39c8a3c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.WebHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -92,8 +93,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
@@ -130,8 +129,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 	/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
 	private final LeaderGatewayRetriever<JobManagerGateway> retriever;
 
-	private final SSLContext serverSSLContext;
-
 	private final CompletableFuture<String> localRestAddress = new CompletableFuture<>();
 
 	private final Time timeout;
@@ -234,17 +231,17 @@ public class WebRuntimeMonitor implements WebMonitor {
 		// --------------------------------------------------------------------
 
 		// Config to enable https access to the web-ui
-		boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
-
+		final SSLEngineFactory sslFactory;
+		final boolean enableSSL = SSLUtils.isRestSSLEnabled(config) && config.getBoolean(WebOptions.SSL_ENABLED);
 		if (enableSSL) {
 			LOG.info("Enabling ssl for the web frontend");
 			try {
-				serverSSLContext = SSLUtils.createSSLServerContext(config);
+				sslFactory = SSLUtils.createRestServerSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new IOException("Failed to initialize SSLContext for the web frontend", e);
 			}
 		} else {
-			serverSSLContext = null;
+			sslFactory = null;
 		}
 		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);
 
@@ -385,7 +382,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		// add shutdown hook for deleting the directories and remaining temp files on shutdown
 		ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG);
 
-		this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config);
+		this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, sslFactory, configuredAddress, configuredPort, config);
 
 		localRestAddress.complete(netty.getRestAddress());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 7e7e921..0891426 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.router.Router;
@@ -42,8 +43,6 @@ import org.apache.flink.util.ShutdownHookUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -89,7 +88,7 @@ public class HistoryServer {
 
 	private final HistoryServerArchiveFetcher archiveFetcher;
 
-	private final SSLContext serverSSLContext;
+	private final SSLEngineFactory serverSSLFactory;
 	private WebFrontendBootstrap netty;
 
 	private final Object startupShutdownLock = new Object();
@@ -143,15 +142,15 @@ public class HistoryServer {
 		Preconditions.checkNotNull(numFinishedPolls);
 
 		this.config = config;
-		if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
+		if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
 			LOG.info("Enabling SSL for the history server.");
 			try {
-				this.serverSSLContext = SSLUtils.createSSLServerContext(config);
+				this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new IOException("Failed to initialize SSLContext for the history server.", e);
 			}
 		} else {
-			this.serverSSLContext = null;
+			this.serverSSLFactory = null;
 		}
 
 		webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
@@ -231,7 +230,7 @@ public class HistoryServer {
 
 			archiveFetcher.start();
 
-			netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLContext, webAddress, webPort, config);
+			netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLFactory, webAddress, webPort, config);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index bec9ea2..672fddb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.utils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
 import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
@@ -40,7 +40,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandle
 
 import org.slf4j.Logger;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -56,7 +56,6 @@ public class WebFrontendBootstrap {
 	private final Router router;
 	private final Logger log;
 	private final File uploadDir;
-	private final SSLContext serverSSLContext;
 	private final ServerBootstrap bootstrap;
 	private final Channel serverChannel;
 	private final String restAddress;
@@ -65,14 +64,14 @@ public class WebFrontendBootstrap {
 			Router router,
 			Logger log,
 			File directory,
-			SSLContext sslContext,
+			@Nullable SSLEngineFactory serverSSLFactory,
 			String configuredAddress,
 			int configuredPort,
 			final Configuration config) throws InterruptedException, UnknownHostException {
+
 		this.router = Preconditions.checkNotNull(router);
 		this.log = Preconditions.checkNotNull(log);
 		this.uploadDir = directory;
-		this.serverSSLContext = sslContext;
 
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
@@ -81,10 +80,8 @@ public class WebFrontendBootstrap {
 				RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());
 
 				// SSL should be the first handler in the pipeline
-				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
-					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
-					sslEngine.setUseClientMode(false);
+				if (serverSSLFactory != null) {
+					SSLEngine sslEngine = serverSSLFactory.createSSLEngine();
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}
 
@@ -129,7 +126,7 @@ public class WebFrontendBootstrap {
 
 		this.log.info("Web frontend listening at {}" + ':' + "{}", address, port);
 
-		final String protocol = serverSSLContext != null ? "https://" : "http://";
+		final String protocol = serverSSLFactory != null ? "https://" : "http://";
 
 		this.restAddress = protocol + address + ':' + port;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 2ca250c..01e307e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -84,7 +83,7 @@ public final class BlobClient implements Closeable {
 
 		try {
 			// create an SSL socket if configured
-			if (clientConfig.getBoolean(SecurityOptions.SSL_ENABLED) && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
+			if (SSLUtils.isInternalSSLEnabled(clientConfig) && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 				LOG.info("Using ssl connection to the blob server");
 
 				socket = SSLUtils.createSSLClientSocketFactory(clientConfig).createSocket(

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ee1d50a..206be0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
@@ -176,7 +175,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
 		final Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
 
 		final ServerSocketFactory socketFactory;
-		if (config.getBoolean(SecurityOptions.SSL_ENABLED) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
+		if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 			try {
 				socketFactory = SSLUtils.createSSLServerSocketFactory(config);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 918f1f0..778d2db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaSe
 import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -103,7 +103,7 @@ public class HighAvailabilityServicesUtils {
 					"%s must be set",
 					RestOptions.ADDRESS.key());
 				final int port = configuration.getInteger(RestOptions.PORT);
-				final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
+				final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
 				final String protocol = enableSSL ? "https://" : "http://";
 
 				return new StandaloneHaServices(

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index dd40190..ab999d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLEngineFactory;
+
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -34,9 +36,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -52,7 +54,8 @@ class NettyClient {
 
 	private Bootstrap bootstrap;
 
-	private SSLContext clientSSLContext = null;
+	@Nullable
+	private SSLEngineFactory clientSSLFactory;
 
 	NettyClient(NettyConfig config) {
 		this.config = config;
@@ -112,7 +115,7 @@ class NettyClient {
 		}
 
 		try {
-			clientSSLContext = config.createClientSSLContext();
+			clientSSLFactory = config.createClientSSLEngineFactory();
 		} catch (Exception e) {
 			throw new IOException("Failed to initialize SSL Context for the Netty client", e);
 		}
@@ -177,18 +180,10 @@ class NettyClient {
 			public void initChannel(SocketChannel channel) throws Exception {
 
 				// SSL handler should be added first in the pipeline
-				if (clientSSLContext != null) {
-					SSLEngine sslEngine = clientSSLContext.createSSLEngine(
+				if (clientSSLFactory != null) {
+					SSLEngine sslEngine = clientSSLFactory.createSSLEngine(
 						serverSocketAddress.getAddress().getCanonicalHostName(),
 						serverSocketAddress.getPort());
-					sslEngine.setUseClientMode(true);
-
-					// Enable hostname verification for remote SSL connections
-					if (!serverSocketAddress.getAddress().isLoopbackAddress()) {
-						SSLParameters newSSLParameters = sslEngine.getSSLParameters();
-						config.setSSLVerifyHostname(newSSLParameters);
-						sslEngine.setSSLParameters(newSSLParameters);
-					}
 
 					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}
@@ -200,7 +195,7 @@ class NettyClient {
 			return bootstrap.connect(serverSocketAddress);
 		}
 		catch (ChannelException e) {
-			if ( (e.getCause() instanceof java.net.SocketException &&
+			if ((e.getCause() instanceof java.net.SocketException &&
 					e.getCause().getMessage().equals("Too many open files")) ||
 				(e.getCause() instanceof ChannelException &&
 						e.getCause().getCause() instanceof java.net.SocketException &&

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 18527c4..46cdaab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -22,13 +22,14 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -89,9 +90,9 @@ public class NettyConfig {
 		NIO, EPOLL, AUTO
 	}
 
-	final static String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
+	static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
 
-	final static String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
+	static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
 
 	private final InetAddress serverAddress;
 
@@ -189,39 +190,23 @@ public class NettyConfig {
 		}
 	}
 
-	public SSLContext createClientSSLContext() throws Exception {
-
-		// Create SSL Context from config
-		SSLContext clientSSLContext = null;
-		if (getSSLEnabled()) {
-			clientSSLContext = SSLUtils.createSSLClientContext(config);
-		}
-
-		return clientSSLContext;
+	@Nullable
+	public SSLEngineFactory createClientSSLEngineFactory() throws Exception {
+		return getSSLEnabled() ?
+				SSLUtils.createInternalClientSSLEngineFactory(config) :
+				null;
 	}
 
-	public SSLContext createServerSSLContext() throws Exception {
-
-		// Create SSL Context from config
-		SSLContext serverSSLContext = null;
-		if (getSSLEnabled()) {
-			serverSSLContext = SSLUtils.createSSLServerContext(config);
-		}
-
-		return serverSSLContext;
+	@Nullable
+	public SSLEngineFactory createServerSSLEngineFactory() throws Exception {
+		return getSSLEnabled() ?
+				SSLUtils.createInternalServerSSLEngineFactory(config) :
+				null;
 	}
 
 	public boolean getSSLEnabled() {
 		return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
-			&& SSLUtils.getSSLEnabled(config);
-	}
-
-	public void setSSLVerAndCipherSuites(SSLEngine engine) {
-		SSLUtils.setSSLVerAndCipherSuites(engine, config);
-	}
-
-	public void setSSLVerifyHostname(SSLParameters sslParams) {
-		SSLUtils.setSSLVerifyHostname(config, sslParams);
+			&& SSLUtils.isInternalSSLEnabled(config);
 	}
 
 	public boolean isCreditBasedEnabled() {
@@ -245,7 +230,7 @@ public class NettyConfig {
 		String def = "use Netty's default";
 		String man = "manual";
 
-		return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true":"false",
+		return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
 				memorySegmentSize, getTransportType(), getServerNumThreads(),
 				getServerNumThreads() == 0 ? def : man,
 				getClientNumThreads(), getClientNumThreads() == 0 ? def : man,

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 81bc50d..cc260c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -36,7 +37,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
@@ -61,8 +61,6 @@ class NettyServer {
 
 	private ChannelFuture bindFuture;
 
-	private SSLContext serverSSLContext = null;
-
 	private InetSocketAddress localAddress;
 
 	NettyServer(NettyConfig config) {
@@ -138,8 +136,9 @@ class NettyServer {
 		}
 
 		// SSL related configuration
+		final SSLEngineFactory sslEngineFactory;
 		try {
-			serverSSLContext = config.createServerSSLContext();
+			sslEngineFactory = config.createServerSSLEngineFactory();
 		} catch (Exception e) {
 			throw new IOException("Failed to initialize SSL Context for the Netty Server", e);
 		}
@@ -151,10 +150,8 @@ class NettyServer {
 		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
 			@Override
 			public void initChannel(SocketChannel channel) throws Exception {
-				if (serverSSLContext != null) {
-					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
-					config.setSSLVerAndCipherSuites(sslEngine);
-					sslEngine.setUseClientMode(false);
+				if (sslEngineFactory != null) {
+					SSLEngine sslEngine = sslEngineFactory.createSSLEngine();
 					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
index 7aca60c..d842267 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
@@ -49,9 +49,19 @@ public class SSLEngineFactory {
 
 	public SSLEngine createSSLEngine() {
 		final SSLEngine sslEngine = sslContext.createSSLEngine();
+		configureSSLEngine(sslEngine);
+		return sslEngine;
+	}
+
+	public SSLEngine createSSLEngine(String hostname, int port) {
+		final SSLEngine sslEngine = sslContext.createSSLEngine(hostname, port);
+		configureSSLEngine(sslEngine);
+		return sslEngine;
+	}
+
+	private void configureSSLEngine(SSLEngine sslEngine) {
 		sslEngine.setEnabledProtocols(enabledProtocols);
 		sslEngine.setEnabledCipherSuites(enabledCipherSuites);
 		sslEngine.setUseClientMode(clientMode);
-		return sslEngine;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 2bfc0d6..5c95535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -18,58 +18,59 @@
 
 package org.apache.flink.runtime.net;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.SSLServerSocketFactory;
 import javax.net.ssl.TrustManagerFactory;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
+import java.nio.file.Files;
 import java.security.KeyStore;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Common utilities to manage SSL transport settings.
  */
 public class SSLUtils {
 
-	private static final Logger LOG = LoggerFactory.getLogger(SSLUtils.class);
+	/**
+	 * Checks whether SSL for internal communication (rpc, data transport, blob server) is enabled.
+	 */
+	public static boolean isInternalSSLEnabled(Configuration sslConfig) {
+		@SuppressWarnings("deprecation")
+		final boolean fallbackFlag = sslConfig.getBoolean(SecurityOptions.SSL_ENABLED);
+		return sslConfig.getBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, fallbackFlag);
+	}
 
 	/**
-	 * Retrieves the global ssl flag from configuration.
-	 *
-	 * @param sslConfig
-	 *        The application configuration
-	 * @return true if global ssl flag is set
+	 * Checks whether SSL for the external REST endpoint is enabled.
 	 */
-	public static boolean getSSLEnabled(Configuration sslConfig) {
-		return sslConfig.getBoolean(SecurityOptions.SSL_ENABLED);
+	public static boolean isRestSSLEnabled(Configuration sslConfig) {
+		@SuppressWarnings("deprecation")
+		final boolean fallbackFlag = sslConfig.getBoolean(SecurityOptions.SSL_ENABLED);
+		return sslConfig.getBoolean(SecurityOptions.SSL_REST_ENABLED, fallbackFlag);
 	}
 
 	/**
 	 * Creates a factory for SSL Server Sockets from the given configuration.
+	 * SSL Server Sockets are always part of internal communication.
 	 */
 	public static ServerSocketFactory createSSLServerSocketFactory(Configuration config) throws Exception {
-		SSLContext sslContext = createSSLServerContext(config);
+		SSLContext sslContext = createInternalSSLContext(config);
 		if (sslContext == null) {
 			throw new IllegalConfigurationException("SSL is not enabled");
 		}
@@ -83,9 +84,10 @@ public class SSLUtils {
 
 	/**
 	 * Creates a factory for SSL Client Sockets from the given configuration.
+	 * SSL Client Sockets are always part of internal communication.
 	 */
 	public static SocketFactory createSSLClientSocketFactory(Configuration config) throws Exception {
-		SSLContext sslContext = createSSLServerContext(config);
+		SSLContext sslContext = createInternalSSLContext(config);
 		if (sslContext == null) {
 			throw new IllegalConfigurationException("SSL is not enabled");
 		}
@@ -94,51 +96,71 @@ public class SSLUtils {
 	}
 
 	/**
-	 * Creates a {@link SSLEngineFactory} to be used by the Server.
-	 *
-	 * @param config The application configuration.
+	 * Creates an SSLEngineFactory to be used by internal communication server endpoints.
 	 */
-	public static SSLEngineFactory createServerSSLEngineFactory(final Configuration config) throws Exception {
-		return createSSLEngineFactory(config, false);
+	public static SSLEngineFactory createInternalServerSSLEngineFactory(final Configuration config) throws Exception {
+		SSLContext sslContext = createInternalSSLContext(config);
+		if (sslContext == null) {
+			throw new IllegalConfigurationException("SSL is not enabled for internal communication.");
+		}
+
+		return new SSLEngineFactory(
+				sslContext,
+				getEnabledProtocols(config),
+				getEnabledCipherSuites(config),
+				false);
 	}
 
 	/**
-	 * Creates a {@link SSLEngineFactory} to be used by the Client.
-	 * @param config The application configuration.
+	 * Creates an SSLEngineFactory to be used by internal communication client endpoints.
 	 */
-	public static SSLEngineFactory createClientSSLEngineFactory(final Configuration config) throws Exception {
-		return createSSLEngineFactory(config, true);
-	}
-
-	private static SSLEngineFactory createSSLEngineFactory(
-			final Configuration config,
-			final boolean clientMode) throws Exception {
+	public static SSLEngineFactory createInternalClientSSLEngineFactory(final Configuration config) throws Exception {
+		SSLContext sslContext = createInternalSSLContext(config);
+		if (sslContext == null) {
+			throw new IllegalConfigurationException("SSL is not enabled for internal communication.");
+		}
 
-		final SSLContext sslContext = clientMode ?
-			createSSLClientContext(config) :
-			createSSLServerContext(config);
+		return new SSLEngineFactory(
+				sslContext,
+				getEnabledProtocols(config),
+				getEnabledCipherSuites(config),
+				true);
+	}
 
-		checkState(sslContext != null, "%s it not enabled", SecurityOptions.SSL_ENABLED.key());
+	/**
+	 * Creates a {@link SSLEngineFactory} to be used by the REST Servers.
+	 *
+	 * @param config The application configuration.
+	 */
+	public static SSLEngineFactory createRestServerSSLEngineFactory(final Configuration config) throws Exception {
+		SSLContext sslContext = createRestServerSSLContext(config);
+		if (sslContext == null) {
+			throw new IllegalConfigurationException("SSL is not enabled for REST endpoints.");
+		}
 
 		return new SSLEngineFactory(
-			sslContext,
-			getEnabledProtocols(config),
-			getEnabledCipherSuites(config),
-			clientMode);
+				sslContext,
+				getEnabledProtocols(config),
+				getEnabledCipherSuites(config),
+				false);
 	}
 
 	/**
-	 * Sets SSL version and cipher suites for SSLEngine.
+	 * Creates a {@link SSLEngineFactory} to be used by the REST Clients.
 	 *
-	 * @param engine SSLEngine to be handled
-	 * @param config The application configuration
-	 * @deprecated Use {@link #createClientSSLEngineFactory(Configuration)} or
-	 * {@link #createServerSSLEngineFactory(Configuration)}.
+	 * @param config The application configuration.
 	 */
-	@Deprecated
-	public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) {
-		engine.setEnabledProtocols(getEnabledProtocols(config));
-		engine.setEnabledCipherSuites(getEnabledCipherSuites(config));
+	public static SSLEngineFactory createRestClientSSLEngineFactory(final Configuration config) throws Exception {
+		SSLContext sslContext = createRestClientSSLContext(config);
+		if (sslContext == null) {
+			throw new IllegalConfigurationException("SSL is not enabled for REST endpoints.");
+		}
+
+		return new SSLEngineFactory(
+				sslContext,
+				getEnabledProtocols(config),
+				getEnabledCipherSuites(config),
+				true);
 	}
 
 	private static String[] getEnabledProtocols(final Configuration config) {
@@ -152,120 +174,138 @@ public class SSLUtils {
 	}
 
 	/**
-	 * Sets SSL options to verify peer's hostname in the certificate.
-	 *
-	 * @param sslConfig
-	 *        The application configuration
-	 * @param sslParams
-	 *        The SSL parameters that need to be updated
+	 * Creates the SSL Context for internal SSL, if internal SSL is configured.
+	 * For internal SSL, the client and server side configuration are identical, because
+	 * of mutual authentication.
 	 */
-	public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters sslParams) {
+	@Nullable
+	public static SSLContext createInternalSSLContext(Configuration config) throws Exception {
+		checkNotNull(config, "config");
+
+		if (!isInternalSSLEnabled(config)) {
+			return null;
+		}
+		String keystoreFilePath = getAndCheckOption(
+				config, SecurityOptions.SSL_INTERNAL_KEYSTORE, SecurityOptions.SSL_KEYSTORE);
+
+		String keystorePassword = getAndCheckOption(
+				config, SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, SecurityOptions.SSL_KEYSTORE_PASSWORD);
+
+		String certPassword = getAndCheckOption(
+				config, SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD);
 
-		Preconditions.checkNotNull(sslConfig);
-		Preconditions.checkNotNull(sslParams);
+		String trustStoreFilePath = getAndCheckOption(
+				config, SecurityOptions.SSL_INTERNAL_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE);
 
-		boolean verifyHostname = sslConfig.getBoolean(SecurityOptions.SSL_VERIFY_HOSTNAME);
-		if (verifyHostname) {
-			sslParams.setEndpointIdentificationAlgorithm("HTTPS");
+		String trustStorePassword = getAndCheckOption(
+				config, SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+
+		String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
+
+		KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+		try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) {
+			keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+		}
+
+		KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+		try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) {
+			trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
 		}
+
+		KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+		kmf.init(keyStore, certPassword.toCharArray());
+
+		TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+		tmf.init(trustStore);
+
+		SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
+		sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+		return sslContext;
 	}
 
 	/**
-	 * Creates the SSL Context for the client if SSL is configured.
-	 *
-	 * @param sslConfig
-	 *        The application configuration
-	 * @return The SSLContext object which can be used by the ssl transport client
-	 * 	       Returns null if SSL is disabled
-	 * @throws Exception
-	 *         Thrown if there is any misconfiguration
+	 * Creates an SSL context for the external REST endpoint server.
 	 */
 	@Nullable
-	public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception {
+	public static SSLContext createRestServerSSLContext(Configuration config) throws Exception {
+		checkNotNull(config, "config");
 
-		Preconditions.checkNotNull(sslConfig);
-		SSLContext clientSSLContext = null;
+		if (!isRestSSLEnabled(config)) {
+			return null;
+		}
 
-		if (getSSLEnabled(sslConfig)) {
-			LOG.debug("Creating client SSL context from configuration");
+		String keystoreFilePath = getAndCheckOption(
+				config, SecurityOptions.SSL_REST_KEYSTORE, SecurityOptions.SSL_KEYSTORE);
 
-			String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
-			String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
-			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		String keystorePassword = getAndCheckOption(
+				config, SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, SecurityOptions.SSL_KEYSTORE_PASSWORD);
 
-			Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
-			Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
+		String certPassword = getAndCheckOption(
+				config, SecurityOptions.SSL_REST_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD);
 
-			KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+		String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
 
-			FileInputStream trustStoreFile = null;
-			try {
-				trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
-				trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
-			} finally {
-				if (trustStoreFile != null) {
-					trustStoreFile.close();
-				}
-			}
+		KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+		try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) {
+			keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+		}
 
-			TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
-				TrustManagerFactory.getDefaultAlgorithm());
-			trustManagerFactory.init(trustStore);
+		KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+		kmf.init(keyStore, certPassword.toCharArray());
 
-			clientSSLContext = SSLContext.getInstance(sslProtocolVersion);
-			clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
-		}
+		SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
+		sslContext.init(kmf.getKeyManagers(), null, null);
 
-		return clientSSLContext;
+		return sslContext;
 	}
 
 	/**
-	 * Creates the SSL Context for the server if SSL is configured.
-	 *
-	 * @param sslConfig
-	 *        The application configuration
-	 * @return The SSLContext object which can be used by the ssl transport server
-	 * 	       Returns null if SSL is disabled
-	 * @throws Exception
-	 *         Thrown if there is any misconfiguration
+	 * Creates an SSL context for clients against the external REST endpoint.
 	 */
 	@Nullable
-	public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception {
+	public static SSLContext createRestClientSSLContext(Configuration config) throws Exception {
+		checkNotNull(config, "config");
 
-		Preconditions.checkNotNull(sslConfig);
-		SSLContext serverSSLContext = null;
+		if (!isRestSSLEnabled(config)) {
+			return null;
+		}
 
-		if (getSSLEnabled(sslConfig)) {
-			LOG.debug("Creating server SSL context from configuration");
+		String trustStoreFilePath = getAndCheckOption(
+				config, SecurityOptions.SSL_REST_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE);
 
-			String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+		String trustStorePassword = getAndCheckOption(
+				config, SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
 
-			String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+		String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
 
-			String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+		KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+		try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) {
+			trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
+		}
 
-			String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+		TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+		tmf.init(trustStore);
 
-			Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
-			Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
-			Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
+		SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
+		sslContext.init(null, tmf.getTrustManagers(), null);
 
-			KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-			try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) {
-				ks.load(keyStoreFile, keystorePassword.toCharArray());
-			}
+		return sslContext;
+	}
 
-			// Set up key manager factory to use the server key store
-			KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-					KeyManagerFactory.getDefaultAlgorithm());
-			kmf.init(ks, certPassword.toCharArray());
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 
-			// Initialize the SSLContext
-			serverSSLContext = SSLContext.getInstance(sslProtocolVersion);
-			serverSSLContext.init(kmf.getKeyManagers(), null, null);
+	private static String getAndCheckOption(Configuration config, ConfigOption<String> primaryOption, ConfigOption<String> fallbackOption) {
+		String value = config.getString(primaryOption, config.getString(fallbackOption));
+		if (value != null) {
+			return value;
+		}
+		else {
+			throw new IllegalConfigurationException("The config option " + primaryOption.key() +
+					" or " + fallbackOption.key() + " is missing.");
 		}
-
-		return serverSSLContext;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index b1591be..cbd888d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
@@ -91,10 +90,9 @@ public final class RestClientConfiguration {
 		Preconditions.checkNotNull(config);
 
 		final SSLEngineFactory sslEngineFactory;
-		final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
-		if (enableSSL) {
+		if (SSLUtils.isRestSSLEnabled(config)) {
 			try {
-				sslEngineFactory = SSLUtils.createClientSSLEngineFactory(config);
+				sslEngineFactory = SSLUtils.createRestClientSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 875230f..561891f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -157,10 +156,9 @@ public final class RestServerEndpointConfiguration {
 		final int port = config.getInteger(RestOptions.PORT);
 
 		final SSLEngineFactory sslEngineFactory;
-		final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED) && config.getBoolean(WebOptions.SSL_ENABLED);
-		if (enableSSL) {
+		if (SSLUtils.isRestSSLEnabled(config)) {
 			try {
-				sslEngineFactory = SSLUtils.createServerSSLEngineFactory(config);
+				sslEngineFactory = SSLUtils.createRestServerSSLEngineFactory(config);
 			} catch (Exception e) {
 				throw new ConfigurationException("Failed to initialize SSLEngineFactory for REST server endpoint.", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 6ae142b..982a536 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -130,7 +130,7 @@ public class AkkaRpcServiceUtils {
 		checkNotNull(config, "config is null");
 
 		final boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) &&
-				SSLUtils.getSSLEnabled(config);
+				SSLUtils.isInternalSSLEnabled(config);
 
 		return getRpcUrl(
 			hostname,

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 57ca9d4..12378e0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -336,21 +336,31 @@ object AkkaUtils {
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
     val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) &&
-          SSLUtils.getSSLEnabled(configuration)
+          SSLUtils.isInternalSSLEnabled(configuration)
 
     val retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR)
 
     val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"
 
-    val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE)
+    val akkaSSLKeyStore = configuration.getString(
+                              SecurityOptions.SSL_INTERNAL_KEYSTORE,
+                              configuration.getString(SecurityOptions.SSL_KEYSTORE))
 
-    val akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD)
+    val akkaSSLKeyStorePassword = configuration.getString(
+                              SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD,
+                              configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD))
 
-    val akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_KEY_PASSWORD)
+    val akkaSSLKeyPassword = configuration.getString(
+                              SecurityOptions.SSL_INTERNAL_KEY_PASSWORD,
+                              configuration.getString(SecurityOptions.SSL_KEY_PASSWORD))
 
-    val akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_TRUSTSTORE)
+    val akkaSSLTrustStore = configuration.getString(
+                              SecurityOptions.SSL_INTERNAL_TRUSTSTORE,
+                              configuration.getString(SecurityOptions.SSL_TRUSTSTORE))
 
-    val akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD)
+    val akkaSSLTrustStorePassword = configuration.getString(
+                              SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD,
+                              configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD))
 
     val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index b654cee..531f214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtilsTest;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -55,40 +55,25 @@ public class BlobClientSslTest extends BlobClientTest {
 	 */
 	@BeforeClass
 	public static void startSSLServer() throws IOException {
-		Configuration config = new Configuration();
-		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
-			temporarySslFolder.newFolder().getAbsolutePath());
-		config.setBoolean(SecurityOptions.SSL_ENABLED, true);
-		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
-		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
+
 		blobSslServer = new BlobServer(config, new VoidBlobStore());
 		blobSslServer.start();
 
-		sslClientConfig = new Configuration();
-		sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
-		sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+		sslClientConfig = config;
 	}
 
 	@BeforeClass
 	public static void startNonSSLServer() throws IOException {
-		Configuration config = new Configuration();
-		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
-			temporarySslFolder.newFolder().getAbsolutePath());
-		config.setBoolean(SecurityOptions.SSL_ENABLED, true);
+		Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
 		config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
-		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+
 		blobNonSslServer = new BlobServer(config, new VoidBlobStore());
 		blobNonSslServer.start();
 
-		nonSslClientConfig = new Configuration();
-		nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
-		nonSslClientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-		nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+		nonSslClientConfig = config;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 33e004e..2750660 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -151,15 +151,14 @@ public class NettyClientServerSslTest {
 		NettyTestUtil.shutdown(serverAndClient);
 	}
 
-	private Configuration createSslConfig() throws Exception {
-
+	private static Configuration createSslConfig() throws Exception {
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
-		flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
-		flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
-		flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
-		flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+		flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
+		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/local127.keystore");
+		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "password");
+		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password");
+		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "src/test/resources/local127.truststore");
+		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "password");
 		return flinkConfig;
 	}
 }