You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/10/02 21:32:20 UTC
[pulsar] branch master updated: Improve refactored client
connection code (#8117). (#8177)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1af5c8e Improve refactored client connection code (#8117). (#8177)
1af5c8e is described below
commit 1af5c8ec77f2e1372932be349f8c5a3d0e068a1b
Author: Rolf Arne Corneliussen <ra...@users.noreply.github.com>
AuthorDate: Fri Oct 2 23:31:51 2020 +0200
Improve refactored client connection code (#8117). (#8177)
### Motivation
Improve previous PR #8117 (Always use SNI for TLS enabled Java client)
### Modifications
- Use `ChannelFutures.toCompletableFuture` instead of private static utility method.
- When TLS is not enabled, use 'original' code that invokes `Bootstrap.connect(InetSocketAddress)`; it is only when TLS is enabled we need custom setup code to properly set SNI headers.
- Add documentation and argument checks to `PulsarChannelInitializer.initTls`
---
.../apache/pulsar/client/impl/ConnectionPool.java | 27 ++++++++--------------
.../client/impl/PulsarChannelInitializer.java | 12 ++++++++++
2 files changed, 22 insertions(+), 17 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 6203990..e1ebecd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,12 +18,13 @@
*/
package org.apache.pulsar.client.impl;
+import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
+
import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.dns.DnsNameResolver;
@@ -294,11 +295,14 @@ public class ConnectionPool implements Closeable {
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
- return adapt(bootstrap.register())
- .thenCompose(channel -> clientConfig.isUseTls()
- ? channelInitializerHandler.initTls(channel, sniHost != null ? sniHost : remoteAddress)
- : CompletableFuture.completedFuture(channel))
- .thenCompose(channel -> adapt(channel.connect(remoteAddress)));
+ if (clientConfig.isUseTls()) {
+ return toCompletableFuture(bootstrap.register())
+ .thenCompose(channel -> channelInitializerHandler
+ .initTls(channel, sniHost != null ? sniHost : remoteAddress))
+ .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
+ } else {
+ return toCompletableFuture(bootstrap.connect(remoteAddress));
+ }
}
public void releaseConnection(ClientCnx cnx) {
@@ -346,16 +350,5 @@ public class ConnectionPool implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
- private static CompletableFuture<Channel> adapt(ChannelFuture channelFuture) {
- CompletableFuture<Channel> adapter = new CompletableFuture<>();
- channelFuture.addListener((ChannelFuture cf) ->{
- if (cf.isSuccess()) {
- adapter.complete(channelFuture.channel());
- } else {
- adapter.completeExceptionally(channelFuture.cause());
- }
- });
- return adapter;
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index f50bed5..e9a8bcd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import java.net.InetSocketAddress;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -115,7 +116,18 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
+ /**
+ * Initialize TLS for a channel. Should be invoked before the channel is connected to the remote address.
+ *
+ * @param ch the channel
+ * @param sniHost the value of this argument will be passed as peer host and port when creating the SSLEngine (which
+ * in turn will use these values to set SNI header when doing the TLS handshake). Cannot be
+ * <code>null</code>.
+ * @return a {@link CompletableFuture} that completes when the TLS is set up.
+ */
CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
+ Objects.requireNonNull(ch, "A channel is required");
+ Objects.requireNonNull(sniHost, "A sniHost is required");
if (!tlsEnabled) {
throw new IllegalStateException("TLS is not enabled in client configuration");
}