You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/09/21 22:39:32 UTC

[pulsar] branch master updated: [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e30d8  [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)
a7e30d8 is described below

commit a7e30d8150f626e75d8a17980e6e1019de659a5d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Sep 21 15:39:09 2020 -0700

    [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)
    
    make sni-proxy connection creation thread-safe
    
    remove unused pair
    
    initialize channel explicitly when sni-proxy is configured
    
    initialize channel in io thread
    
    fix channel var
---
 .../pulsar/client/api/ProxyProtocolTest.java       |   3 +-
 .../apache/pulsar/client/impl/ConnectionPool.java  | 105 +++++++++++----------
 .../client/impl/PulsarChannelInitializer.java      |  41 ++++----
 3 files changed, 79 insertions(+), 70 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
index 964ebd9..d264b83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
@@ -40,7 +40,7 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase {
 
         // Client should try to connect to proxy and pass broker-url as SNI header
         String proxyUrl = pulsar.getBrokerServiceUrlTls();
-        String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+        String brokerServiceUrl = "pulsar+ssl://unresolvable-address:6651";
         String topicName = "persistent://my-property/use/my-ns/my-topic1";
 
         ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
@@ -53,7 +53,6 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase {
 
         @Cleanup
         PulsarClient pulsarClient = clientBuilder.build();
-
         // should be able to create producer successfully
         pulsarClient.newProducer().topic(topicName).create();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 316145e..32d7195 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
@@ -37,7 +36,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +64,7 @@ public class ConnectionPool implements Closeable {
     private final ClientConfigurationData clientConfig;
     private final EventLoopGroup eventLoopGroup;
     private final int maxConnectionsPerHosts;
+    private final boolean isSniProxy;
 
     protected final DnsNameResolver dnsResolver;
 
@@ -78,6 +77,8 @@ public class ConnectionPool implements Closeable {
         this.eventLoopGroup = eventLoopGroup;
         this.clientConfig = conf;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
+        this.isSniProxy = clientConfig.isUseTls() && clientConfig.getProxyProtocol() != null
+                && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl());
 
         pool = new ConcurrentHashMap<>();
         bootstrap = new Bootstrap();
@@ -89,7 +90,7 @@ public class ConnectionPool implements Closeable {
         bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
 
         try {
-            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
+            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy);
             bootstrap.handler(channelInitializerHandler);
         } catch (Exception e) {
             log.error("Failed to create channel initializer");
@@ -224,18 +225,24 @@ public class ConnectionPool implements Closeable {
      * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
      */
     private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
-        String hostname = unresolvedAddress.getHostString();
-        int port = unresolvedAddress.getPort();
+        int port;
+        CompletableFuture<List<InetAddress>> resolvedAddress = null;
         try {
-            // For non-sni-proxy: Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
-            CompletableFuture<List<InetAddress>> resolvedAddress = isSniProxy()
-                    ? CompletableFuture.completedFuture(Lists.newArrayList(InetAddress.getByName(hostname)))
-                    : resolveName(hostname);
-            return resolvedAddress
-                    .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
-        } catch (UnknownHostException e) {
-            log.error("Invalid remote url {}", hostname, e);
-            return FutureUtil.failedFuture(new InvalidServiceURL("Invalid url " + hostname, e));
+            if (isSniProxy) {
+                URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
+                port = proxyURI.getPort();
+                resolvedAddress = resolveName(proxyURI.getHost());
+            } else {
+                port = unresolvedAddress.getPort();
+                resolvedAddress = resolveName(unresolvedAddress.getHostString());
+            }
+            return resolvedAddress.thenCompose(
+                    inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port,
+                            isSniProxy ? unresolvedAddress : null));
+        } catch (URISyntaxException e) {
+            log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
+            return FutureUtil
+                    .failedFuture(new InvalidServiceURL("Invalid url " + clientConfig.getProxyServiceUrl(), e));
         }
     }
 
@@ -243,16 +250,16 @@ public class ConnectionPool implements Closeable {
      * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
      * working
      */
-    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port) {
+    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
-        connectToAddress(unresolvedAddresses.next(), port, false).thenAccept(channel -> {
+        connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(channel -> {
             // Successfully connected to server
             future.complete(channel);
         }).exceptionally(exception -> {
             if (unresolvedAddresses.hasNext()) {
                 // Try next IP address
-                connectToResolvedAddresses(unresolvedAddresses, port).thenAccept(channel -> {
+                connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(channel -> {
                     future.complete(channel);
                 }).exceptionally(ex -> {
                     // This is already unwinding the recursive call
@@ -285,35 +292,39 @@ public class ConnectionPool implements Closeable {
     /**
      * Attempt to establish a TCP connection to an already resolved single IP address
      */
-    private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, boolean ignoreProxyUrl) {
+    private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
-
-        if (!ignoreProxyUrl && isSniProxy()) {
-            // client wants to connect to proxy and wants to pass 
-            // target connection host in sni header
-            channelInitializerHandler.setSniHostName(ipAddress.getHostName());
-            channelInitializerHandler.setSniHostPort(port);
-            // connect to proxy host
-            try {
-                URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
-                // resolve proxy host-address and try to connect again by passing flag ignoreProxyUrl because proxy-host
-                // will be already resolved
-                return resolveName(proxyURI.getHost())
-                        .thenCompose(inetAddresses -> connectToAddress(inetAddresses.iterator().next(), proxyURI.getPort(), true));
-            } catch (URISyntaxException e) {
-                log.error("Failed to parse proxy-service url {}", clientConfig.getProxyServiceUrl(), e);
-                future.completeExceptionally(e);
-                return future;
-            }
+        // if sni-proxy is configured, initialize each channel explicitly with its own sniHost instead of mutating shared state on channelInitializerHandler, which keeps connection creation thread-safe
+        if (isSniProxy) {
+            bootstrap.register().addListener((ChannelFuture cf) -> {
+                if (!cf.isSuccess()) {
+                    future.completeExceptionally(cf.cause());
+                    return;
+                }
+                Channel channel = cf.channel();
+                try {
+                    channelInitializerHandler.initChannel(channel, sniHost);
+                    channel.connect(new InetSocketAddress(ipAddress, port)).addListener((ChannelFuture channelFuture) -> {
+                        if (channelFuture.isSuccess()) {
+                            future.complete(channelFuture.channel());
+                        } else {
+                            future.completeExceptionally(channelFuture.cause());
+                        }
+                    });
+                } catch (Exception e) {
+                    log.warn("Failed to initialize channel with {}, {}", ipAddress, sniHost, e);
+                    future.completeExceptionally(e);
+                }
+            });
+        } else {
+            bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
+                if (channelFuture.isSuccess()) {
+                    future.complete(channelFuture.channel());
+                } else {
+                    future.completeExceptionally(channelFuture.cause());
+                }
+            });
         }
-        bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
-            if (channelFuture.isSuccess()) {
-                future.complete(channelFuture.channel());
-            } else {
-                future.completeExceptionally(channelFuture.cause());
-            }
-        });
-
         return future;
     }
 
@@ -336,7 +347,6 @@ public class ConnectionPool implements Closeable {
         } catch (InterruptedException e) {
             log.warn("EventLoopGroup shutdown was interrupted", e);
         }
-
         dnsResolver.close();
     }
 
@@ -361,10 +371,5 @@ public class ConnectionPool implements Closeable {
         return mod;
     }
 
-    private boolean isSniProxy() {
-        return channelInitializerHandler.isTlsEnabled() && clientConfig.getProxyProtocol() != null
-                && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl());
-    }
-
     private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index ef2a78b..043151c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.net.InetSocketAddress;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ObjectCache;
@@ -31,13 +31,13 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import lombok.Getter;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -52,19 +52,17 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
 
     private final Supplier<SslContext> sslContextSupplier;
     private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
-    @Setter
-    private String sniHostName;
-    @Setter
-    private int sniHostPort;
+    private final boolean isSniProxyEnabled;
 
     private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
 
-    public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier)
+    public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier, boolean isSniProxyEnabled)
             throws Exception {
         super();
         this.clientCnxSupplier = clientCnxSupplier;
         this.tlsEnabled = conf.isUseTls();
         this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
+        this.isSniProxyEnabled = isSniProxyEnabled;
 
         if (tlsEnabled) {
             if (tlsEnabledWithKeyStore) {
@@ -109,26 +107,33 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
 
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
+        /**
+         * Skip initializing the channel here when sni-proxy is enabled; in that case {@link ConnectionPool}
+         * initializes the channel explicitly.
+         */
+        if (!isSniProxyEnabled) {
+            initChannel(ch, null);
+        }
+    }
+
+    public void initChannel(Channel ch, InetSocketAddress sniHost) throws Exception {
         if (tlsEnabled) {
             if (tlsEnabledWithKeyStore) {
                 ch.pipeline().addLast(TLS_HANDLER,
                         new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
-			} else {
-				SslHandler handler = StringUtils.isNotBlank(sniHostName)
-						? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort)
-						: sslContextSupplier.get().newHandler(ch.alloc());
-				ch.pipeline().addLast(TLS_HANDLER, handler);
-			}
+            } else {
+                SslHandler handler = sniHost != null
+                        ? sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostName(), sniHost.getPort())
+                        : sslContextSupplier.get().newHandler(ch.alloc());
+                ch.pipeline().addLast(TLS_HANDLER, handler);
+            }
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
         } else {
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
-        ch.pipeline()
-                .addLast("frameDecoder",
-                        new LengthFieldBasedFrameDecoder(
-                                Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING,
-                                0, 4, 0, 4));
+        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+                Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", clientCnxSupplier.get());
     }
 }