You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 07:03:13 UTC

[8/8] flink git commit: [FLINK-9314] [security] (part 4) Add mutual authentication for internal Netty and Blob Server connections

[FLINK-9314] [security] (part 4) Add mutual authentication for internal Netty and Blob Server connections

This closes #6326.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a502f827
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a502f827
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a502f827

Branch: refs/heads/master
Commit: a502f82777acc825aaea4de22ece6b4d247260b9
Parents: 3aeb00f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 12 21:18:46 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 08:10:46 2018 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyConfig.java   |   4 +
 .../flink/runtime/net/SSLEngineFactory.java     |  10 +-
 .../org/apache/flink/runtime/net/SSLUtils.java  |   9 +-
 .../network/netty/NettyClientServerSslTest.java | 133 ++++++++++---------
 4 files changed, 90 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 46cdaab..59971e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -213,6 +213,10 @@ public class NettyConfig {
 		return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 	}
 
+	public Configuration getConfig() {
+		return config;
+	}
+
 	@Override
 	public String toString() {
 		String format = "NettyConfig [" +

http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
index d842267..68f60b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
@@ -36,15 +36,20 @@ public class SSLEngineFactory {
 
 	private final boolean clientMode;
 
+	final boolean clientAuthentication;
+
 	public SSLEngineFactory(
 			final SSLContext sslContext,
 			final String[] enabledProtocols,
 			final String[] enabledCipherSuites,
-			final boolean clientMode) {
+			final boolean clientMode,
+			final boolean clientAuthentication) {
+
 		this.sslContext = requireNonNull(sslContext, "sslContext must not be null");
 		this.enabledProtocols = requireNonNull(enabledProtocols, "enabledProtocols must not be null");
 		this.enabledCipherSuites = requireNonNull(enabledCipherSuites, "cipherSuites must not be null");
 		this.clientMode = clientMode;
+		this.clientAuthentication = clientAuthentication;
 	}
 
 	public SSLEngine createSSLEngine() {
@@ -63,5 +68,8 @@ public class SSLEngineFactory {
 		sslEngine.setEnabledProtocols(enabledProtocols);
 		sslEngine.setEnabledCipherSuites(enabledCipherSuites);
 		sslEngine.setUseClientMode(clientMode);
+		if (!clientMode) {
+			sslEngine.setNeedClientAuth(clientAuthentication);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 5c95535..d209f5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -108,7 +108,8 @@ public class SSLUtils {
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
-				false);
+				false,
+				true);
 	}
 
 	/**
@@ -124,6 +125,7 @@ public class SSLUtils {
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
+				true,
 				true);
 	}
 
@@ -142,6 +144,7 @@ public class SSLUtils {
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
+				false,
 				false);
 	}
 
@@ -160,7 +163,8 @@ public class SSLUtils {
 				sslContext,
 				getEnabledProtocols(config),
 				getEnabledCipherSuites(config),
-				true);
+				true,
+				false);
 	}
 
 	private static String[] getEnabledProtocols(final Configuration config) {
@@ -352,6 +356,7 @@ public class SSLUtils {
 		private void configureServerSocket(SSLServerSocket socket) {
 			socket.setEnabledProtocols(protocols);
 			socket.setEnabledCipherSuites(cipherSuites);
+			socket.setNeedClientAuth(true);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 2750660..b1f3b48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.net.SSLUtilsTest;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -35,6 +37,10 @@ import java.net.InetAddress;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the SSL connection between Netty Server and Client used for the
+ * data plane.
+ */
 public class NettyClientServerSslTest {
 
 	/**
@@ -42,24 +48,9 @@ public class NettyClientServerSslTest {
 	 */
 	@Test
 	public void testValidSslConnection() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
-
-		NettyConfig nettyConfig = new NettyConfig(
-			InetAddress.getLoopbackAddress(),
-			NetUtils.getAvailablePort(),
-			NettyTestUtil.DEFAULT_SEGMENT_SIZE,
-			1,
-			createSslConfig());
+		NettyProtocol protocol = new NoOpProtocol();
+
+		NettyConfig nettyConfig = createNettyConfig(createSslConfig());
 
 		NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
@@ -77,28 +68,13 @@ public class NettyClientServerSslTest {
 	 */
 	@Test
 	public void testInvalidSslConfiguration() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+		NettyProtocol protocol = new NoOpProtocol();
 
 		Configuration config = createSslConfig();
 		// Modify the keystore password to an incorrect one
-		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "invalidpassword");
+		config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");
 
-		NettyConfig nettyConfig = new NettyConfig(
-			InetAddress.getLoopbackAddress(),
-			NetUtils.getAvailablePort(),
-			NettyTestUtil.DEFAULT_SEGMENT_SIZE,
-			1,
-			config);
+		NettyConfig nettyConfig = createNettyConfig(config);
 
 		NettyTestUtil.NettyServerAndClient serverAndClient = null;
 		try {
@@ -116,29 +92,14 @@ public class NettyClientServerSslTest {
 	 */
 	@Test
 	public void testSslHandshakeError() throws Exception {
-		NettyProtocol protocol = new NettyProtocol(null, null, true) {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[0];
-			}
-		};
+		NettyProtocol protocol = new NoOpProtocol();
 
 		Configuration config = createSslConfig();
 
 		// Use a server certificate which is not present in the truststore
-		config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/untrusted.keystore");
+		config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
 
-		NettyConfig nettyConfig = new NettyConfig(
-			InetAddress.getLoopbackAddress(),
-			NetUtils.getAvailablePort(),
-			NettyTestUtil.DEFAULT_SEGMENT_SIZE,
-			1,
-			config);
+		NettyConfig nettyConfig = createNettyConfig(config);
 
 		NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
@@ -151,14 +112,60 @@ public class NettyClientServerSslTest {
 		NettyTestUtil.shutdown(serverAndClient);
 	}
 
-	private static Configuration createSslConfig() throws Exception {
-		Configuration flinkConfig = new Configuration();
-		flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
-		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/local127.keystore");
-		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "password");
-		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password");
-		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "src/test/resources/local127.truststore");
-		flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "password");
-		return flinkConfig;
+	@Test
+	public void testClientUntrustedCertificate() throws Exception {
+		final Configuration serverConfig = createSslConfig();
+		final Configuration clientConfig = createSslConfig();
+
+		// give the client a different keystore / certificate
+		clientConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
+
+		final NettyConfig nettyServerConfig = createNettyConfig(serverConfig);
+		final NettyConfig nettyClientConfig = createNettyConfig(clientConfig);
+
+		final NettyBufferPool bufferPool = new NettyBufferPool(1);
+		final NettyProtocol protocol = new NoOpProtocol();
+
+		final NettyServer server = NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
+		final NettyClient client = NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
+		final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client);
+
+		final Channel ch = NettyTestUtil.connect(serverAndClient);
+		ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
+
+		// Attempting to write data over SSL should fail, because the client presents an untrusted certificate
+		assertFalse(ch.writeAndFlush("test").await().isSuccess());
+
+		NettyTestUtil.shutdown(serverAndClient);
+	}
+
+	private static Configuration createSslConfig() {
+		return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
+	}
+
+	private static NettyConfig createNettyConfig(Configuration config) {
+		return new NettyConfig(
+				InetAddress.getLoopbackAddress(),
+				NetUtils.getAvailablePort(),
+				NettyTestUtil.DEFAULT_SEGMENT_SIZE,
+				1,
+				config);
+	}
+
+	private static final class NoOpProtocol extends NettyProtocol {
+
+		NoOpProtocol() {
+			super(null, null, true);
+		}
+
+		@Override
+		public ChannelHandler[] getServerChannelHandlers() {
+			return new ChannelHandler[0];
+		}
+
+		@Override
+		public ChannelHandler[] getClientChannelHandlers() {
+			return new ChannelHandler[0];
+		}
 	}
 }