You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/10/02 21:32:20 UTC

[pulsar] branch master updated: Improve refactored client connection code (#8117). (#8177)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af5c8e  Improve refactored client connection code (#8117). (#8177)
1af5c8e is described below

commit 1af5c8ec77f2e1372932be349f8c5a3d0e068a1b
Author: Rolf Arne Corneliussen <ra...@users.noreply.github.com>
AuthorDate: Fri Oct 2 23:31:51 2020 +0200

    Improve refactored client connection code (#8117). (#8177)
    
    ### Motivation
    
    Improve on the previous PR #8117 (Always use SNI for TLS enabled Java client)
    
    ### Modifications
    - Use `ChannelFutures.toCompletableFuture` instead of the private static utility method `adapt`.
    - When TLS is not enabled, use the 'original' code that invokes `Bootstrap.connect(InetSocketAddress)`; custom setup code is needed only when TLS is enabled, in order to set the SNI headers properly.
    - Add documentation and argument checks to `PulsarChannelInitializer.initTls`
---
 .../apache/pulsar/client/impl/ConnectionPool.java  | 27 ++++++++--------------
 .../client/impl/PulsarChannelInitializer.java      | 12 ++++++++++
 2 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 6203990..e1ebecd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,12 +18,13 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
+
 import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelException;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.resolver.dns.DnsNameResolver;
@@ -294,11 +295,14 @@ public class ConnectionPool implements Closeable {
      */
     private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
         InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
-        return adapt(bootstrap.register())
-                .thenCompose(channel -> clientConfig.isUseTls()
-                        ? channelInitializerHandler.initTls(channel, sniHost != null ? sniHost : remoteAddress)
-                        : CompletableFuture.completedFuture(channel))
-                .thenCompose(channel -> adapt(channel.connect(remoteAddress)));
+        if (clientConfig.isUseTls()) {
+            return toCompletableFuture(bootstrap.register())
+                    .thenCompose(channel -> channelInitializerHandler
+                            .initTls(channel, sniHost != null ? sniHost : remoteAddress))
+                    .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
+        } else {
+            return toCompletableFuture(bootstrap.connect(remoteAddress));
+        }
     }
 
     public void releaseConnection(ClientCnx cnx) {
@@ -346,16 +350,5 @@ public class ConnectionPool implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
 
-    private static CompletableFuture<Channel> adapt(ChannelFuture channelFuture) {
-        CompletableFuture<Channel> adapter = new CompletableFuture<>();
-        channelFuture.addListener((ChannelFuture cf) ->{
-            if (cf.isSuccess()) {
-                adapter.complete(channelFuture.channel());
-            } else {
-                adapter.completeExceptionally(channelFuture.cause());
-            }
-        });
-        return adapter;
-    }
 }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index f50bed5..e9a8bcd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import java.net.InetSocketAddress;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -115,7 +116,18 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
         ch.pipeline().addLast("handler", clientCnxSupplier.get());
     }
 
+   /**
+     * Initialize TLS for a channel. Should be invoked before the channel is connected to the remote address.
+     *
+     * @param ch      the channel
+     * @param sniHost the value of this argument will be passed as peer host and port when creating the SSLEngine (which
+     *                in turn will use these values to set SNI header when doing the TLS handshake). Cannot be
+     *                <code>null</code>.
+     * @return a {@link CompletableFuture} that completes when the TLS is set up.
+     */
     CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
+        Objects.requireNonNull(ch, "A channel is required");
+        Objects.requireNonNull(sniHost, "A sniHost is required");
         if (!tlsEnabled) {
             throw new IllegalStateException("TLS is not enabled in client configuration");
         }