You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/09/24 20:38:20 UTC

[pulsar] branch master updated: Always use SNI for TLS enabled Pulsar Java client. (#8117)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f2933f7  Always use SNI for TLS enabled Pulsar Java client. (#8117)
f2933f7 is described below

commit f2933f7da4850814d92fac0e54c5314c51c8fc32
Author: Rolf Arne Corneliussen <ra...@users.noreply.github.com>
AuthorDate: Thu Sep 24 22:37:51 2020 +0200

    Always use SNI for TLS enabled Pulsar Java client. (#8117)
    
    Co-authored-by: Rolf Arne Corneliussen <ro...@addsecure.com>
---
 .../org/apache/pulsar/client/api/TlsSniTest.java   | 66 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConnectionPool.java  | 56 +++++++-----------
 .../client/impl/PulsarChannelInitializer.java      | 58 +++++++++----------
 .../util/keystoretls/KeyStoreSSLContext.java       | 10 +++-
 4 files changed, 124 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
new file mode 100644
index 0000000..fc8c242
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.testng.annotations.Test;
+
+import lombok.Cleanup;
+
+public class TlsSniTest extends TlsProducerConsumerBase {
+
+    /**
+     * Verify that a broker service URL containing an IP address still works now that the client always
+     * uses SNI. If we try to create an {@link javax.net.ssl.SSLEngine} with a peer host that is an
+     * IP address, the peer host is ignored; see, for example,
+     * {@link io.netty.handler.ssl.ReferenceCountedOpenSslEngine}.
+     *
+     */
+    @Test
+    public void testIpAddressInBrokerServiceUrl() throws Exception {
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+        URI brokerServiceUrlTls = new URI(pulsar.getBrokerServiceUrlTls());
+
+        String brokerServiceIpAddressUrl = String.format("pulsar+ssl://%s:%d",
+                    InetAddress.getByName(brokerServiceUrlTls.getHost()).getHostAddress(),
+                    brokerServiceUrlTls.getPort());
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);
+
+        @Cleanup
+        PulsarClient pulsarClient = clientBuilder.build();
+        // should be able to create producer successfully
+        pulsarClient.newProducer().topic(topicName).create();
+    }
+}
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 32d7195..6203990 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -90,7 +90,7 @@ public class ConnectionPool implements Closeable {
         bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
 
         try {
-            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy);
+            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
             bootstrap.handler(channelInitializerHandler);
         } catch (Exception e) {
             log.error("Failed to create channel initializer");
@@ -293,39 +293,12 @@ public class ConnectionPool implements Closeable {
      * Attempt to establish a TCP connection to an already resolved single IP address
      */
     private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
-        CompletableFuture<Channel> future = new CompletableFuture<>();
-        // if proxy is configured in pulsar-client then make it thread-safe while updating channelInitializerHandler
-        if (isSniProxy) {
-            bootstrap.register().addListener((ChannelFuture cf) -> {
-                if (!cf.isSuccess()) {
-                    future.completeExceptionally(cf.cause());
-                    return;
-                }
-                Channel channel = cf.channel();
-                try {
-                    channelInitializerHandler.initChannel(channel, sniHost);
-                    channel.connect(new InetSocketAddress(ipAddress, port)).addListener((ChannelFuture channelFuture) -> {
-                        if (channelFuture.isSuccess()) {
-                            future.complete(channelFuture.channel());
-                        } else {
-                            future.completeExceptionally(channelFuture.cause());
-                        }
-                    });
-                } catch (Exception e) {
-                    log.warn("Failed to initialize channel with {}, {}", ipAddress, sniHost, e);
-                    future.completeExceptionally(e);
-                }
-            });
-        } else {
-            bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
-                if (channelFuture.isSuccess()) {
-                    future.complete(channelFuture.channel());
-                } else {
-                    future.completeExceptionally(channelFuture.cause());
-                }
-            });
-        }
-        return future;
+        InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
+        return adapt(bootstrap.register())
+                .thenCompose(channel -> clientConfig.isUseTls()
+                        ? channelInitializerHandler.initTls(channel, sniHost != null ? sniHost : remoteAddress)
+                        : CompletableFuture.completedFuture(channel))
+                .thenCompose(channel -> adapt(channel.connect(remoteAddress)));
     }
 
     public void releaseConnection(ClientCnx cnx) {
@@ -364,7 +337,7 @@ public class ConnectionPool implements Closeable {
     }
 
     public static int signSafeMod(long dividend, int divisor) {
-        int mod = (int) (dividend % (long) divisor);
+        int mod = (int) (dividend % divisor);
         if (mod < 0) {
             mod += divisor;
         }
@@ -372,4 +345,17 @@ public class ConnectionPool implements Closeable {
     }
 
     private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
+
+    private static CompletableFuture<Channel> adapt(ChannelFuture channelFuture) {
+        CompletableFuture<Channel> adapter = new CompletableFuture<>();
+        channelFuture.addListener((ChannelFuture cf) ->{
+            if (cf.isSuccess()) {
+                adapter.complete(channelFuture.channel());
+            } else {
+                adapter.completeExceptionally(channelFuture.cause());
+            }
+        });
+        return adapter;
+    }
 }
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 043151c..f50bed5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import java.net.InetSocketAddress;
-import java.security.cert.X509Certificate;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
@@ -52,17 +52,15 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
 
     private final Supplier<SslContext> sslContextSupplier;
     private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
-    private final boolean isSniProxyEnabled;
 
     private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
 
-    public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier, boolean isSniProxyEnabled)
+    public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier)
             throws Exception {
         super();
         this.clientCnxSupplier = clientCnxSupplier;
         this.tlsEnabled = conf.isUseTls();
         this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
-        this.isSniProxyEnabled = isSniProxyEnabled;
 
         if (tlsEnabled) {
             if (tlsEnabledWithKeyStore) {
@@ -88,10 +86,10 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
                         return authData.getTlsTrustStoreStream() == null
                                 ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
                                         conf.getTlsTrustCertsFilePath(),
-                                        (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey())
+                                        authData.getTlsCertificates(), authData.getTlsPrivateKey())
                                 : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
                                         authData.getTlsTrustStoreStream(),
-                                        (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
+                                        authData.getTlsCertificates(), authData.getTlsPrivateKey());
                     } else {
                         return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
                                 conf.getTlsTrustCertsFilePath());
@@ -107,33 +105,35 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
 
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
-        /**
-         * skip initializing channel if sni-proxy is enabled in that case {@link ConnectionPool} will initialize the
-         * channel explicitly.
-         */
-        if (!isSniProxyEnabled) {
-            initChannel(ch, null);
-        }
-    }
 
-    public void initChannel(Channel ch, InetSocketAddress sniHost) throws Exception {
-        if (tlsEnabled) {
-            if (tlsEnabledWithKeyStore) {
-                ch.pipeline().addLast(TLS_HANDLER,
-                        new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
-            } else {
-                SslHandler handler = sniHost != null
-                        ? sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostName(), sniHost.getPort())
-                        : sslContextSupplier.get().newHandler(ch.alloc());
-                ch.pipeline().addLast(TLS_HANDLER, handler);
-            }
-            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
-        } else {
-            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
-        }
+        // Set up the channel except for the SslHandler for TLS-enabled connections
+
+        ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
 
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                 Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", clientCnxSupplier.get());
     }
+
+    CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
+        if (!tlsEnabled) {
+            throw new IllegalStateException("TLS is not enabled in client configuration");
+        }
+        CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
+        ch.eventLoop().execute(() -> {
+            try {
+                SslHandler handler = tlsEnabledWithKeyStore
+                        ? new SslHandler(nettySSLContextAutoRefreshBuilder.get()
+                                .createSSLEngine(sniHost.getHostString(), sniHost.getPort()))
+                        : sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostString(), sniHost.getPort());
+                ch.pipeline().addFirst(TLS_HANDLER, handler);
+                initTlsFuture.complete(ch);
+            } catch (Throwable t) {
+                initTlsFuture.completeExceptionally(t);
+            }
+        });
+
+        return initTlsFuture;
+    }
 }
+
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
index b9ad2e7..736d189 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
@@ -166,8 +166,14 @@ public class KeyStoreSSLContext {
     }
 
     public SSLEngine createSSLEngine() {
-        SSLEngine sslEngine = sslContext.createSSLEngine();
+        return configureSSLEngine(sslContext.createSSLEngine());
+    }
+
+    public SSLEngine createSSLEngine(String peerHost, int peerPort) {
+        return configureSSLEngine(sslContext.createSSLEngine(peerHost, peerPort));
+    }
 
+    private SSLEngine configureSSLEngine(SSLEngine sslEngine) {
         sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
         sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
 
@@ -177,7 +183,6 @@ public class KeyStoreSSLContext {
         } else {
             sslEngine.setUseClientMode(true);
         }
-
         return sslEngine;
     }
 
@@ -353,3 +358,4 @@ public class KeyStoreSSLContext {
         return sslCtxFactory;
     }
 }
+