You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 07:03:13 UTC
[8/8] flink git commit: [FLINK-9314] [security] (part 4) Add mutual authentication for internal Netty and Blob Server connections
[FLINK-9314] [security] (part 4) Add mutual authentication for internal Netty and Blob Server connections
This closes #6326.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a502f827
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a502f827
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a502f827
Branch: refs/heads/master
Commit: a502f82777acc825aaea4de22ece6b4d247260b9
Parents: 3aeb00f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 12 21:18:46 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 08:10:46 2018 +0200
----------------------------------------------------------------------
.../runtime/io/network/netty/NettyConfig.java | 4 +
.../flink/runtime/net/SSLEngineFactory.java | 10 +-
.../org/apache/flink/runtime/net/SSLUtils.java | 9 +-
.../network/netty/NettyClientServerSslTest.java | 133 ++++++++++---------
4 files changed, 90 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 46cdaab..59971e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -213,6 +213,10 @@ public class NettyConfig {
return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
}
+ public Configuration getConfig() {
+ return config;
+ }
+
@Override
public String toString() {
String format = "NettyConfig [" +
http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
index d842267..68f60b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
@@ -36,15 +36,20 @@ public class SSLEngineFactory {
private final boolean clientMode;
+ final boolean clientAuthentication;
+
public SSLEngineFactory(
final SSLContext sslContext,
final String[] enabledProtocols,
final String[] enabledCipherSuites,
- final boolean clientMode) {
+ final boolean clientMode,
+ final boolean clientAuthentication) {
+
this.sslContext = requireNonNull(sslContext, "sslContext must not be null");
this.enabledProtocols = requireNonNull(enabledProtocols, "enabledProtocols must not be null");
this.enabledCipherSuites = requireNonNull(enabledCipherSuites, "cipherSuites must not be null");
this.clientMode = clientMode;
+ this.clientAuthentication = clientAuthentication;
}
public SSLEngine createSSLEngine() {
@@ -63,5 +68,8 @@ public class SSLEngineFactory {
sslEngine.setEnabledProtocols(enabledProtocols);
sslEngine.setEnabledCipherSuites(enabledCipherSuites);
sslEngine.setUseClientMode(clientMode);
+ if (!clientMode) {
+ sslEngine.setNeedClientAuth(clientAuthentication);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 5c95535..d209f5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -108,7 +108,8 @@ public class SSLUtils {
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
- false);
+ false,
+ true);
}
/**
@@ -124,6 +125,7 @@ public class SSLUtils {
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
+ true,
true);
}
@@ -142,6 +144,7 @@ public class SSLUtils {
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
+ false,
false);
}
@@ -160,7 +163,8 @@ public class SSLUtils {
sslContext,
getEnabledProtocols(config),
getEnabledCipherSuites(config),
- true);
+ true,
+ false);
}
private static String[] getEnabledProtocols(final Configuration config) {
@@ -352,6 +356,7 @@ public class SSLUtils {
private void configureServerSocket(SSLServerSocket socket) {
socket.setEnabledProtocols(protocols);
socket.setEnabledCipherSuites(cipherSuites);
+ socket.setNeedClientAuth(true);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 2750660..b1f3b48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -35,6 +37,10 @@ import java.net.InetAddress;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the SSL connection between Netty Server and Client used for the
+ * data plane.
+ */
public class NettyClientServerSslTest {
/**
@@ -42,24 +48,9 @@ public class NettyClientServerSslTest {
*/
@Test
public void testValidSslConnection() throws Exception {
- NettyProtocol protocol = new NettyProtocol(null, null, true) {
- @Override
- public ChannelHandler[] getServerChannelHandlers() {
- return new ChannelHandler[0];
- }
-
- @Override
- public ChannelHandler[] getClientChannelHandlers() {
- return new ChannelHandler[0];
- }
- };
-
- NettyConfig nettyConfig = new NettyConfig(
- InetAddress.getLoopbackAddress(),
- NetUtils.getAvailablePort(),
- NettyTestUtil.DEFAULT_SEGMENT_SIZE,
- 1,
- createSslConfig());
+ NettyProtocol protocol = new NoOpProtocol();
+
+ NettyConfig nettyConfig = createNettyConfig(createSslConfig());
NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
@@ -77,28 +68,13 @@ public class NettyClientServerSslTest {
*/
@Test
public void testInvalidSslConfiguration() throws Exception {
- NettyProtocol protocol = new NettyProtocol(null, null, true) {
- @Override
- public ChannelHandler[] getServerChannelHandlers() {
- return new ChannelHandler[0];
- }
-
- @Override
- public ChannelHandler[] getClientChannelHandlers() {
- return new ChannelHandler[0];
- }
- };
+ NettyProtocol protocol = new NoOpProtocol();
Configuration config = createSslConfig();
// Modify the keystore password to an incorrect one
- config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "invalidpassword");
+ config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");
- NettyConfig nettyConfig = new NettyConfig(
- InetAddress.getLoopbackAddress(),
- NetUtils.getAvailablePort(),
- NettyTestUtil.DEFAULT_SEGMENT_SIZE,
- 1,
- config);
+ NettyConfig nettyConfig = createNettyConfig(config);
NettyTestUtil.NettyServerAndClient serverAndClient = null;
try {
@@ -116,29 +92,14 @@ public class NettyClientServerSslTest {
*/
@Test
public void testSslHandshakeError() throws Exception {
- NettyProtocol protocol = new NettyProtocol(null, null, true) {
- @Override
- public ChannelHandler[] getServerChannelHandlers() {
- return new ChannelHandler[0];
- }
-
- @Override
- public ChannelHandler[] getClientChannelHandlers() {
- return new ChannelHandler[0];
- }
- };
+ NettyProtocol protocol = new NoOpProtocol();
Configuration config = createSslConfig();
// Use a server certificate which is not present in the truststore
- config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/untrusted.keystore");
+ config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
- NettyConfig nettyConfig = new NettyConfig(
- InetAddress.getLoopbackAddress(),
- NetUtils.getAvailablePort(),
- NettyTestUtil.DEFAULT_SEGMENT_SIZE,
- 1,
- config);
+ NettyConfig nettyConfig = createNettyConfig(config);
NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
@@ -151,14 +112,60 @@ public class NettyClientServerSslTest {
NettyTestUtil.shutdown(serverAndClient);
}
- private static Configuration createSslConfig() throws Exception {
- Configuration flinkConfig = new Configuration();
- flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
- flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/local127.keystore");
- flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "password");
- flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password");
- flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "src/test/resources/local127.truststore");
- flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "password");
- return flinkConfig;
+ @Test
+ public void testClientUntrustedCertificate() throws Exception {
+ final Configuration serverConfig = createSslConfig();
+ final Configuration clientConfig = createSslConfig();
+
+ // give the client a different keystore / certificate
+ clientConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
+
+ final NettyConfig nettyServerConfig = createNettyConfig(serverConfig);
+ final NettyConfig nettyClientConfig = createNettyConfig(clientConfig);
+
+ final NettyBufferPool bufferPool = new NettyBufferPool(1);
+ final NettyProtocol protocol = new NoOpProtocol();
+
+ final NettyServer server = NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
+ final NettyClient client = NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
+ final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client);
+
+ final Channel ch = NettyTestUtil.connect(serverAndClient);
+ ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
+
+ // Attempting to write data over ssl should fail
+ assertFalse(ch.writeAndFlush("test").await().isSuccess());
+
+ NettyTestUtil.shutdown(serverAndClient);
+ }
+
+ private static Configuration createSslConfig() {
+ return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
+ }
+
+ private static NettyConfig createNettyConfig(Configuration config) {
+ return new NettyConfig(
+ InetAddress.getLoopbackAddress(),
+ NetUtils.getAvailablePort(),
+ NettyTestUtil.DEFAULT_SEGMENT_SIZE,
+ 1,
+ config);
+ }
+
+ private static final class NoOpProtocol extends NettyProtocol {
+
+ NoOpProtocol() {
+ super(null, null, true);
+ }
+
+ @Override
+ public ChannelHandler[] getServerChannelHandlers() {
+ return new ChannelHandler[0];
+ }
+
+ @Override
+ public ChannelHandler[] getClientChannelHandlers() {
+ return new ChannelHandler[0];
+ }
}
}