You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 07:03:09 UTC
[4/8] flink git commit: [FLINK-9313] [security] (part 2) Split SSL
configuration into internal (rpc, data transport,
blob server) and external (REST)
[FLINK-9313] [security] (part 2) Split SSL configuration into internal (rpc, data transport, blob server) and external (REST)
This also uses SSLEngineFactory for all SSLEngine creations.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c4bc84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c4bc84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c4bc84
Branch: refs/heads/master
Commit: 30c4bc847ade8cf0ae5c3ef6a6f8debdf72ddd61
Parents: 4db63c0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 12 17:20:30 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 08:10:46 2018 +0200
----------------------------------------------------------------------
docs/_includes/generated/common_section.html | 9 +-
.../generated/security_configuration.html | 59 ++-
.../generated/task_manager_configuration.html | 2 +-
.../configuration/HistoryServerOptions.java | 2 +-
.../flink/configuration/SecurityOptions.java | 130 +++++-
.../flink/configuration/TaskManagerOptions.java | 2 +-
.../flink/mesos/util/MesosArtifactServer.java | 21 +-
.../runtime/webmonitor/WebRuntimeMonitor.java | 15 +-
.../webmonitor/history/HistoryServer.java | 13 +-
.../webmonitor/utils/WebFrontendBootstrap.java | 17 +-
.../apache/flink/runtime/blob/BlobClient.java | 3 +-
.../apache/flink/runtime/blob/BlobServer.java | 3 +-
.../HighAvailabilityServicesUtils.java | 4 +-
.../runtime/io/network/netty/NettyClient.java | 25 +-
.../runtime/io/network/netty/NettyConfig.java | 51 +--
.../runtime/io/network/netty/NettyServer.java | 13 +-
.../flink/runtime/net/SSLEngineFactory.java | 12 +-
.../org/apache/flink/runtime/net/SSLUtils.java | 298 +++++++------
.../runtime/rest/RestClientConfiguration.java | 6 +-
.../rest/RestServerEndpointConfiguration.java | 6 +-
.../runtime/rpc/akka/AkkaRpcServiceUtils.java | 2 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 22 +-
.../flink/runtime/blob/BlobClientSslTest.java | 33 +-
.../network/netty/NettyClientServerSslTest.java | 15 +-
.../apache/flink/runtime/net/SSLUtilsTest.java | 427 +++++++++++++------
.../runtime/rest/RestServerEndpointITCase.java | 18 +-
.../flink/runtime/akka/AkkaSslITCase.scala | 10 +-
27 files changed, 791 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/docs/_includes/generated/common_section.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html
index 29245c9..ea881e0 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -53,9 +53,14 @@
<td>File system path (URI) where Flink persists metadata in high-availability setups.</td>
</tr>
<tr>
- <td><h5>security.ssl.enabled</h5></td>
+ <td><h5>security.ssl.internal.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
- <td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
+ <td>Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.rest.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Turns on SSL for external communication via the REST endpoints.</td>
</tr>
</tbody>
</table>
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/docs/_includes/generated/security_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/security_configuration.html b/docs/_includes/generated/security_configuration.html
index cd682ec..5042cf3 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -13,9 +13,34 @@
<td>The comma separated list of standard SSL algorithms to be supported. Read more <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites">here</a>.</td>
</tr>
<tr>
- <td><h5>security.ssl.enabled</h5></td>
+ <td><h5>security.ssl.internal.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
- <td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
+ <td>Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.internal.key-password</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server).</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.internal.keystore</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The Java keystore file with SSL Key and Certificate, to be used by Flink's internal endpoints (rpc, data transport, blob server).</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.internal.keystore-password</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The secret to decrypt the keystore file for Flink's internal endpoints (rpc, data transport, blob server).</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.internal.truststore</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server).</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.internal.truststore-password</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).</td>
</tr>
<tr>
<td><h5>security.ssl.key-password</h5></td>
@@ -38,6 +63,36 @@
<td>The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.</td>
</tr>
<tr>
+ <td><h5>security.ssl.rest.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Turns on SSL for external communication via the REST endpoints.</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.rest.key-password</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The secret to decrypt the key in the keystore for Flink's external REST endpoints.</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.rest.keystore</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The Java keystore file with SSL Key and Certificate, to be used by Flink's external REST endpoints.</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.rest.keystore-password</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The secret to decrypt the keystore file for Flink's external REST endpoints.</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.rest.truststore</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints.</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.rest.truststore-password</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>The password to decrypt the truststore for Flink's external REST endpoints.</td>
+ </tr>
+ <tr>
<td><h5>security.ssl.truststore</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The truststore file containing the public CA certificates to be used by flink endpoints to verify the peer’s certificate.</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html
index e780f6e..c18c5d4 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -35,7 +35,7 @@
<tr>
<td><h5>taskmanager.data.ssl.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
- <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true</td>
+ <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
</tr>
<tr>
<td><h5>taskmanager.debug.memory.log</h5></td>
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 13cdc1e..12e7f24 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -80,7 +80,7 @@ public class HistoryServerOptions {
/**
* Enables/Disables SSL support for the HistoryServer web-frontend. Only relevant if
- * {@link SecurityOptions#SSL_ENABLED} is enabled.
+ * {@link SecurityOptions#SSL_REST_ENABLED} is enabled.
*/
public static final ConfigOption<Boolean> HISTORY_SERVER_WEB_SSL_ENABLED =
key("historyserver.web.ssl.enabled")
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index fc7e391..03ee4f2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -85,14 +85,40 @@ public class SecurityOptions {
// ------------------------------------------------------------------------
/**
- * Enable SSL support.
+ * Enable SSL for internal (rpc, data transport, blob server) and external (HTTP/REST) communication.
+ *
+ * @deprecated Use {@link #SSL_INTERNAL_ENABLED} and {@link #SSL_REST_ENABLED} instead.
*/
- @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
+ @Deprecated
public static final ConfigOption<Boolean> SSL_ENABLED =
key("security.ssl.enabled")
.defaultValue(false)
- .withDescription("Turns on SSL for internal network communication. This can be optionally overridden by" +
- " flags defined in different transport modules.");
+ .withDescription("Turns on SSL for internal and external network communication. " +
+ "This can be overridden by 'security.ssl.internal.enabled', 'security.ssl.rest.enabled'. " +
+ "Specific internal components (rpc, data transport, blob server) may optionally override " +
+ "this through their own settings.");
+
+ /**
+ * Enable SSL for internal communication (akka rpc, netty data transport, blob server).
+ */
+ @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
+ public static final ConfigOption<Boolean> SSL_INTERNAL_ENABLED =
+ key("security.ssl.internal.enabled")
+ .defaultValue(false)
+ .withDescription("Turns on SSL for internal network communication. " +
+ "Optionally, specific components may override this through their own settings " +
+ "(rpc, data transport, REST, etc).");
+
+ /**
+ * Enable SSL for external REST endpoints.
+ */
+ @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
+ public static final ConfigOption<Boolean> SSL_REST_ENABLED =
+ key("security.ssl.rest.enabled")
+ .defaultValue(false)
+ .withDescription("Turns on SSL for external communication via the REST endpoints.");
+
+ // ----------------- certificates (internal + external) -------------------
/**
* The Java keystore file containing the flink endpoint key and certificate.
@@ -135,6 +161,102 @@ public class SecurityOptions {
.noDefaultValue()
.withDescription("The secret to decrypt the truststore.");
+ // ----------------------- certificates (internal) ------------------------
+
+ /**
+ * For internal SSL, the Java keystore file containing the private key and certificate.
+ */
+ public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE =
+ key("security.ssl.internal.keystore")
+ .noDefaultValue()
+ .withDescription("The Java keystore file with SSL Key and Certificate, " +
+ "to be used by Flink's internal endpoints (rpc, data transport, blob server).");
+
+ /**
+ * For internal SSL, the password to decrypt the keystore file containing the certificate.
+ */
+ public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE_PASSWORD =
+ key("security.ssl.internal.keystore-password")
+ .noDefaultValue()
+ .withDescription("The secret to decrypt the keystore file " +
+ "for Flink's internal endpoints (rpc, data transport, blob server).");
+
+ /**
+ * For internal SSL, the password to decrypt the private key.
+ */
+ public static final ConfigOption<String> SSL_INTERNAL_KEY_PASSWORD =
+ key("security.ssl.internal.key-password")
+ .noDefaultValue()
+ .withDescription("The secret to decrypt the key in the keystore " +
+ "for Flink's internal endpoints (rpc, data transport, blob server).");
+
+ /**
+ * For internal SSL, the truststore file containing the public CA certificates to verify the ssl peers.
+ */
+ public static final ConfigOption<String> SSL_INTERNAL_TRUSTSTORE =
+ key("security.ssl.internal.truststore")
+ .noDefaultValue()
+ .withDescription("The truststore file containing the public CA certificates to verify the peer " +
+ "for Flink's internal endpoints (rpc, data transport, blob server).");
+
+ /**
+ * For internal SSL, the secret to decrypt the truststore.
+ */
+ public static final ConfigOption<String> SSL_INTERNAL_TRUSTSTORE_PASSWORD =
+ key("security.ssl.internal.truststore-password")
+ .noDefaultValue()
+ .withDescription("The password to decrypt the truststore " +
+ "for Flink's internal endpoints (rpc, data transport, blob server).");
+
+ // ----------------------- certificates (external) ------------------------
+
+ /**
+ * For external (REST) SSL, the Java keystore file containing the private key and certificate.
+ */
+ public static final ConfigOption<String> SSL_REST_KEYSTORE =
+ key("security.ssl.rest.keystore")
+ .noDefaultValue()
+ .withDescription("The Java keystore file with SSL Key and Certificate, " +
+ "to be used by Flink's external REST endpoints.");
+
+ /**
+ * For external (REST) SSL, the password to decrypt the keystore file containing the certificate.
+ */
+ public static final ConfigOption<String> SSL_REST_KEYSTORE_PASSWORD =
+ key("security.ssl.rest.keystore-password")
+ .noDefaultValue()
+ .withDescription("The secret to decrypt the keystore file " +
+ "for Flink's external REST endpoints.");
+
+ /**
+ * For external (REST) SSL, the password to decrypt the private key.
+ */
+ public static final ConfigOption<String> SSL_REST_KEY_PASSWORD =
+ key("security.ssl.rest.key-password")
+ .noDefaultValue()
+ .withDescription("The secret to decrypt the key in the keystore " +
+ "for Flink's external REST endpoints.");
+
+ /**
+ * For external (REST) SSL, the truststore file containing the public CA certificates to verify the ssl peers.
+ */
+ public static final ConfigOption<String> SSL_REST_TRUSTSTORE =
+ key("security.ssl.rest.truststore")
+ .noDefaultValue()
+ .withDescription("The truststore file containing the public CA certificates to verify the peer " +
+ "for Flink's external REST endpoints.");
+
+ /**
+ * For external (REST) SSL, the secret to decrypt the truststore.
+ */
+ public static final ConfigOption<String> SSL_REST_TRUSTSTORE_PASSWORD =
+ key("security.ssl.rest.truststore-password")
+ .noDefaultValue()
+ .withDescription("The password to decrypt the truststore " +
+ "for Flink's external REST endpoints.");
+
+ // ------------------------ ssl parameters --------------------------------
+
/**
* SSL protocol version to be supported.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 2907f6b..be76cf8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -116,7 +116,7 @@ public class TaskManagerOptions {
key("taskmanager.data.ssl.enabled")
.defaultValue(true)
.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
- " global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true");
+ " global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
/**
* The initial registration backoff between two consecutive registration attempts. The backoff
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index d0d41e2..1fa2cd0 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.router.Router;
@@ -58,7 +59,6 @@ import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.File;
@@ -104,8 +104,6 @@ public class MesosArtifactServer implements MesosArtifactResolver {
private final Map<Path, URL> paths = new HashMap<>();
- private final SSLContext serverSSLContext;
-
public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
throws Exception {
if (configuredPort < 0 || configuredPort > 0xFFFF) {
@@ -113,19 +111,20 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
// Config to enable https access to the artifact server
- boolean enableSSL = config.getBoolean(
+ final boolean enableSSL = config.getBoolean(
MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
- SSLUtils.getSSLEnabled(config);
+ SSLUtils.isRestSSLEnabled(config);
+ final SSLEngineFactory sslFactory;
if (enableSSL) {
LOG.info("Enabling ssl for the artifact server");
try {
- serverSSLContext = SSLUtils.createSSLServerContext(config);
+ sslFactory = SSLUtils.createRestServerSSLEngineFactory(config);
} catch (Exception e) {
throw new IOException("Failed to initialize SSLContext for the artifact server", e);
}
} else {
- serverSSLContext = null;
+ sslFactory = null;
}
router = new Router();
@@ -138,10 +137,8 @@ public class MesosArtifactServer implements MesosArtifactResolver {
RouterHandler handler = new RouterHandler(router, new HashMap<>());
// SSL should be the first handler in the pipeline
- if (serverSSLContext != null) {
- SSLEngine sslEngine = serverSSLContext.createSSLEngine();
- SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
- sslEngine.setUseClientMode(false);
+ if (sslFactory != null) {
+ SSLEngine sslEngine = sslFactory.createSSLEngine();
ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
@@ -169,7 +166,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();
- String httpProtocol = (serverSSLContext != null) ? "https" : "http";
+ String httpProtocol = (sslFactory != null) ? "https" : "http";
baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 976b080..39c8a3c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -92,8 +93,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-
import java.io.File;
import java.io.IOException;
import java.util.UUID;
@@ -130,8 +129,6 @@ public class WebRuntimeMonitor implements WebMonitor {
/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
private final LeaderGatewayRetriever<JobManagerGateway> retriever;
- private final SSLContext serverSSLContext;
-
private final CompletableFuture<String> localRestAddress = new CompletableFuture<>();
private final Time timeout;
@@ -234,17 +231,17 @@ public class WebRuntimeMonitor implements WebMonitor {
// --------------------------------------------------------------------
// Config to enable https access to the web-ui
- boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
-
+ final SSLEngineFactory sslFactory;
+ final boolean enableSSL = SSLUtils.isRestSSLEnabled(config) && config.getBoolean(WebOptions.SSL_ENABLED);
if (enableSSL) {
LOG.info("Enabling ssl for the web frontend");
try {
- serverSSLContext = SSLUtils.createSSLServerContext(config);
+ sslFactory = SSLUtils.createRestServerSSLEngineFactory(config);
} catch (Exception e) {
throw new IOException("Failed to initialize SSLContext for the web frontend", e);
}
} else {
- serverSSLContext = null;
+ sslFactory = null;
}
metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);
@@ -385,7 +382,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// add shutdown hook for deleting the directories and remaining temp files on shutdown
ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG);
- this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config);
+ this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, sslFactory, configuredAddress, configuredPort, config);
localRestAddress.complete(netty.getRestAddress());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 7e7e921..0891426 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.router.Router;
@@ -42,8 +43,6 @@ import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -89,7 +88,7 @@ public class HistoryServer {
private final HistoryServerArchiveFetcher archiveFetcher;
- private final SSLContext serverSSLContext;
+ private final SSLEngineFactory serverSSLFactory;
private WebFrontendBootstrap netty;
private final Object startupShutdownLock = new Object();
@@ -143,15 +142,15 @@ public class HistoryServer {
Preconditions.checkNotNull(numFinishedPolls);
this.config = config;
- if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
+ if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
LOG.info("Enabling SSL for the history server.");
try {
- this.serverSSLContext = SSLUtils.createSSLServerContext(config);
+ this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config);
} catch (Exception e) {
throw new IOException("Failed to initialize SSLContext for the history server.", e);
}
} else {
- this.serverSSLContext = null;
+ this.serverSSLFactory = null;
}
webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
@@ -231,7 +230,7 @@ public class HistoryServer {
archiveFetcher.start();
- netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLContext, webAddress, webPort, config);
+ netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLFactory, webAddress, webPort, config);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index bec9ea2..672fddb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
@@ -40,7 +40,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandle
import org.slf4j.Logger;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import java.io.File;
@@ -56,7 +56,6 @@ public class WebFrontendBootstrap {
private final Router router;
private final Logger log;
private final File uploadDir;
- private final SSLContext serverSSLContext;
private final ServerBootstrap bootstrap;
private final Channel serverChannel;
private final String restAddress;
@@ -65,14 +64,14 @@ public class WebFrontendBootstrap {
Router router,
Logger log,
File directory,
- SSLContext sslContext,
+ @Nullable SSLEngineFactory serverSSLFactory,
String configuredAddress,
int configuredPort,
final Configuration config) throws InterruptedException, UnknownHostException {
+
this.router = Preconditions.checkNotNull(router);
this.log = Preconditions.checkNotNull(log);
this.uploadDir = directory;
- this.serverSSLContext = sslContext;
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -81,10 +80,8 @@ public class WebFrontendBootstrap {
RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());
// SSL should be the first handler in the pipeline
- if (serverSSLContext != null) {
- SSLEngine sslEngine = serverSSLContext.createSSLEngine();
- SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
- sslEngine.setUseClientMode(false);
+ if (serverSSLFactory != null) {
+ SSLEngine sslEngine = serverSSLFactory.createSSLEngine();
ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
@@ -129,7 +126,7 @@ public class WebFrontendBootstrap {
this.log.info("Web frontend listening at {}" + ':' + "{}", address, port);
- final String protocol = serverSSLContext != null ? "https://" : "http://";
+ final String protocol = serverSSLFactory != null ? "https://" : "http://";
this.restAddress = protocol + address + ':' + port;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 2ca250c..01e307e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.net.SSLUtils;
@@ -84,7 +83,7 @@ public final class BlobClient implements Closeable {
try {
// create an SSL socket if configured
- if (clientConfig.getBoolean(SecurityOptions.SSL_ENABLED) && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
+ if (SSLUtils.isInternalSSLEnabled(clientConfig) && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
LOG.info("Using ssl connection to the blob server");
socket = SSLUtils.createSSLClientSocketFactory(clientConfig).createSocket(
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ee1d50a..206be0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
@@ -176,7 +175,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
final Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
final ServerSocketFactory socketFactory;
- if (config.getBoolean(SecurityOptions.SSL_ENABLED) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
+ if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
try {
socketFactory = SSLUtils.createSSLServerSocketFactory(config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 918f1f0..778d2db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaSe
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -103,7 +103,7 @@ public class HighAvailabilityServicesUtils {
"%s must be set",
RestOptions.ADDRESS.key());
final int port = configuration.getInteger(RestOptions.PORT);
- final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
+ final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
final String protocol = enableSSL ? "https://" : "http://";
return new StandaloneHaServices(
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index dd40190..ab999d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.io.network.netty;
+import org.apache.flink.runtime.net.SSLEngineFactory;
+
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -34,9 +36,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
+
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -52,7 +54,8 @@ class NettyClient {
private Bootstrap bootstrap;
- private SSLContext clientSSLContext = null;
+ @Nullable
+ private SSLEngineFactory clientSSLFactory;
NettyClient(NettyConfig config) {
this.config = config;
@@ -112,7 +115,7 @@ class NettyClient {
}
try {
- clientSSLContext = config.createClientSSLContext();
+ clientSSLFactory = config.createClientSSLEngineFactory();
} catch (Exception e) {
throw new IOException("Failed to initialize SSL Context for the Netty client", e);
}
@@ -177,18 +180,10 @@ class NettyClient {
public void initChannel(SocketChannel channel) throws Exception {
// SSL handler should be added first in the pipeline
- if (clientSSLContext != null) {
- SSLEngine sslEngine = clientSSLContext.createSSLEngine(
+ if (clientSSLFactory != null) {
+ SSLEngine sslEngine = clientSSLFactory.createSSLEngine(
serverSocketAddress.getAddress().getCanonicalHostName(),
serverSocketAddress.getPort());
- sslEngine.setUseClientMode(true);
-
- // Enable hostname verification for remote SSL connections
- if (!serverSocketAddress.getAddress().isLoopbackAddress()) {
- SSLParameters newSSLParameters = sslEngine.getSSLParameters();
- config.setSSLVerifyHostname(newSSLParameters);
- sslEngine.setSSLParameters(newSSLParameters);
- }
channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
@@ -200,7 +195,7 @@ class NettyClient {
return bootstrap.connect(serverSocketAddress);
}
catch (ChannelException e) {
- if ( (e.getCause() instanceof java.net.SocketException &&
+ if ((e.getCause() instanceof java.net.SocketException &&
e.getCause().getMessage().equals("Too many open files")) ||
(e.getCause() instanceof ChannelException &&
e.getCause().getCause() instanceof java.net.SocketException &&
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 18527c4..46cdaab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -22,13 +22,14 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
+import javax.annotation.Nullable;
+
import java.net.InetAddress;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -89,9 +90,9 @@ public class NettyConfig {
NIO, EPOLL, AUTO
}
- final static String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
+ static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
- final static String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
+ static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
private final InetAddress serverAddress;
@@ -189,39 +190,23 @@ public class NettyConfig {
}
}
- public SSLContext createClientSSLContext() throws Exception {
-
- // Create SSL Context from config
- SSLContext clientSSLContext = null;
- if (getSSLEnabled()) {
- clientSSLContext = SSLUtils.createSSLClientContext(config);
- }
-
- return clientSSLContext;
+ @Nullable
+ public SSLEngineFactory createClientSSLEngineFactory() throws Exception {
+ return getSSLEnabled() ?
+ SSLUtils.createInternalClientSSLEngineFactory(config) :
+ null;
}
- public SSLContext createServerSSLContext() throws Exception {
-
- // Create SSL Context from config
- SSLContext serverSSLContext = null;
- if (getSSLEnabled()) {
- serverSSLContext = SSLUtils.createSSLServerContext(config);
- }
-
- return serverSSLContext;
+ @Nullable
+ public SSLEngineFactory createServerSSLEngineFactory() throws Exception {
+ return getSSLEnabled() ?
+ SSLUtils.createInternalServerSSLEngineFactory(config) :
+ null;
}
public boolean getSSLEnabled() {
return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
- && SSLUtils.getSSLEnabled(config);
- }
-
- public void setSSLVerAndCipherSuites(SSLEngine engine) {
- SSLUtils.setSSLVerAndCipherSuites(engine, config);
- }
-
- public void setSSLVerifyHostname(SSLParameters sslParams) {
- SSLUtils.setSSLVerifyHostname(config, sslParams);
+ && SSLUtils.isInternalSSLEnabled(config);
}
public boolean isCreditBasedEnabled() {
@@ -245,7 +230,7 @@ public class NettyConfig {
String def = "use Netty's default";
String man = "manual";
- return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true":"false",
+ return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
memorySegmentSize, getTransportType(), getServerNumThreads(),
getServerNumThreads() == 0 ? def : man,
getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 81bc50d..cc260c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.netty;
+import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -36,7 +37,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
@@ -61,8 +61,6 @@ class NettyServer {
private ChannelFuture bindFuture;
- private SSLContext serverSSLContext = null;
-
private InetSocketAddress localAddress;
NettyServer(NettyConfig config) {
@@ -138,8 +136,9 @@ class NettyServer {
}
// SSL related configuration
+ final SSLEngineFactory sslEngineFactory;
try {
- serverSSLContext = config.createServerSSLContext();
+ sslEngineFactory = config.createServerSSLEngineFactory();
} catch (Exception e) {
throw new IOException("Failed to initialize SSL Context for the Netty Server", e);
}
@@ -151,10 +150,8 @@ class NettyServer {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
- if (serverSSLContext != null) {
- SSLEngine sslEngine = serverSSLContext.createSSLEngine();
- config.setSSLVerAndCipherSuites(sslEngine);
- sslEngine.setUseClientMode(false);
+ if (sslEngineFactory != null) {
+ SSLEngine sslEngine = sslEngineFactory.createSSLEngine();
channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
index 7aca60c..d842267 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
@@ -49,9 +49,19 @@ public class SSLEngineFactory {
public SSLEngine createSSLEngine() {
final SSLEngine sslEngine = sslContext.createSSLEngine();
+ configureSSLEngine(sslEngine);
+ return sslEngine;
+ }
+
+ public SSLEngine createSSLEngine(String hostname, int port) {
+ final SSLEngine sslEngine = sslContext.createSSLEngine(hostname, port);
+ configureSSLEngine(sslEngine);
+ return sslEngine;
+ }
+
+ private void configureSSLEngine(SSLEngine sslEngine) {
sslEngine.setEnabledProtocols(enabledProtocols);
sslEngine.setEnabledCipherSuites(enabledCipherSuites);
sslEngine.setUseClientMode(clientMode);
- return sslEngine;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 2bfc0d6..5c95535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -18,58 +18,59 @@
package org.apache.flink.runtime.net;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
+import java.nio.file.Files;
import java.security.KeyStore;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* Common utilities to manage SSL transport settings.
*/
public class SSLUtils {
- private static final Logger LOG = LoggerFactory.getLogger(SSLUtils.class);
+ /**
+ * Checks whether SSL for internal communication (rpc, data transport, blob server) is enabled.
+ */
+ public static boolean isInternalSSLEnabled(Configuration sslConfig) {
+ @SuppressWarnings("deprecation")
+ final boolean fallbackFlag = sslConfig.getBoolean(SecurityOptions.SSL_ENABLED);
+ return sslConfig.getBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, fallbackFlag);
+ }
/**
- * Retrieves the global ssl flag from configuration.
- *
- * @param sslConfig
- * The application configuration
- * @return true if global ssl flag is set
+ * Checks whether SSL for the external REST endpoint is enabled.
*/
- public static boolean getSSLEnabled(Configuration sslConfig) {
- return sslConfig.getBoolean(SecurityOptions.SSL_ENABLED);
+ public static boolean isRestSSLEnabled(Configuration sslConfig) {
+ @SuppressWarnings("deprecation")
+ final boolean fallbackFlag = sslConfig.getBoolean(SecurityOptions.SSL_ENABLED);
+ return sslConfig.getBoolean(SecurityOptions.SSL_REST_ENABLED, fallbackFlag);
}
/**
* Creates a factory for SSL Server Sockets from the given configuration.
+ * SSL Server Sockets are always part of internal communication.
*/
public static ServerSocketFactory createSSLServerSocketFactory(Configuration config) throws Exception {
- SSLContext sslContext = createSSLServerContext(config);
+ SSLContext sslContext = createInternalSSLContext(config);
if (sslContext == null) {
throw new IllegalConfigurationException("SSL is not enabled");
}
@@ -83,9 +84,10 @@ public class SSLUtils {
/**
* Creates a factory for SSL Client Sockets from the given configuration.
+ * SSL Client Sockets are always part of internal communication.
*/
public static SocketFactory createSSLClientSocketFactory(Configuration config) throws Exception {
- SSLContext sslContext = createSSLServerContext(config);
+ SSLContext sslContext = createInternalSSLContext(config);
if (sslContext == null) {
throw new IllegalConfigurationException("SSL is not enabled");
}
@@ -94,51 +96,71 @@ public class SSLUtils {
}
/**
- * Creates a {@link SSLEngineFactory} to be used by the Server.
- *
- * @param config The application configuration.
+ * Creates an SSLEngineFactory to be used by internal communication server endpoints.
*/
- public static SSLEngineFactory createServerSSLEngineFactory(final Configuration config) throws Exception {
- return createSSLEngineFactory(config, false);
+ public static SSLEngineFactory createInternalServerSSLEngineFactory(final Configuration config) throws Exception {
+ SSLContext sslContext = createInternalSSLContext(config);
+ if (sslContext == null) {
+ throw new IllegalConfigurationException("SSL is not enabled for internal communication.");
+ }
+
+ return new SSLEngineFactory(
+ sslContext,
+ getEnabledProtocols(config),
+ getEnabledCipherSuites(config),
+ false);
}
/**
- * Creates a {@link SSLEngineFactory} to be used by the Client.
- * @param config The application configuration.
+ * Creates an SSLEngineFactory to be used by internal communication client endpoints.
*/
- public static SSLEngineFactory createClientSSLEngineFactory(final Configuration config) throws Exception {
- return createSSLEngineFactory(config, true);
- }
-
- private static SSLEngineFactory createSSLEngineFactory(
- final Configuration config,
- final boolean clientMode) throws Exception {
+ public static SSLEngineFactory createInternalClientSSLEngineFactory(final Configuration config) throws Exception {
+ SSLContext sslContext = createInternalSSLContext(config);
+ if (sslContext == null) {
+ throw new IllegalConfigurationException("SSL is not enabled for internal communication.");
+ }
- final SSLContext sslContext = clientMode ?
- createSSLClientContext(config) :
- createSSLServerContext(config);
+ return new SSLEngineFactory(
+ sslContext,
+ getEnabledProtocols(config),
+ getEnabledCipherSuites(config),
+ true);
+ }
- checkState(sslContext != null, "%s it not enabled", SecurityOptions.SSL_ENABLED.key());
+ /**
+ * Creates an {@link SSLEngineFactory} to be used by the REST Servers.
+ *
+ * @param config The application configuration.
+ */
+ public static SSLEngineFactory createRestServerSSLEngineFactory(final Configuration config) throws Exception {
+ SSLContext sslContext = createRestServerSSLContext(config);
+ if (sslContext == null) {
+ throw new IllegalConfigurationException("SSL is not enabled for REST endpoints.");
+ }
return new SSLEngineFactory(
- sslContext,
- getEnabledProtocols(config),
- getEnabledCipherSuites(config),
- clientMode);
+ sslContext,
+ getEnabledProtocols(config),
+ getEnabledCipherSuites(config),
+ false);
}
/**
- * Sets SSL version and cipher suites for SSLEngine.
+ * Creates an {@link SSLEngineFactory} to be used by the REST Clients.
*
- * @param engine SSLEngine to be handled
- * @param config The application configuration
- * @deprecated Use {@link #createClientSSLEngineFactory(Configuration)} or
- * {@link #createServerSSLEngineFactory(Configuration)}.
+ * @param config The application configuration.
*/
- @Deprecated
- public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) {
- engine.setEnabledProtocols(getEnabledProtocols(config));
- engine.setEnabledCipherSuites(getEnabledCipherSuites(config));
+ public static SSLEngineFactory createRestClientSSLEngineFactory(final Configuration config) throws Exception {
+ SSLContext sslContext = createRestClientSSLContext(config);
+ if (sslContext == null) {
+ throw new IllegalConfigurationException("SSL is not enabled for REST endpoints.");
+ }
+
+ return new SSLEngineFactory(
+ sslContext,
+ getEnabledProtocols(config),
+ getEnabledCipherSuites(config),
+ true);
}
private static String[] getEnabledProtocols(final Configuration config) {
@@ -152,120 +174,138 @@ public class SSLUtils {
}
/**
- * Sets SSL options to verify peer's hostname in the certificate.
- *
- * @param sslConfig
- * The application configuration
- * @param sslParams
- * The SSL parameters that need to be updated
+ * Creates the SSL Context for internal SSL, if internal SSL is configured.
+ * For internal SSL, the client and server side configuration are identical, because
+ * of mutual authentication.
*/
- public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters sslParams) {
+ @Nullable
+ public static SSLContext createInternalSSLContext(Configuration config) throws Exception {
+ checkNotNull(config, "config");
+
+ if (!isInternalSSLEnabled(config)) {
+ return null;
+ }
+ String keystoreFilePath = getAndCheckOption(
+ config, SecurityOptions.SSL_INTERNAL_KEYSTORE, SecurityOptions.SSL_KEYSTORE);
+
+ String keystorePassword = getAndCheckOption(
+ config, SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, SecurityOptions.SSL_KEYSTORE_PASSWORD);
+
+ String certPassword = getAndCheckOption(
+ config, SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD);
- Preconditions.checkNotNull(sslConfig);
- Preconditions.checkNotNull(sslParams);
+ String trustStoreFilePath = getAndCheckOption(
+ config, SecurityOptions.SSL_INTERNAL_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE);
- boolean verifyHostname = sslConfig.getBoolean(SecurityOptions.SSL_VERIFY_HOSTNAME);
- if (verifyHostname) {
- sslParams.setEndpointIdentificationAlgorithm("HTTPS");
+ String trustStorePassword = getAndCheckOption(
+ config, SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+
+ String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
+
+ KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) {
+ keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+ }
+
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) {
+ trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
}
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(keyStore, certPassword.toCharArray());
+
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(trustStore);
+
+ SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+ return sslContext;
}
/**
- * Creates the SSL Context for the client if SSL is configured.
- *
- * @param sslConfig
- * The application configuration
- * @return The SSLContext object which can be used by the ssl transport client
- * Returns null if SSL is disabled
- * @throws Exception
- * Thrown if there is any misconfiguration
+ * Creates an SSL context for the external REST endpoint server.
*/
@Nullable
- public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception {
+ public static SSLContext createRestServerSSLContext(Configuration config) throws Exception {
+ checkNotNull(config, "config");
- Preconditions.checkNotNull(sslConfig);
- SSLContext clientSSLContext = null;
+ if (!isRestSSLEnabled(config)) {
+ return null;
+ }
- if (getSSLEnabled(sslConfig)) {
- LOG.debug("Creating client SSL context from configuration");
+ String keystoreFilePath = getAndCheckOption(
+ config, SecurityOptions.SSL_REST_KEYSTORE, SecurityOptions.SSL_KEYSTORE);
- String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
- String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
- String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+ String keystorePassword = getAndCheckOption(
+ config, SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, SecurityOptions.SSL_KEYSTORE_PASSWORD);
- Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
- Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
+ String certPassword = getAndCheckOption(
+ config, SecurityOptions.SSL_REST_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD);
- KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
- FileInputStream trustStoreFile = null;
- try {
- trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
- trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
- } finally {
- if (trustStoreFile != null) {
- trustStoreFile.close();
- }
- }
+ KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (InputStream keyStoreFile = Files.newInputStream(new File(keystoreFilePath).toPath())) {
+ keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+ }
- TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
- TrustManagerFactory.getDefaultAlgorithm());
- trustManagerFactory.init(trustStore);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(keyStore, certPassword.toCharArray());
- clientSSLContext = SSLContext.getInstance(sslProtocolVersion);
- clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
- }
+ SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
+ sslContext.init(kmf.getKeyManagers(), null, null);
- return clientSSLContext;
+ return sslContext;
}
/**
- * Creates the SSL Context for the server if SSL is configured.
- *
- * @param sslConfig
- * The application configuration
- * @return The SSLContext object which can be used by the ssl transport server
- * Returns null if SSL is disabled
- * @throws Exception
- * Thrown if there is any misconfiguration
+ * Creates an SSL context for clients against the external REST endpoint.
*/
@Nullable
- public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception {
+ public static SSLContext createRestClientSSLContext(Configuration config) throws Exception {
+ checkNotNull(config, "config");
- Preconditions.checkNotNull(sslConfig);
- SSLContext serverSSLContext = null;
+ if (!isRestSSLEnabled(config)) {
+ return null;
+ }
- if (getSSLEnabled(sslConfig)) {
- LOG.debug("Creating server SSL context from configuration");
+ String trustStoreFilePath = getAndCheckOption(
+ config, SecurityOptions.SSL_REST_TRUSTSTORE, SecurityOptions.SSL_TRUSTSTORE);
- String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+ String trustStorePassword = getAndCheckOption(
+ config, SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
- String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+ String sslProtocolVersion = config.getString(SecurityOptions.SSL_PROTOCOL);
- String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (InputStream trustStoreFile = Files.newInputStream(new File(trustStoreFilePath).toPath())) {
+ trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
+ }
- String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(trustStore);
- Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
- Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
- Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
+ SSLContext sslContext = SSLContext.getInstance(sslProtocolVersion);
+ sslContext.init(null, tmf.getTrustManagers(), null);
- KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
- try (FileInputStream keyStoreFile = new FileInputStream(new File(keystoreFilePath))) {
- ks.load(keyStoreFile, keystorePassword.toCharArray());
- }
+ return sslContext;
+ }
- // Set up key manager factory to use the server key store
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(
- KeyManagerFactory.getDefaultAlgorithm());
- kmf.init(ks, certPassword.toCharArray());
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
- // Initialize the SSLContext
- serverSSLContext = SSLContext.getInstance(sslProtocolVersion);
- serverSSLContext.init(kmf.getKeyManagers(), null, null);
+ private static String getAndCheckOption(Configuration config, ConfigOption<String> primaryOption, ConfigOption<String> fallbackOption) {
+ String value = config.getString(primaryOption, config.getString(fallbackOption));
+ if (value != null) {
+ return value;
+ }
+ else {
+ throw new IllegalConfigurationException("The config option " + primaryOption.key() +
+ " or " + fallbackOption.key() + " is missing.");
}
-
- return serverSSLContext;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index b1591be..cbd888d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ConfigurationException;
@@ -91,10 +90,9 @@ public final class RestClientConfiguration {
Preconditions.checkNotNull(config);
final SSLEngineFactory sslEngineFactory;
- final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
- if (enableSSL) {
+ if (SSLUtils.isRestSSLEnabled(config)) {
try {
- sslEngineFactory = SSLUtils.createClientSSLEngineFactory(config);
+ sslEngineFactory = SSLUtils.createRestClientSSLEngineFactory(config);
} catch (Exception e) {
throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 875230f..561891f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
@@ -157,10 +156,9 @@ public final class RestServerEndpointConfiguration {
final int port = config.getInteger(RestOptions.PORT);
final SSLEngineFactory sslEngineFactory;
- final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED) && config.getBoolean(WebOptions.SSL_ENABLED);
- if (enableSSL) {
+ if (SSLUtils.isRestSSLEnabled(config)) {
try {
- sslEngineFactory = SSLUtils.createServerSSLEngineFactory(config);
+ sslEngineFactory = SSLUtils.createRestServerSSLEngineFactory(config);
} catch (Exception e) {
throw new ConfigurationException("Failed to initialize SSLEngineFactory for REST server endpoint.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 6ae142b..982a536 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -130,7 +130,7 @@ public class AkkaRpcServiceUtils {
checkNotNull(config, "config is null");
final boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) &&
- SSLUtils.getSSLEnabled(config);
+ SSLUtils.isInternalSSLEnabled(config);
return getRpcUrl(
hostname,
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 57ca9d4..12378e0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -336,21 +336,31 @@ object AkkaUtils {
val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) &&
- SSLUtils.getSSLEnabled(configuration)
+ SSLUtils.isInternalSSLEnabled(configuration)
val retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR)
val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"
- val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE)
+ val akkaSSLKeyStore = configuration.getString(
+ SecurityOptions.SSL_INTERNAL_KEYSTORE,
+ configuration.getString(SecurityOptions.SSL_KEYSTORE))
- val akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD)
+ val akkaSSLKeyStorePassword = configuration.getString(
+ SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD,
+ configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD))
- val akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_KEY_PASSWORD)
+ val akkaSSLKeyPassword = configuration.getString(
+ SecurityOptions.SSL_INTERNAL_KEY_PASSWORD,
+ configuration.getString(SecurityOptions.SSL_KEY_PASSWORD))
- val akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_TRUSTSTORE)
+ val akkaSSLTrustStore = configuration.getString(
+ SecurityOptions.SSL_INTERNAL_TRUSTSTORE,
+ configuration.getString(SecurityOptions.SSL_TRUSTSTORE))
- val akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD)
+ val akkaSSLTrustStorePassword = configuration.getString(
+ SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD,
+ configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD))
val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL)
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index b654cee..531f214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtilsTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -55,40 +55,25 @@ public class BlobClientSslTest extends BlobClientTest {
*/
@BeforeClass
public static void startSSLServer() throws IOException {
- Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY,
- temporarySslFolder.newFolder().getAbsolutePath());
- config.setBoolean(SecurityOptions.SSL_ENABLED, true);
- config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
- config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
- config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+ Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
+
blobSslServer = new BlobServer(config, new VoidBlobStore());
blobSslServer.start();
- sslClientConfig = new Configuration();
- sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
- sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
- sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+ sslClientConfig = config;
}
@BeforeClass
public static void startNonSSLServer() throws IOException {
- Configuration config = new Configuration();
- config.setString(BlobServerOptions.STORAGE_DIRECTORY,
- temporarySslFolder.newFolder().getAbsolutePath());
- config.setBoolean(SecurityOptions.SSL_ENABLED, true);
+ Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath());
config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
- config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
- config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
- config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+
blobNonSslServer = new BlobServer(config, new VoidBlobStore());
blobNonSslServer.start();
- nonSslClientConfig = new Configuration();
- nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
- nonSslClientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
- nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
- nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+ nonSslClientConfig = config;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/30c4bc84/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 33e004e..2750660 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -151,15 +151,14 @@ public class NettyClientServerSslTest {
NettyTestUtil.shutdown(serverAndClient);
}
- private Configuration createSslConfig() throws Exception {
-
+ private static Configuration createSslConfig() throws Exception {
Configuration flinkConfig = new Configuration();
- flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
- flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
- flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
- flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
- flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
- flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+ flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
+ flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/local127.keystore");
+ flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "password");
+ flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password");
+ flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "src/test/resources/local127.truststore");
+ flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "password");
return flinkConfig;
}
}