You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/17 11:14:18 UTC
flink git commit: [FLINK-5981] [security] Make SSL pick up configured
protocols and cipher suites
Repository: flink
Updated Branches:
refs/heads/master 78f22aaec -> e0614f655
[FLINK-5981] [security] Make SSL pick up configured protocols and cipher suites
This closes #3486
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0614f65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0614f65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0614f65
Branch: refs/heads/master
Commit: e0614f6551a232706b74963563694486fe2461b1
Parents: 78f22aa
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Mar 7 20:05:21 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 17 12:02:18 2017 +0100
----------------------------------------------------------------------
docs/setup/config.md | 2 +-
.../flink/mesos/util/MesosArtifactServer.java | 2 +
.../runtime/webmonitor/WebRuntimeMonitor.java | 2 +
.../apache/flink/runtime/blob/BlobServer.java | 1 +
.../runtime/io/network/netty/NettyConfig.java | 5 +
.../runtime/io/network/netty/NettyServer.java | 1 +
.../org/apache/flink/runtime/net/SSLUtils.java | 39 ++++++++
.../apache/flink/runtime/net/SSLUtilsTest.java | 100 +++++++++++++++++++
.../flink/runtime/akka/AkkaSslITCase.scala | 27 +++++
9 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c835882..d9d5f15 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -327,7 +327,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
- `security.ssl.truststore-password`: The secret to decrypt the truststore.
-- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**).
+- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**). Note that a comma-separated list of protocols is not supported.
- `security.ssl.algorithms`: The comma separated list of standard SSL algorithms to be supported. Read more [here](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites) (DEFAULT: **TLS_RSA_WITH_AES_128_CBC_SHA**).
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 37cb260..ae826db 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -130,6 +130,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
router = new Router();
+ final Configuration sslConfig = config;
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
@@ -139,6 +140,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+ SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
sslEngine.setUseClientMode(false);
ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e604ce8..d88fdcf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -380,6 +380,7 @@ public class WebRuntimeMonitor implements WebMonitor {
LOG.warn("Error while adding shutdown hook", t);
}
+ final Configuration sslConfig = config;
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
@@ -389,6 +390,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// SSL should be the first handler in the pipeline
if (serverSSLContext != null) {
SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+ SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
sslEngine.setUseClientMode(false);
ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 5b00ae4..8a70559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -168,6 +168,7 @@ public class BlobServer extends Thread implements BlobService {
if(socketAttempt == null) {
throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
} else {
+ SSLUtils.setSSLVerAndCipherSuites(socketAttempt, config);
this.serverSocket = socketAttempt;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 7b0da43..b9a1b90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.net.InetAddress;
@@ -236,6 +237,10 @@ public class NettyConfig {
&& SSLUtils.getSSLEnabled(config);
}
+ public void setSSLVerAndCipherSuites(SSLEngine engine) {
+ SSLUtils.setSSLVerAndCipherSuites(engine, config);
+ }
+
public void setSSLVerifyHostname(SSLParameters sslParams) {
SSLUtils.setSSLVerifyHostname(config, sslParams);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 55d2b18..3cf14b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -140,6 +140,7 @@ class NettyServer {
public void initChannel(SocketChannel channel) throws Exception {
if (serverSSLContext != null) {
SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+ config.setSSLVerAndCipherSuites(sslEngine);
sslEngine.setUseClientMode(false);
channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index fc38b5d..c2d7a7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -27,10 +27,13 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.TrustManagerFactory;
import java.io.File;
import java.io.FileInputStream;
+import java.net.ServerSocket;
import java.security.KeyStore;
/**
@@ -55,6 +58,42 @@ public class SSLUtils {
}
/**
+ * Sets SSL version and cipher suites for SSLServerSocket
+ * @param socket
+ * Socket to be handled
+ * @param config
+ * The application configuration
+ */
+ public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration config) {
+ if (socket instanceof SSLServerSocket) {
+ ((SSLServerSocket) socket).setEnabledProtocols(config.getString(
+ ConfigConstants.SECURITY_SSL_PROTOCOL,
+ ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","));
+ ((SSLServerSocket) socket).setEnabledCipherSuites(config.getString(
+ ConfigConstants.SECURITY_SSL_ALGORITHMS,
+ ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(","));
+ } else {
+ LOG.warn("Not a SSL socket, will skip setting tls version and cipher suites.");
+ }
+ }
+
+ /**
+ * Sets SSL version and cipher suites for SSLEngine
+ * @param engine
+ * SSLEngine to be handled
+ * @param config
+ * The application configuration
+ */
+ public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) {
+ engine.setEnabledProtocols(config.getString(
+ ConfigConstants.SECURITY_SSL_PROTOCOL,
+ ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","));
+ engine.setEnabledCipherSuites(config.getString(
+ ConfigConstants.SECURITY_SSL_ALGORITHMS,
+ ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(","));
+ }
+
+ /**
* Sets SSL options to verify peer's hostname in the certificate
*
* @param sslConfig
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 1137341..d28d693 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -23,6 +23,10 @@ import org.junit.Assert;
import org.junit.Test;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLServerSocket;
+import java.net.ServerSocket;
+import java.util.Random;
/*
* Tests for the SSL utilities
@@ -125,4 +129,100 @@ public class SSLUtilsTest {
}
}
+ /**
+ * Tests if SSL Server Context creation fails when multiple protocols are configured
+ */
+ @Test
+ public void testCreateSSLServerContextWithMultiProtocols() {
+
+ Configuration serverConfig = new Configuration();
+ serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2");
+
+ try {
+ SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+ Assert.fail("SSL server context created even with multiple protocols set ");
+ } catch (Exception e) {
+ // Exception here is valid
+ }
+ }
+
+ /**
+ * Tests if SSLUtils sets the right SSL version and cipher suites for SSLServerSocket
+ */
+ @Test
+ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception {
+
+ Configuration serverConfig = new Configuration();
+ serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
+
+ SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+ ServerSocket socket = null;
+ try {
+ socket = serverContext.getServerSocketFactory().createServerSocket(0);
+
+ String[] protocols = ((SSLServerSocket) socket).getEnabledProtocols();
+ String[] algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites();
+
+ Assert.assertNotEquals(1, protocols.length);
+ Assert.assertNotEquals(2, algorithms.length);
+
+ SSLUtils.setSSLVerAndCipherSuites(socket, serverConfig);
+ protocols = ((SSLServerSocket) socket).getEnabledProtocols();
+ algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites();
+
+ Assert.assertEquals(1, protocols.length);
+ Assert.assertEquals("TLSv1.1", protocols[0]);
+ Assert.assertEquals(2, algorithms.length);
+ Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
+ Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
+ } finally {
+ if (socket != null) {
+ socket.close();
+ }
+ }
+ }
+
+ /**
+ * Tests if SSLUtils sets the right SSL version and cipher suites for SSLEngine
+ */
+ @Test
+ public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception {
+
+ Configuration serverConfig = new Configuration();
+ serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1");
+ serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
+
+ SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig);
+ SSLEngine engine = serverContext.createSSLEngine();
+
+ String[] protocols = engine.getEnabledProtocols();
+ String[] algorithms = engine.getEnabledCipherSuites();
+
+ Assert.assertNotEquals(1, protocols.length);
+ Assert.assertNotEquals(2, algorithms.length);
+
+ SSLUtils.setSSLVerAndCipherSuites(engine, serverConfig);
+ protocols = engine.getEnabledProtocols();
+ algorithms = engine.getEnabledCipherSuites();
+
+ Assert.assertEquals(1, protocols.length);
+ Assert.assertEquals("TLSv1", protocols[0]);
+ Assert.assertEquals(2, algorithms.length);
+ Assert.assertTrue(algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
+ Assert.assertTrue(algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 0f6509c..9f8e3e1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -71,6 +71,33 @@ class AkkaSslITCase(_system: ActorSystem)
assert(cluster.running)
}
+ "Failed to start ssl enabled akka with two protocols set" in {
+
+ an[Exception] should be thrownBy {
+
+ val config = new Configuration()
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
+ config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+
+ config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+ config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
+ getClass.getResource("/local127.keystore").getPath)
+ config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password")
+ config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
+ config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+ getClass.getResource("/local127.truststore").getPath)
+
+ config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password")
+ config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLSv1,TLSv1.1")
+
+ val cluster = new TestingCluster(config, false)
+
+ cluster.start(true)
+ }
+ }
+
"start with akka ssl disabled" in {
val config = new Configuration()