You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/09/21 22:39:32 UTC
[pulsar] branch master updated: [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a7e30d8 [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)
a7e30d8 is described below
commit a7e30d8150f626e75d8a17980e6e1019de659a5d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Sep 21 15:39:09 2020 -0700
[pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)
make sni-proxy connection creation thread-safe
remove unused pair
initialize channel explicitly when sni-proxy is configured
initialize channel in io thread
fix channel var
---
.../pulsar/client/api/ProxyProtocolTest.java | 3 +-
.../apache/pulsar/client/impl/ConnectionPool.java | 105 +++++++++++----------
.../client/impl/PulsarChannelInitializer.java | 41 ++++----
3 files changed, 79 insertions(+), 70 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
index 964ebd9..d264b83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
@@ -40,7 +40,7 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase {
// Client should try to connect to proxy and pass broker-url as SNI header
String proxyUrl = pulsar.getBrokerServiceUrlTls();
- String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+ String brokerServiceUrl = "pulsar+ssl://unresolvable-address:6651";
String topicName = "persistent://my-property/use/my-ns/my-topic1";
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
@@ -53,7 +53,6 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase {
@Cleanup
PulsarClient pulsarClient = clientBuilder.build();
-
// should be able to create producer successfully
pulsarClient.newProducer().topic(topicName).create();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 316145e..32d7195 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -37,7 +36,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -66,6 +64,7 @@ public class ConnectionPool implements Closeable {
private final ClientConfigurationData clientConfig;
private final EventLoopGroup eventLoopGroup;
private final int maxConnectionsPerHosts;
+ private final boolean isSniProxy;
protected final DnsNameResolver dnsResolver;
@@ -78,6 +77,8 @@ public class ConnectionPool implements Closeable {
this.eventLoopGroup = eventLoopGroup;
this.clientConfig = conf;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
+ this.isSniProxy = clientConfig.isUseTls() && clientConfig.getProxyProtocol() != null
+ && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl());
pool = new ConcurrentHashMap<>();
bootstrap = new Bootstrap();
@@ -89,7 +90,7 @@ public class ConnectionPool implements Closeable {
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
try {
- channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
+ channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy);
bootstrap.handler(channelInitializerHandler);
} catch (Exception e) {
log.error("Failed to create channel initializer");
@@ -224,18 +225,24 @@ public class ConnectionPool implements Closeable {
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
- String hostname = unresolvedAddress.getHostString();
- int port = unresolvedAddress.getPort();
+ int port;
+ CompletableFuture<List<InetAddress>> resolvedAddress = null;
try {
- // For non-sni-proxy: Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
- CompletableFuture<List<InetAddress>> resolvedAddress = isSniProxy()
- ? CompletableFuture.completedFuture(Lists.newArrayList(InetAddress.getByName(hostname)))
- : resolveName(hostname);
- return resolvedAddress
- .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
- } catch (UnknownHostException e) {
- log.error("Invalid remote url {}", hostname, e);
- return FutureUtil.failedFuture(new InvalidServiceURL("Invalid url " + hostname, e));
+ if (isSniProxy) {
+ URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
+ port = proxyURI.getPort();
+ resolvedAddress = resolveName(proxyURI.getHost());
+ } else {
+ port = unresolvedAddress.getPort();
+ resolvedAddress = resolveName(unresolvedAddress.getHostString());
+ }
+ return resolvedAddress.thenCompose(
+ inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port,
+ isSniProxy ? unresolvedAddress : null));
+ } catch (URISyntaxException e) {
+ log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
+ return FutureUtil
+ .failedFuture(new InvalidServiceURL("Invalid url " + clientConfig.getProxyServiceUrl(), e));
}
}
@@ -243,16 +250,16 @@ public class ConnectionPool implements Closeable {
* Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
* working
*/
- private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port) {
+ private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
- connectToAddress(unresolvedAddresses.next(), port, false).thenAccept(channel -> {
+ connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(channel -> {
// Successfully connected to server
future.complete(channel);
}).exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
- connectToResolvedAddresses(unresolvedAddresses, port).thenAccept(channel -> {
+ connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(channel -> {
future.complete(channel);
}).exceptionally(ex -> {
// This is already unwinding the recursive call
@@ -285,35 +292,39 @@ public class ConnectionPool implements Closeable {
/**
* Attempt to establish a TCP connection to an already resolved single IP address
*/
- private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, boolean ignoreProxyUrl) {
+ private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
-
- if (!ignoreProxyUrl && isSniProxy()) {
- // client wants to connect to proxy and wants to pass
- // target connection host in sni header
- channelInitializerHandler.setSniHostName(ipAddress.getHostName());
- channelInitializerHandler.setSniHostPort(port);
- // connect to proxy host
- try {
- URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
- // resolve proxy host-address and try to connect again by passing flag ignoreProxyUrl because proxy-host
- // will be already resolved
- return resolveName(proxyURI.getHost())
- .thenCompose(inetAddresses -> connectToAddress(inetAddresses.iterator().next(), proxyURI.getPort(), true));
- } catch (URISyntaxException e) {
- log.error("Failed to parse proxy-service url {}", clientConfig.getProxyServiceUrl(), e);
- future.completeExceptionally(e);
- return future;
- }
+ // if proxy is configured in pulsar-client then make it thread-safe while updating channelInitializerHandler
+ if (isSniProxy) {
+ bootstrap.register().addListener((ChannelFuture cf) -> {
+ if (!cf.isSuccess()) {
+ future.completeExceptionally(cf.cause());
+ return;
+ }
+ Channel channel = cf.channel();
+ try {
+ channelInitializerHandler.initChannel(channel, sniHost);
+ channel.connect(new InetSocketAddress(ipAddress, port)).addListener((ChannelFuture channelFuture) -> {
+ if (channelFuture.isSuccess()) {
+ future.complete(channelFuture.channel());
+ } else {
+ future.completeExceptionally(channelFuture.cause());
+ }
+ });
+ } catch (Exception e) {
+ log.warn("Failed to initialize channel with {}, {}", ipAddress, sniHost, e);
+ future.completeExceptionally(e);
+ }
+ });
+ } else {
+ bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
+ if (channelFuture.isSuccess()) {
+ future.complete(channelFuture.channel());
+ } else {
+ future.completeExceptionally(channelFuture.cause());
+ }
+ });
}
- bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
- if (channelFuture.isSuccess()) {
- future.complete(channelFuture.channel());
- } else {
- future.completeExceptionally(channelFuture.cause());
- }
- });
-
return future;
}
@@ -336,7 +347,6 @@ public class ConnectionPool implements Closeable {
} catch (InterruptedException e) {
log.warn("EventLoopGroup shutdown was interrupted", e);
}
-
dnsResolver.close();
}
@@ -361,10 +371,5 @@ public class ConnectionPool implements Closeable {
return mod;
}
- private boolean isSniProxy() {
- return channelInitializerHandler.isTlsEnabled() && clientConfig.getProxyProtocol() != null
- && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl());
- }
-
private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index ef2a78b..043151c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -18,11 +18,11 @@
*/
package org.apache.pulsar.client.impl;
+import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ObjectCache;
@@ -31,13 +31,13 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import lombok.Getter;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -52,19 +52,17 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
private final Supplier<SslContext> sslContextSupplier;
private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
- @Setter
- private String sniHostName;
- @Setter
- private int sniHostPort;
+ private final boolean isSniProxyEnabled;
private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
- public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier)
+ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier, boolean isSniProxyEnabled)
throws Exception {
super();
this.clientCnxSupplier = clientCnxSupplier;
this.tlsEnabled = conf.isUseTls();
this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
+ this.isSniProxyEnabled = isSniProxyEnabled;
if (tlsEnabled) {
if (tlsEnabledWithKeyStore) {
@@ -109,26 +107,33 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
@Override
public void initChannel(SocketChannel ch) throws Exception {
+ /**
+ * skip initializing channel if sni-proxy is enabled in that case {@link ConnectionPool} will initialize the
+ * channel explicitly.
+ */
+ if (!isSniProxyEnabled) {
+ initChannel(ch, null);
+ }
+ }
+
+ public void initChannel(Channel ch, InetSocketAddress sniHost) throws Exception {
if (tlsEnabled) {
if (tlsEnabledWithKeyStore) {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
- } else {
- SslHandler handler = StringUtils.isNotBlank(sniHostName)
- ? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort)
- : sslContextSupplier.get().newHandler(ch.alloc());
- ch.pipeline().addLast(TLS_HANDLER, handler);
- }
+ } else {
+ SslHandler handler = sniHost != null
+ ? sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostName(), sniHost.getPort())
+ : sslContextSupplier.get().newHandler(ch.alloc());
+ ch.pipeline().addLast(TLS_HANDLER, handler);
+ }
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
- ch.pipeline()
- .addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(
- Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING,
- 0, 4, 0, 4));
+ ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+ Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
}