You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/17 11:14:18 UTC

flink git commit: [FLINK-5981] [security] Make SSL pick up configured protocols and cipher suites

Repository: flink
Updated Branches:
  refs/heads/master 78f22aaec -> e0614f655


[FLINK-5981] [security] Make SSL pick up configured protocols and cipher suites

This closes #3486


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0614f65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0614f65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0614f65

Branch: refs/heads/master
Commit: e0614f6551a232706b74963563694486fe2461b1
Parents: 78f22aa
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Mar 7 20:05:21 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 17 12:02:18 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +-
 .../flink/mesos/util/MesosArtifactServer.java   |   2 +
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   2 +
 .../apache/flink/runtime/blob/BlobServer.java   |   1 +
 .../runtime/io/network/netty/NettyConfig.java   |   5 +
 .../runtime/io/network/netty/NettyServer.java   |   1 +
 .../org/apache/flink/runtime/net/SSLUtils.java  |  39 ++++++++
 .../apache/flink/runtime/net/SSLUtilsTest.java  | 100 +++++++++++++++++++
 .../flink/runtime/akka/AkkaSslITCase.scala      |  27 +++++
 9 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c835882..d9d5f15 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -327,7 +327,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `security.ssl.truststore-password`: The secret to decrypt the truststore.
 
-- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**).
+- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**). Note that only a single protocol version can be configured; a comma separated list is not supported.
 
 - `security.ssl.algorithms`: The comma separated list of standard SSL algorithms to be supported. Read more [here](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites) (DEFAULT: **TLS_RSA_WITH_AES_128_CBC_SHA**).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 37cb260..ae826db 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -130,6 +130,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 		router = new Router();
 
+		final Configuration sslConfig = config;
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
 			@Override
@@ -139,6 +140,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
 					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
 					sslEngine.setUseClientMode(false);
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e604ce8..d88fdcf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -380,6 +380,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			LOG.warn("Error while adding shutdown hook", t);
 		}
 
+		final Configuration sslConfig = config;
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
 			@Override
@@ -389,6 +390,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
 					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
 					sslEngine.setUseClientMode(false);
 					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 5b00ae4..8a70559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -168,6 +168,7 @@ public class BlobServer extends Thread implements BlobService {
 		if(socketAttempt == null) {
 			throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
 		} else {
+			SSLUtils.setSSLVerAndCipherSuites(socketAttempt, config);
 			this.serverSocket = socketAttempt;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 7b0da43..b9a1b90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
 import java.net.InetAddress;
 
@@ -236,6 +237,10 @@ public class NettyConfig {
 			&& SSLUtils.getSSLEnabled(config);
 	}
 
+	public void setSSLVerAndCipherSuites(SSLEngine engine) { // convenience wrapper around SSLUtils
+		SSLUtils.setSSLVerAndCipherSuites(engine, config); // passes along the configuration held by this NettyConfig
+	}
+
 	public void setSSLVerifyHostname(SSLParameters sslParams) {
 		SSLUtils.setSSLVerifyHostname(config, sslParams);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 55d2b18..3cf14b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -140,6 +140,7 @@ class NettyServer {
 			public void initChannel(SocketChannel channel) throws Exception {
 				if (serverSSLContext != null) {
 					SSLEngine sslEngine = serverSSLContext.createSSLEngine();
+					config.setSSLVerAndCipherSuites(sslEngine);
 					sslEngine.setUseClientMode(false);
 					channel.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index fc38b5d..c2d7a7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -27,10 +27,13 @@ import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.TrustManagerFactory;
 import java.io.File;
 import java.io.FileInputStream;
+import java.net.ServerSocket;
 import java.security.KeyStore;
 
 /**
@@ -55,6 +58,42 @@ public class SSLUtils {
 	}
 
 	/**
+	 * Sets SSL version and cipher suites for SSLServerSocket.
+	 * @param socket
+	 *        Socket to be handled; a socket that is not an SSLServerSocket is skipped with a warning
+	 * @param config
+	 *        The application configuration; entries of the comma separated lists may be padded with whitespace
+	 */
+	public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration config) {
+		if (socket instanceof SSLServerSocket) {
+			((SSLServerSocket) socket).setEnabledProtocols(config.getString(
+				ConfigConstants.SECURITY_SSL_PROTOCOL,
+				ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).trim().split("\\s*,\\s*"));
+			((SSLServerSocket) socket).setEnabledCipherSuites(config.getString(
+				ConfigConstants.SECURITY_SSL_ALGORITHMS,
+				ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).trim().split("\\s*,\\s*"));
+		} else {
+			LOG.warn("Not a SSL socket, will skip setting tls version and cipher suites.");
+		}
+	}
+
+	/**
+	 * Sets SSL version and cipher suites for SSLEngine.
+	 * @param engine
+	 *        SSLEngine to be handled
+	 * @param config
+	 *        The application configuration; entries of the comma separated lists may be padded with whitespace
+	 */
+	public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) {
+		engine.setEnabledProtocols(config.getString(
+			ConfigConstants.SECURITY_SSL_PROTOCOL,
+			ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).trim().split("\\s*,\\s*"));
+		engine.setEnabledCipherSuites(config.getString(
+			ConfigConstants.SECURITY_SSL_ALGORITHMS,
+			ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).trim().split("\\s*,\\s*"));
+	}
+
+	/**
 	 * Sets SSL options to verify peer's hostname in the certificate
 	 *
 	 * @param sslConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 1137341..d28d693 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -23,6 +23,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLServerSocket;
+import java.net.ServerSocket;
+import java.util.Random;
 
 /*
  * Tests for the SSL utilities
@@ -125,4 +129,100 @@ public class SSLUtilsTest {
 		}
 	}
 
+	/**
+	 * Tests if SSL Server Context creation fails when a comma separated list of protocols is configured
+	 */
+	@Test
+	public void testCreateSSLServerContextWithMultiProtocols() {
+
+		Configuration serverConfig = new Configuration();
+		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2");
+
+		try {
+			SSLUtils.createSSLServerContext(serverConfig);
+			Assert.fail("SSL server context created even with multiple protocols set ");
+		} catch (Exception e) {
+			// Expected: the context must not be created from a list of protocols
+		}
+	}
+
+	/**
+	 * Verifies that SSLUtils applies the configured ssl version and cipher suites to an SSLServerSocket
+	 */
+	@Test
+	public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception {
+
+		Configuration serverConfig = new Configuration();
+		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
+
+		SSLContext sslContext = SSLUtils.createSSLServerContext(serverConfig);
+		ServerSocket serverSocket = null;
+		try {
+			serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);
+			SSLServerSocket sslSocket = (SSLServerSocket) serverSocket;
+
+			// Before the configuration is applied, the enabled sets must differ in size from the configured ones
+			Assert.assertNotEquals(1, sslSocket.getEnabledProtocols().length);
+			Assert.assertNotEquals(2, sslSocket.getEnabledCipherSuites().length);
+
+			// Afterwards exactly the configured protocol and cipher suites must be enabled
+			SSLUtils.setSSLVerAndCipherSuites(serverSocket, serverConfig);
+			String[] enabledProtocols = sslSocket.getEnabledProtocols();
+			String[] enabledSuites = sslSocket.getEnabledCipherSuites();
+
+			Assert.assertEquals(1, enabledProtocols.length);
+			Assert.assertEquals("TLSv1.1", enabledProtocols[0]);
+			Assert.assertEquals(2, enabledSuites.length);
+			Assert.assertTrue(enabledSuites[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || enabledSuites[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
+			Assert.assertTrue(enabledSuites[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || enabledSuites[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
+		} finally {
+			if (serverSocket != null) {
+				serverSocket.close();
+			}
+		}
+	}
+
+	/**
+	 * Verifies that SSLUtils applies the configured ssl version and cipher suites to an SSLEngine
+	 */
+	@Test
+	public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception {
+
+		Configuration serverConfig = new Configuration();
+		serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1");
+		serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
+
+		SSLContext sslContext = SSLUtils.createSSLServerContext(serverConfig);
+		SSLEngine sslEngine = sslContext.createSSLEngine();
+
+		// Before the configuration is applied, the enabled sets of the fresh engine
+		// must differ in size from the configured ones
+		Assert.assertNotEquals(1, sslEngine.getEnabledProtocols().length);
+		Assert.assertNotEquals(2, sslEngine.getEnabledCipherSuites().length);
+
+		// Afterwards exactly the configured protocol and cipher suites must be enabled
+		SSLUtils.setSSLVerAndCipherSuites(sslEngine, serverConfig);
+		String[] enabledProtocols = sslEngine.getEnabledProtocols();
+		String[] enabledSuites = sslEngine.getEnabledCipherSuites();
+
+		Assert.assertEquals(1, enabledProtocols.length);
+		Assert.assertEquals("TLSv1", enabledProtocols[0]);
+		Assert.assertEquals(2, enabledSuites.length);
+		Assert.assertTrue(enabledSuites[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || enabledSuites[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
+		Assert.assertTrue(enabledSuites[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || enabledSuites[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 0f6509c..9f8e3e1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -71,6 +71,33 @@ class AkkaSslITCase(_system: ActorSystem)
       assert(cluster.running)
     }
 
+    "Failed to start ssl enabled akka with two protocols set" in {
+
+      an[Exception] should be thrownBy {
+
+        val config = new Configuration()
+        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
+        config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+
+        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
+          getClass.getResource("/local127.keystore").getPath)
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password")
+        config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+          getClass.getResource("/local127.truststore").getPath)
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password")
+        // two protocols go into the protocol option (not the cipher suite list) to provoke the failure
+        config.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.1")
+
+        val cluster = new TestingCluster(config, false)
+
+        cluster.start(true)
+      }
+    }
+
     "start with akka ssl disabled" in {
 
       val config = new Configuration()