You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 02:10:55 UTC
[pulsar] branch master updated: [PIP-60] [Proxy-Server] Support SNI
routing to support various proxy-server in pulsar (#6566)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb4a627 [PIP-60] [Proxy-Server] Support SNI routing to support various proxy-server in pulsar (#6566)
fb4a627 is described below
commit fb4a627151c939a4af984c231b5891dde8c10cc6
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jun 3 19:10:45 2020 -0700
[PIP-60] [Proxy-Server] Support SNI routing to support various proxy-server in pulsar (#6566)
### Motivation
Implementation of [PIP-60](https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing)
A proxy server is a go‑between or intermediary server that forwards requests from multiple clients to different servers across the Internet. The proxy server can act as a “traffic cop,” in both forward and reverse proxy scenarios, and adds various benefits to your system such as load balancing, performance, security, auto-scaling, etc. There are many proxy servers already available in the market which are fast, scalable and, more importantly, cover various essential security a [...]
[Netty supports sending SNI header on TLS handshake](https://github.com/netty/netty/issues/3801#issuecomment-104274440), and this PR uses that Netty feature to send the SNI header while connecting to the proxy.
### Modification
https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing#changes
**Note:** we have fully tested these changes with ATS proxy for both forward and reverse proxy scenarios. I have also shared an e2e example in the PIP that uses ATS proxy for client and broker integration.
---
.../pulsar/broker/service/BrokerService.java | 7 +-
.../pulsar/client/api/ProxyProtocolTest.java | 109 +++++++++++++++++++++
.../apache/pulsar/client/api/ClientBuilder.java | 10 ++
.../apache/pulsar/client/api/ProxyProtocol.java | 31 ++++++
.../pulsar/client/api/PulsarClientException.java | 51 +++++++++-
.../org/apache/pulsar/admin/cli/CmdClusters.java | 10 +-
.../apache/pulsar/client/cli/PulsarClientTool.java | 14 +++
.../pulsar/client/impl/ClientBuilderImpl.java | 11 +++
.../apache/pulsar/client/impl/ConnectionPool.java | 55 +++++++++--
.../client/impl/PulsarChannelInitializer.java | 18 +++-
.../client/impl/conf/ClientConfigurationData.java | 5 +
.../pulsar/common/policies/data/ClusterData.java | 47 ++++++++-
12 files changed, 351 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 54facc4..0790e73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoun
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
@@ -861,7 +862,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}
-
+ if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) {
+ clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol());
+ log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(),
+ data.getProxyProtocol());
+ }
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
new file mode 100644
index 0000000..964ebd9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import lombok.Cleanup;
+
+public class ProxyProtocolTest extends TlsProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(ProxyProtocolTest.class);
+
+ @Test
+ public void testSniProxyProtocol() throws Exception {
+
+ // Client should try to connect to proxy and pass broker-url as SNI header
+ String proxyUrl = pulsar.getBrokerServiceUrlTls();
+ String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+ String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
+ .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+ .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);
+
+ @Cleanup
+ PulsarClient pulsarClient = clientBuilder.build();
+
+ // should be able to create producer successfully
+ pulsarClient.newProducer().topic(topicName).create();
+ }
+
+ @Test
+ public void testSniProxyProtocolWithInvalidProxyUrl() throws Exception {
+
+ // Client should try to connect to proxy and pass broker-url as SNI header
+ String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+ String proxyHost = "invalid-url";
+ String proxyUrl = "pulsar+ssl://" + proxyHost + ":5555";
+ String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
+ .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+ .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);
+
+ @Cleanup
+ PulsarClient pulsarClient = clientBuilder.build();
+
+ try {
+ pulsarClient.newProducer().topic(topicName).create();
+ fail("should have failed due to invalid url");
+ } catch (PulsarClientException e) {
+ assertTrue(e.getMessage().contains(proxyHost));
+ }
+ }
+
+ @Test
+ public void testSniProxyProtocolWithoutTls() throws Exception {
+ // Client should try to connect to proxy and pass broker-url as SNI header
+ String proxyUrl = pulsar.getBrokerServiceUrl();
+ String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+ String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
+ .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
+
+ @Cleanup
+ PulsarClient pulsarClient = clientBuilder.build();
+
+ try {
+ pulsarClient.newProducer().topic(topicName).create();
+ fail("should have failed due to non-tls url");
+ } catch (PulsarClientException e) {
+ // Ok
+ }
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index b3aee5b..865855c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -468,4 +468,14 @@ public interface ClientBuilder extends Cloneable {
* @return the client builder instance
*/
ClientBuilder clock(Clock clock);
+
+ /**
+ * Proxy-service url when client would like to connect to broker via proxy. Client can choose type of proxy-routing
+ * using {@link ProxyProtocol}.
+ *
+ * @param proxyServiceUrl proxy service url
+ * @param proxyProtocol protocol to decide type of proxy routing eg: SNI-routing
+ * @return the client builder instance
+ */
+ ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java
new file mode 100644
index 0000000..470e475
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Protocol type to determine type of proxy routing when client connects to proxy using
+ * {@link ClientBuilder#proxyServiceUrl}.
+ */
+public enum ProxyProtocol {
+ /**
+ * Follows SNI-routing
+ * https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
+ **/
+ SNI
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 16af009..b6e327c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -68,12 +68,25 @@ public class PulsarClientException extends IOException {
/**
* Constructs an {@code PulsarClientException} with the specified cause.
*
+ * @param msg
+ * The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
+ *
* @param t
- * The cause (which is saved for later retrieval by the
- * {@link #getCause()} method). (A null value is permitted,
- * and indicates that the cause is nonexistent or unknown.)
+ * The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
+ * permitted, and indicates that the cause is nonexistent or unknown.)
+ */
+ public PulsarClientException(String msg, Throwable t) {
+ super(msg, t);
+ }
+
+ /**
+ * Constructs an {@code PulsarClientException} with the specified cause.
+ *
+ * @param t
+ * The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
+ * permitted, and indicates that the cause is nonexistent or unknown.)
* @param sequenceId
- * The sequenceId of the message
+ * The sequenceId of the message
*/
public PulsarClientException(Throwable t, long sequenceId) {
super(t);
@@ -95,6 +108,21 @@ public class PulsarClientException extends IOException {
public InvalidServiceURL(Throwable t) {
super(t);
}
+
+ /**
+ * Constructs an {@code InvalidServiceURL} with the specified detail message and cause.
+ *
+ * @param msg
+ * The detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method)
+ * @param t
+ * The cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ */
+ public InvalidServiceURL(String msg, Throwable t) {
+ super(msg, t);
+ }
}
/**
@@ -123,6 +151,21 @@ public class PulsarClientException extends IOException {
public InvalidConfigurationException(Throwable t) {
super(t);
}
+
+ /**
+ * Constructs an {@code InvalidConfigurationException} with the specified detail message and cause.
+ *
+ * @param msg
+ * The detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method)
+ * @param t
+ * The cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ */
+ public InvalidConfigurationException(String msg, Throwable t) {
+ super(msg, t);
+ }
}
/**
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 88f3b3f..75f80ee 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
@@ -69,10 +70,17 @@ public class CmdClusters extends CmdBase {
@Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false)
private String brokerServiceUrlTls;
+ @Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false)
+ private String proxyServiceUrl;
+
+ @Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false)
+ private ProxyProtocol proxyProtocol;
+
void run() throws PulsarAdminException {
String cluster = getOneArgument(params);
admin.clusters().createCluster(cluster,
- new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls));
+ new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl,
+ proxyProtocol));
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index b86bc79..0ba7227 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
@@ -43,6 +44,12 @@ public class PulsarClientTool {
@Parameter(names = { "--url" }, description = "Broker URL to which to connect.")
String serviceURL = null;
+ @Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.")
+ String proxyServiceURL = null;
+
+ @Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.")
+ ProxyProtocol proxyProtocol = null;
+
@Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
String authPluginClassName = null;
@@ -117,6 +124,13 @@ public class PulsarClientTool {
.tlsTrustStorePath(tlsTrustStorePath)
.tlsTrustStorePassword(tlsTrustStorePassword);
+ if (StringUtils.isNotBlank(proxyServiceURL)) {
+ if (proxyProtocol == null) {
+ System.out.println("proxy-protocol must be provided with proxy-url");
+ System.exit(-1);
+ }
+ clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol);
+ }
this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 2c920e2..6b820b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
@@ -294,4 +295,14 @@ public class ClientBuilderImpl implements ClientBuilder {
conf.setClock(clock);
return this;
}
+
+ @Override
+ public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) {
+ if (StringUtils.isNotBlank(proxyServiceUrl) && proxyProtocol == null) {
+ throw new IllegalArgumentException("proxyProtocol must be present with proxyServiceUrl");
+ }
+ conf.setProxyServiceUrl(proxyServiceUrl);
+ conf.setProxyProtocol(proxyProtocol);
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 30301bb..fff9858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -34,6 +35,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
@@ -43,9 +47,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +61,8 @@ public class ConnectionPool implements Closeable {
protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
private final Bootstrap bootstrap;
+ private final PulsarChannelInitializer channelInitializerHandler;
+ private final ClientConfigurationData clientConfig;
private final EventLoopGroup eventLoopGroup;
private final int maxConnectionsPerHosts;
@@ -66,6 +75,7 @@ public class ConnectionPool implements Closeable {
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
+ this.clientConfig = conf;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
pool = new ConcurrentHashMap<>();
@@ -78,7 +88,8 @@ public class ConnectionPool implements Closeable {
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
try {
- bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
+ channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
+ bootstrap.handler(channelInitializerHandler);
} catch (Exception e) {
log.error("Failed to create channel initializer");
throw new PulsarClientException(e);
@@ -214,10 +225,17 @@ public class ConnectionPool implements Closeable {
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
String hostname = unresolvedAddress.getHostString();
int port = unresolvedAddress.getPort();
-
- // Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
- return resolveName(hostname)
- .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
+ try {
+ // For non-sni-proxy: Resolve DNS --> Attempt to connect to all IP addresses until one succeeds
+ CompletableFuture<List<InetAddress>> resolvedAddress = isSniProxy()
+ ? CompletableFuture.completedFuture(Lists.newArrayList(InetAddress.getByName(hostname)))
+ : resolveName(hostname);
+ return resolvedAddress
+ .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
+ } catch (UnknownHostException e) {
+ log.error("Invalid remote url {}", hostname, e);
+ return FutureUtil.failedFuture(new InvalidServiceURL("Invalid url " + hostname, e));
+ }
}
/**
@@ -227,7 +245,7 @@ public class ConnectionPool implements Closeable {
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port) {
CompletableFuture<Channel> future = new CompletableFuture<>();
- connectToAddress(unresolvedAddresses.next(), port).thenAccept(channel -> {
+ connectToAddress(unresolvedAddresses.next(), port, false).thenAccept(channel -> {
// Successfully connected to server
future.complete(channel);
}).exceptionally(exception -> {
@@ -266,9 +284,27 @@ public class ConnectionPool implements Closeable {
/**
* Attempt to establish a TCP connection to an already resolved single IP address
*/
- private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port) {
+ private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, boolean ignoreProxyUrl) {
CompletableFuture<Channel> future = new CompletableFuture<>();
+ if (!ignoreProxyUrl && isSniProxy()) {
+ // client wants to connect to proxy and wants to pass
+ // target connection host in sni header
+ channelInitializerHandler.setSniHostName(ipAddress.getHostName());
+ channelInitializerHandler.setSniHostPort(port);
+ // connect to proxy host
+ try {
+ URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
+ // resolve proxy host-address and try to connect again by passing flag ignoreProxyUrl because proxy-host
+ // will be already resolved
+ return resolveName(proxyURI.getHost())
+ .thenCompose(inetAddresses -> connectToAddress(inetAddresses.iterator().next(), proxyURI.getPort(), true));
+ } catch (URISyntaxException e) {
+ log.error("Failed to parse proxy-service url {}", clientConfig.getProxyServiceUrl(), e);
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
if (channelFuture.isSuccess()) {
future.complete(channelFuture.channel());
@@ -307,5 +343,10 @@ public class ConnectionPool implements Closeable {
return mod;
}
+ private boolean isSniProxy() {
+ return channelInitializerHandler.isTlsEnabled() && clientConfig.getProxyProtocol() != null
+ && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl());
+ }
+
private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 4a145e7..e418904 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -27,6 +27,10 @@ import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ObjectCache;
@@ -41,11 +45,16 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
public static final String TLS_HANDLER = "tls";
private final Supplier<ClientCnx> clientCnxSupplier;
+ @Getter
private final boolean tlsEnabled;
private final boolean tlsEnabledWithKeyStore;
private final Supplier<SslContext> sslContextSupplier;
private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
+ @Setter
+ private String sniHostName;
+ @Setter
+ private int sniHostPort;
private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
@@ -99,9 +108,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
if (tlsEnabledWithKeyStore) {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
- } else {
- ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc()));
- }
+ } else {
+ SslHandler handler = StringUtils.isNotBlank(sniHostName)
+ ? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort)
+ : sslContextSupplier.get().newHandler(ch.alloc());
+ ch.pipeline().addLast(TLS_HANDLER, handler);
+ }
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index ac08553..7bedc01 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -26,6 +26,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
@@ -87,6 +88,10 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private Set<String> tlsCiphers = Sets.newTreeSet();
private Set<String> tlsProtocols = Sets.newTreeSet();
+ /** proxyServiceUrl and proxyProtocol must be mutually inclusive **/
+ private String proxyServiceUrl;
+ private ProxyProtocol proxyProtocol;
+
@JsonIgnore
private Clock clock = Clock.systemDefaultZone();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index e5b2859..35cecbb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -26,6 +26,8 @@ import io.swagger.annotations.ApiModelProperty;
import java.util.LinkedHashSet;
import java.util.Objects;
+import org.apache.pulsar.client.api.ProxyProtocol;
+
/**
* The configuration data for a cluster.
*/
@@ -58,6 +60,20 @@ public class ClusterData {
example = "pulsar+ssl://pulsar.example.com:6651"
)
private String brokerServiceUrlTls;
+ @ApiModelProperty(
+ name = "proxyServiceUrl",
+ value = "Proxy-service url when client would like to connect to broker via proxy.",
+ example = "pulsar+ssl://ats-proxy.example.com:4443 or "
+ + "pulsar://ats-proxy.example.com:4080"
+ )
+ private String proxyServiceUrl;
+ @ApiModelProperty(
+ name = "proxyProtocol",
+ value = "protocol to decide type of proxy routing eg: SNI-routing",
+ example = "SNI"
+ )
+ private ProxyProtocol proxyProtocol;
+
// For given Cluster1(us-west1, us-east1) and Cluster2(us-west2, us-east2)
// Peer: [us-west1 -> us-west2] and [us-east1 -> us-east2]
@ApiModelProperty(
@@ -85,6 +101,16 @@ public class ClusterData {
this.brokerServiceUrlTls = brokerServiceUrlTls;
}
+ public ClusterData(String serviceUrl, String serviceUrlTls, String brokerServiceUrl, String brokerServiceUrlTls,
+ String proxyServiceUrl, ProxyProtocol proxyProtocol) {
+ this.serviceUrl = serviceUrl;
+ this.serviceUrlTls = serviceUrlTls;
+ this.brokerServiceUrl = brokerServiceUrl;
+ this.brokerServiceUrlTls = brokerServiceUrlTls;
+ this.proxyServiceUrl = proxyServiceUrl;
+ this.proxyProtocol = proxyProtocol;
+ }
+
public void update(ClusterData other) {
checkNotNull(other);
this.serviceUrl = other.serviceUrl;
@@ -125,6 +151,22 @@ public class ClusterData {
this.brokerServiceUrlTls = brokerServiceUrlTls;
}
+ public String getProxyServiceUrl() {
+ return proxyServiceUrl;
+ }
+
+ public void setProxyServiceUrl(String proxyServiceUrl) {
+ this.proxyServiceUrl = proxyServiceUrl;
+ }
+
+ public ProxyProtocol getProxyProtocol() {
+ return proxyProtocol;
+ }
+
+ public void setProxyProtocol(ProxyProtocol proxyProtocol) {
+ this.proxyProtocol = proxyProtocol;
+ }
+
public LinkedHashSet<String> getPeerClusterNames() {
return peerClusterNames;
}
@@ -139,7 +181,9 @@ public class ClusterData {
ClusterData other = (ClusterData) obj;
return Objects.equals(serviceUrl, other.serviceUrl) && Objects.equals(serviceUrlTls, other.serviceUrlTls)
&& Objects.equals(brokerServiceUrl, other.brokerServiceUrl)
- && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls);
+ && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls)
+ && Objects.equals(proxyServiceUrl, other.proxyServiceUrl)
+ && Objects.equals(proxyProtocol, other.proxyProtocol);
}
return false;
@@ -154,6 +198,7 @@ public class ClusterData {
public String toString() {
return MoreObjects.toStringHelper(this).add("serviceUrl", serviceUrl).add("serviceUrlTls", serviceUrlTls)
.add("brokerServiceUrl", brokerServiceUrl).add("brokerServiceUrlTls", brokerServiceUrlTls)
+ .add("proxyServiceUrl", proxyServiceUrl).add("proxyProtocol", proxyProtocol)
.add("peerClusterNames", peerClusterNames).toString();
}