You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2021/05/11 15:14:06 UTC

[kafka] branch 2.8 updated: KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while building SslTransportLayer (#10059)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new bfc7ade  KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while building SslTransportLayer (#10059)
bfc7ade is described below

commit bfc7adee8049d87b01ae63d198fc8043e8f42874
Author: Davor Poldrugo <dp...@users.noreply.github.com>
AuthorDate: Fri Feb 26 02:02:40 2021 +0100

    KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while building SslTransportLayer (#10059)
    
    This patch moves the `peerHost` helper defined in `SslChannelBuilder` into `SslFactory`. `SaslChannelBuilder` is then updated to use a new `createSslEngine(Socket)` overload which relies on `peerHost` when building its `SSLEngine`. The purpose is to avoid the reverse DNS lookup performed by `InetAddress.getHostName`, which `SaslChannelBuilder` previously called.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Manikumar Reddy <ma...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/common/network/SaslChannelBuilder.java   |  3 +-
 .../kafka/common/network/SslChannelBuilder.java    | 48 ++--------------------
 .../kafka/common/security/ssl/SslFactory.java      | 48 ++++++++++++++++++++++
 .../kafka/common/network/SslSelectorTest.java      |  4 +-
 .../common/network/SslTransportLayerTest.java      |  4 +-
 .../SaslAuthenticatorFailureDelayTest.java         |  2 +-
 .../authenticator/SaslAuthenticatorTest.java       | 10 ++---
 7 files changed, 62 insertions(+), 57 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 17988db..8b390d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -256,8 +256,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
                                                  ChannelMetadataRegistry metadataRegistry) throws IOException {
         if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
             return SslTransportLayer.create(id, key,
-                sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
-                    socketChannel.socket().getPort()),
+                sslFactory.createSslEngine(socketChannel.socket()),
                 metadataRegistry);
         } else {
             return new PlaintextTransportLayer(key);
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 909009b..1140ea7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
@@ -103,8 +102,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
                                      MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
         try {
-            SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key,
-                peerHost(key), metadataRegistry);
+            SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, metadataRegistry);
             Supplier<Authenticator> authenticatorCreator = () ->
                 new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper);
             return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
@@ -120,53 +118,13 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
         if (sslFactory != null) sslFactory.close();
     }
 
-    protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key,
-                                                    String host, ChannelMetadataRegistry metadataRegistry) throws IOException {
+    protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, ChannelMetadataRegistry metadataRegistry) throws IOException {
         SocketChannel socketChannel = (SocketChannel) key.channel();
-        return SslTransportLayer.create(id, key, sslFactory.createSslEngine(host, socketChannel.socket().getPort()),
+        return SslTransportLayer.create(id, key, sslFactory.createSslEngine(socketChannel.socket()),
             metadataRegistry);
     }
 
     /**
-     * Returns host/IP address of remote host without reverse DNS lookup to be used as the host
-     * for creating SSL engine. This is used as a hint for session reuse strategy and also for
-     * hostname verification of server hostnames.
-     * <p>
-     * Scenarios:
-     * <ul>
-     *   <li>Server-side
-     *   <ul>
-     *     <li>Server accepts connection from a client. Server knows only client IP
-     *     address. We want to avoid reverse DNS lookup of the client IP address since the server
-     *     does not verify or use client hostname. The IP address can be used directly.</li>
-     *   </ul>
-     *   </li>
-     *   <li>Client-side
-     *   <ul>
-     *     <li>Client connects to server using hostname. No lookup is necessary
-     *     and the hostname should be used to create the SSL engine. This hostname is validated
-     *     against the hostname in SubjectAltName (dns) or CommonName in the certificate if
-     *     hostname verification is enabled. Authentication fails if hostname does not match.</li>
-     *     <li>Client connects to server using IP address, but certificate contains only
-     *     SubjectAltName (dns). Use of reverse DNS lookup to determine hostname introduces
-     *     a security vulnerability since authentication would be reliant on a secure DNS.
-     *     Hence hostname verification should fail in this case.</li>
-     *     <li>Client connects to server using IP address and certificate contains
-     *     SubjectAltName (ipaddress). This could be used when Kafka is on a private network.
-     *     If reverse DNS lookup is used, authentication would succeed using IP address if lookup
-     *     fails and IP address is used, but authentication would fail if lookup succeeds and
-     *     dns name is used. For consistency and to avoid dependency on a potentially insecure
-     *     DNS, reverse DNS lookup should be avoided and the IP address specified by the client for
-     *     connection should be used to create the SSL engine.</li>
-     *   </ul></li>
-     * </ul>
-     */
-    private String peerHost(SelectionKey key) {
-        SocketChannel socketChannel = (SocketChannel) key.channel();
-        return new InetSocketAddress(socketChannel.socket().getInetAddress(), 0).getHostString();
-    }
-
-    /**
      * Note that client SSL authentication is handled in {@link SslTransportLayer}. This class is only used
      * to transform the derived principal using a {@link KafkaPrincipalBuilder} configured by the user.
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 91b23b6..d0cc4cc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -31,6 +31,8 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
@@ -183,6 +185,14 @@ public class SslFactory implements Reconfigurable, Closeable {
         }
     }
 
+    public SSLEngine createSslEngine(Socket socket) {
+        return createSslEngine(peerHost(socket), socket.getPort());
+    }
+
+    /**
+     * Prefer `createSslEngine(Socket)` if a `Socket` instance is available. If using this overload,
+     * avoid reverse DNS resolution in the computation of `peerHost`.
+     */
     public SSLEngine createSslEngine(String peerHost, int peerPort) {
         if (sslEngineFactory == null) {
             throw new IllegalStateException("SslFactory has not been configured.");
@@ -194,6 +204,44 @@ public class SslFactory implements Reconfigurable, Closeable {
         }
     }
 
+    /**
+     * Returns host/IP address of remote host without reverse DNS lookup to be used as the host
+     * for creating SSL engine. This is used as a hint for session reuse strategy and also for
+     * hostname verification of server hostnames.
+     * <p>
+     * Scenarios:
+     * <ul>
+     *   <li>Server-side
+     *   <ul>
+     *     <li>Server accepts connection from a client. Server knows only client IP
+     *     address. We want to avoid reverse DNS lookup of the client IP address since the server
+     *     does not verify or use client hostname. The IP address can be used directly.</li>
+     *   </ul>
+     *   </li>
+     *   <li>Client-side
+     *   <ul>
+     *     <li>Client connects to server using hostname. No lookup is necessary
+     *     and the hostname should be used to create the SSL engine. This hostname is validated
+     *     against the hostname in SubjectAltName (dns) or CommonName in the certificate if
+     *     hostname verification is enabled. Authentication fails if hostname does not match.</li>
+     *     <li>Client connects to server using IP address, but certificate contains only
+     *     SubjectAltName (dns). Use of reverse DNS lookup to determine hostname introduces
+     *     a security vulnerability since authentication would be reliant on a secure DNS.
+     *     Hence hostname verification should fail in this case.</li>
+     *     <li>Client connects to server using IP address and certificate contains
+     *     SubjectAltName (ipaddress). This could be used when Kafka is on a private network.
+     *     If reverse DNS lookup is used, authentication would succeed using IP address if lookup
+     *     fails and IP address is used, but authentication would fail if lookup succeeds and
+     *     dns name is used. For consistency and to avoid dependency on a potentially insecure
+     *     DNS, reverse DNS lookup should be avoided and the IP address specified by the client for
+     *     connection should be used to create the SSL engine.</li>
+     *   </ul></li>
+     * </ul>
+     */
+    private String peerHost(Socket socket) {
+        return new InetSocketAddress(socket.getInetAddress(), 0).getHostString();
+    }
+
     public SslEngineFactory sslEngineFactory() {
         return sslEngineFactory;
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 0e94744..7f95566 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -383,9 +383,9 @@ public class SslSelectorTest extends SelectorTest {
 
         @Override
         protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key,
-                                                        String host, ChannelMetadataRegistry metadataRegistry) throws IOException {
+                                                        ChannelMetadataRegistry metadataRegistry) throws IOException {
             SocketChannel socketChannel = (SocketChannel) key.channel();
-            SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort());
+            SSLEngine sslEngine = sslFactory.createSslEngine(socketChannel.socket());
             TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine, metadataRegistry);
             return transportLayer;
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 13f7632..4418713 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -1368,9 +1368,9 @@ public class SslTransportLayerTest {
 
         @Override
         protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key,
-                                                        String host, ChannelMetadataRegistry metadataRegistry) throws IOException {
+                                                        ChannelMetadataRegistry metadataRegistry) throws IOException {
             SocketChannel socketChannel = (SocketChannel) key.channel();
-            SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort());
+            SSLEngine sslEngine = sslFactory.createSslEngine(socketChannel.socket());
             return newTransportLayer(id, key, sslEngine);
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
index 7dfde12..db6ba89 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
@@ -219,7 +219,7 @@ public abstract class SaslAuthenticatorFailureDelayTest {
 
     private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
         createSelector(securityProtocol, saslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 713f817..60c8dbf 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -258,7 +258,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         server = createEchoServer(securityProtocol);
         createSelector(securityProtocol, saslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         try {
             selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
             fail("SASL/PLAIN channel created without username");
@@ -282,7 +282,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         server = createEchoServer(securityProtocol);
         createSelector(securityProtocol, saslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         try {
             selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
             fail("SASL/PLAIN channel created without password");
@@ -399,7 +399,7 @@ public class SaslAuthenticatorTest {
             saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "DIGEST-MD5");
             createSelector(securityProtocol, saslClientConfigs);
             selector2 = selector;
-            InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
+            InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
             selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE);
             NetworkTestUtils.checkClientConnection(selector, node2, 100, 10);
             selector = null; // keeps it from being closed when next one is created
@@ -409,7 +409,7 @@ public class SaslAuthenticatorTest {
             saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
             createSelector(securityProtocol, saslClientConfigs);
             selector3 = selector;
-            selector.connect(node3, new InetSocketAddress("127.0.0.1", server.port()), BUFFER_SIZE, BUFFER_SIZE);
+            selector.connect(node3, new InetSocketAddress("localhost", server.port()), BUFFER_SIZE, BUFFER_SIZE);
             NetworkTestUtils.checkClientConnection(selector, node3, 100, 10);
             server.verifyAuthenticationMetrics(3, 0);
             
@@ -2016,7 +2016,7 @@ public class SaslAuthenticatorTest {
         };
         clientChannelBuilder.configure(saslClientConfigs);
         this.selector = NetworkTestUtils.createSelector(clientChannelBuilder, time);
-        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
     }