You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2022/03/29 22:12:34 UTC

[kafka] branch 3.1 updated: KAFKA-13418: Support key updates with TLS 1.3 (#11966)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new e714cb5  KAFKA-13418: Support key updates with TLS 1.3 (#11966)
e714cb5 is described below

commit e714cb5dbb92bf3905550daca0053a18ec53426d
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

    KAFKA-13418: Support key updates with TLS 1.3 (#11966)
    
    Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
    Update the read/write paths not to throw an exception in this case (kept the exception
    in the `handshake` method).
    
    With the default configuration, key updates happen after 2^37 bytes are encrypted.
    This limit can be adjusted via the `jdk.tls.keyLimits` security property, but the
    property has to be set before it is used for the first time and cannot be changed
    after that. As such, testing with a lower limit is best done via a system test
    (filed KAFKA-13779).
    
    To validate the change, I wrote a unit test that forces key updates and manually ran
    a producer workload that produced more than 2^37 bytes. Both cases failed without
    these changes and pass with them.
    
    Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix, and hence
    I have included them as a co-author of this change.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
    
    Co-authored-by: Shylaja Kokoori
---
 .../kafka/common/network/SslTransportLayer.java    | 16 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  5 --
 .../kafka/common/network/SslSelectorTest.java      | 44 ++---------
 .../kafka/common/network/Tls12SelectorTest.java    | 72 +++++++++++++++++
 .../kafka/common/network/Tls13SelectorTest.java    | 92 ++++++++++++++++++++++
 5 files changed, 180 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index b9879ad..d276e99 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer {
         CLOSING
     }
 
+    private static final String TLS13 = "TLSv1.3";
+
     private final String channelId;
     private final SSLEngine sslEngine;
     private final SelectionKey key;
@@ -446,7 +448,7 @@ public class SslTransportLayer implements TransportLayer {
             if (netWriteBuffer.hasRemaining())
                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
             else {
-                state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY;
+                state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY;
                 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                 SSLSession session = sslEngine.getSession();
                 log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
@@ -578,10 +580,11 @@ public class SslTransportLayer implements TransportLayer {
                         throw e;
                 }
                 netReadBuffer.compact();
-                // handle ssl renegotiation.
+                // reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
                 if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
                         unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
-                        unwrapResult.getStatus() == Status.OK) {
+                        unwrapResult.getStatus() == Status.OK &&
+                        !sslEngine.getSession().getProtocol().equals(TLS13)) {
                     log.error("Renegotiation requested, but it is not supported, channelId {}, " +
                         "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId,
                         appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
@@ -699,9 +702,12 @@ public class SslTransportLayer implements TransportLayer {
             SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
             netWriteBuffer.flip();
 
-            //handle ssl renegotiation
-            if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
+            // reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
+            if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
+                    wrapResult.getStatus() == Status.OK &&
+                    !sslEngine.getSession().getProtocol().equals(TLS13)) {
                 throw renegotiationException();
+            }
 
             if (wrapResult.getStatus() == Status.OK) {
                 written += wrapResult.bytesConsumed();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index f276cd4..43b0956 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -110,10 +110,6 @@ public class SelectorTest {
         }
     }
 
-    public SecurityProtocol securityProtocol() {
-        return SecurityProtocol.PLAINTEXT;
-    }
-
     protected Map<String, Object> clientConfigs() {
         return new HashMap<>();
     }
@@ -1015,7 +1011,6 @@ public class SelectorTest {
 
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
-        selector.poll(1000L);
         while (true) {
             selector.poll(1000L);
             for (NetworkReceive receive : selector.completedReceives())
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 7f95566..0ddfce6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.network;
 
 import java.nio.channels.SelectionKey;
+import java.security.GeneralSecurityException;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.kafka.common.config.SecurityConfig;
@@ -43,11 +44,9 @@ import java.net.InetSocketAddress;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.Security;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,7 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses.
  */
-public class SslSelectorTest extends SelectorTest {
+public abstract class SslSelectorTest extends SelectorTest {
 
     private Map<String, Object> sslClientConfigs;
 
@@ -73,7 +72,7 @@ public class SslSelectorTest extends SelectorTest {
         this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
         this.server.start();
         this.time = new MockTime();
-        sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client");
+        sslClientConfigs = createSslClientConfigs(trustStoreFile);
         LogContext logContext = new LogContext();
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, logContext);
         this.channelBuilder.configure(sslClientConfigs);
@@ -81,6 +80,8 @@ public class SslSelectorTest extends SelectorTest {
         this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, logContext);
     }
 
+    protected abstract Map<String, Object> createSslClientConfigs(File trustStoreFile) throws GeneralSecurityException, IOException;
+
     @AfterEach
     public void tearDown() throws Exception {
         this.selector.close();
@@ -89,18 +90,12 @@ public class SslSelectorTest extends SelectorTest {
     }
 
     @Override
-    public SecurityProtocol securityProtocol() {
-        return SecurityProtocol.PLAINTEXT;
-    }
-
-    @Override
     protected Map<String, Object> clientConfigs() {
         return sslClientConfigs;
     }
 
     @Test
     public void testConnectionWithCustomKeyManager() throws Exception {
-
         TestProviderCreator testProviderCreator = new TestProviderCreator();
 
         int requestSize = 100 * 1024;
@@ -249,35 +244,6 @@ public class SslSelectorTest extends SelectorTest {
         verifySelectorEmpty();
     }
 
-    /**
-     * Renegotiation is not supported since it is potentially unsafe and it has been removed in TLS 1.3
-     */
-    @Test
-    public void testRenegotiationFails() throws Exception {
-        String node = "0";
-        // create connections
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        // send echo requests and receive responses
-        while (!selector.isChannelReady(node)) {
-            selector.poll(1000L);
-        }
-        selector.send(createSend(node, node + "-" + 0));
-        selector.poll(0L);
-        server.renegotiate();
-        selector.send(createSend(node, node + "-" + 1));
-        long expiryTime = System.currentTimeMillis() + 2000;
-
-        List<String> disconnected = new ArrayList<>();
-        while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
-            selector.poll(10);
-            disconnected.addAll(selector.disconnected().keySet());
-        }
-        assertTrue(disconnected.contains(node), "Renegotiation should cause disconnection");
-
-    }
-
     @Override
     @Test
     public void testMuteOnOOM() throws Exception {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
new file mode 100644
index 0000000..59903b5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+import org.junit.jupiter.api.Test;
+
+public class Tls12SelectorTest extends SslSelectorTest {
+
+    @Override
+    protected Map<String, Object> createSslClientConfigs(File trustStoreFile) throws GeneralSecurityException, IOException {
+        Map<String, Object> clientConfigs =
+            TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client");
+        clientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, asList("TLSv1.2"));
+        return clientConfigs;
+    }
+
+    /**
+     * Renegotiation is not supported when TLS 1.2 is used (and it was removed from TLS 1.3), so a
+     * renegotiation started by the server is expected to disconnect the client's channel.
+     */
+    @Test
+    public void testRenegotiationFails() throws Exception {
+        final String node = "0";
+        // connect to the echo server and poll until the channel is ready
+        selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+
+        // queue one request, ask the server to renegotiate, then queue a second request
+        selector.send(createSend(node, node + "-" + 0));
+        selector.poll(0L);
+        server.renegotiate();
+        selector.send(createSend(node, node + "-" + 1));
+
+        // poll for at most two seconds, collecting every node reported as disconnected
+        final long deadlineMs = System.currentTimeMillis() + 2000;
+        final List<String> disconnectedNodes = new ArrayList<>();
+        while (!disconnectedNodes.contains(node) && System.currentTimeMillis() < deadlineMs) {
+            selector.poll(10);
+            disconnectedNodes.addAll(selector.disconnected().keySet());
+        }
+        assertTrue(disconnectedNodes.contains(node), "Renegotiation should cause disconnection");
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
new file mode 100644
index 0000000..afae3e2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+
+@EnabledForJreRange(min = JRE.JAVA_11) // TLS 1.3 is only supported with Java 11 and newer
+public class Tls13SelectorTest extends SslSelectorTest {
+
+    @Override
+    protected Map<String, Object> createSslClientConfigs(File trustStoreFile) throws GeneralSecurityException, IOException {
+        Map<String, Object> configs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT,
+            trustStoreFile, "client");
+        configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, asList("TLSv1.3"));
+        return configs;
+    }
+
+    /**
+     * TLS 1.3 has a post-handshake key and IV update, which will update the sending and receiving keys
+     * for one side of the connection.
+     * <p>
+     * Key Usage Limits will trigger an update when the algorithm limits are reached, but the default
+     * value is too large (2^37 bytes of plaintext data) for a unit test. This value can be overridden
+     * via the security property {@code jdk.tls.keyLimits}, but that's also difficult to achieve in a
+     * unit test.
+     * <p>
+     * Applications can also trigger an update by calling {@code SSLSocket.startHandshake()} or
+     * {@code SSLEngine.beginHandshake()} (this would trigger renegotiation with TLS 1.2) and that's
+     * the approach we take here.
+     */
+    @Test
+    public void testKeyUpdate() throws Exception {
+        String node = "0";
+        // connect to the echo server
+        selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        // wait until the channel is ready before sending anything
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, node + "-" + 0));
+        selector.poll(0L);
+        server.renegotiate(); // per the Javadoc above, with TLS 1.3 this leads to a key update
+        selector.send(createSend(node, node + "-" + 1));
+        List<NetworkReceive> received = new ArrayList<>();
+        // the failure message is supplied lazily; an eagerly built string would always say "got 0"
+        TestUtils.waitForCondition(() -> {
+            try {
+                selector.poll(1000L);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            for (NetworkReceive receive : selector.completedReceives()) {
+                if (receive.source().equals(node))
+                    received.add(receive);
+            }
+            return received.size() == 2;
+        }, () -> "Expected two receives, got " + received.size());
+
+        assertEquals(asList("0-0", "0-1"), received.stream().map(this::asString).collect(Collectors.toList()));
+    }
+}