You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 02:10:55 UTC

[pulsar] branch master updated: [PIP-60] [Proxy-Server] Support SNI routing to support various proxy-server in pulsar (#6566)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fb4a627  [PIP-60] [Proxy-Server] Support SNI routing to support various proxy-server in pulsar (#6566)
fb4a627 is described below

commit fb4a627151c939a4af984c231b5891dde8c10cc6
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jun 3 19:10:45 2020 -0700

    [PIP-60] [Proxy-Server] Support SNI routing to support various proxy-server in pulsar (#6566)
    
    ### Motivation
    Implementation of [PIP-60](https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing)
    A proxy server is a go‑between or intermediary server that forwards requests from multiple clients to different servers across the Internet. The proxy server can act as a “traffic cop,” in both forward and reverse proxy scenarios, and adds various benefits in your system such as load balancing, performance, security, auto-scaling, etc.. There are already many proxy servers already available in the market which are fast, scalable and more importantly covers various essential security a [...]
    [Netty supports sending SNI header on TLS handshake](https://github.com/netty/netty/issues/3801#issuecomment-104274440) and this PR uses that Netty feature to send SNI header while connecting to proxy.
    
    ### Modification
    https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing#changes
    
    **Note:** we have fully tested this changes with ATS proxy for both forward and reverse proxy scenarios. And I have also shared e2e example in PIP to use ATS proxy for client and broker integration.
---
 .../pulsar/broker/service/BrokerService.java       |   7 +-
 .../pulsar/client/api/ProxyProtocolTest.java       | 109 +++++++++++++++++++++
 .../apache/pulsar/client/api/ClientBuilder.java    |  10 ++
 .../apache/pulsar/client/api/ProxyProtocol.java    |  31 ++++++
 .../pulsar/client/api/PulsarClientException.java   |  51 +++++++++-
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |  10 +-
 .../apache/pulsar/client/cli/PulsarClientTool.java |  14 +++
 .../pulsar/client/impl/ClientBuilderImpl.java      |  11 +++
 .../apache/pulsar/client/impl/ConnectionPool.java  |  55 +++++++++--
 .../client/impl/PulsarChannelInitializer.java      |  18 +++-
 .../client/impl/conf/ClientConfigurationData.java  |   5 +
 .../pulsar/common/policies/data/ClusterData.java   |  47 ++++++++-
 12 files changed, 351 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 54facc4..0790e73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoun
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -861,7 +862,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                     clientBuilder.serviceUrl(
                             isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
                 }
-
+                if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) {
+                    clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol());
+                    log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(),
+                            data.getProxyProtocol());
+                }
                 // Share all the IO threads across broker and client connections
                 ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
                 return new PulsarClientImpl(conf, workerGroup);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
new file mode 100644
index 0000000..964ebd9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import lombok.Cleanup;
+
+public class ProxyProtocolTest extends TlsProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyProtocolTest.class);
+
+    @Test
+    public void testSniProxyProtocol() throws Exception {
+
+        // Client should try to connect to proxy and pass broker-url as SNI header
+        String proxyUrl = pulsar.getBrokerServiceUrlTls();
+        String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+                .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);
+
+        @Cleanup
+        PulsarClient pulsarClient = clientBuilder.build();
+
+        // should be able to create producer successfully
+        pulsarClient.newProducer().topic(topicName).create();
+    }
+
+    @Test
+    public void testSniProxyProtocolWithInvalidProxyUrl() throws Exception {
+
+        // Client should try to connect to proxy and pass broker-url as SNI header
+        String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+        String proxyHost = "invalid-url";
+        String proxyUrl = "pulsar+ssl://" + proxyHost + ":5555";
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+                .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);
+
+        @Cleanup
+        PulsarClient pulsarClient = clientBuilder.build();
+
+        try {
+            pulsarClient.newProducer().topic(topicName).create();
+            fail("should have failed due to invalid url");
+        } catch (PulsarClientException e) {
+            assertTrue(e.getMessage().contains(proxyHost));
+        }
+    }
+
+    @Test
+    public void testSniProxyProtocolWithoutTls() throws Exception {
+        // Client should try to connect to proxy and pass broker-url as SNI header
+        String proxyUrl = pulsar.getBrokerServiceUrl();
+        String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
+                .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
+
+        @Cleanup
+        PulsarClient pulsarClient = clientBuilder.build();
+
+        try {
+            pulsarClient.newProducer().topic(topicName).create();
+            fail("should have failed due to non-tls url");
+        } catch (PulsarClientException e) {
+            // Ok
+        }
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index b3aee5b..865855c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -468,4 +468,14 @@ public interface ClientBuilder extends Cloneable {
      * @return the client builder instance
      */
     ClientBuilder clock(Clock clock);
+
+    /**
+     * Proxy-service url when client would like to connect to broker via proxy. Client can choose type of proxy-routing
+     * using {@link ProxyProtocol}.
+     *
+     * @param proxyServiceUrl proxy service url
+     * @param proxyProtocol   protocol to decide type of proxy routing eg: SNI-routing
+     * @return
+     */
+    ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java
new file mode 100644
index 0000000..470e475
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Protcol type to determine type of proxy routing when client connects to proxy using
+ * {@link ClientBuilder::proxyServiceUrl}.
+ */
+public enum ProxyProtocol {
+    /**
+     * Follows SNI-routing
+     * https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
+     **/
+    SNI
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 16af009..b6e327c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -68,12 +68,25 @@ public class PulsarClientException extends IOException {
     /**
      * Constructs an {@code PulsarClientException} with the specified cause.
      *
+     * @param msg
+     *            The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
+     *
      * @param t
-     *        The cause (which is saved for later retrieval by the
-     *        {@link #getCause()} method).  (A null value is permitted,
-     *        and indicates that the cause is nonexistent or unknown.)
+     *            The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
+     *            permitted, and indicates that the cause is nonexistent or unknown.)
+     */
+    public PulsarClientException(String msg, Throwable t) {
+        super(msg, t);
+    }
+
+    /**
+     * Constructs an {@code PulsarClientException} with the specified cause.
+     *
+     * @param t
+     *            The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
+     *            permitted, and indicates that the cause is nonexistent or unknown.)
      * @param sequenceId
-     *        The sequenceId of the message
+     *            The sequenceId of the message
      */
     public PulsarClientException(Throwable t, long sequenceId) {
         super(t);
@@ -95,6 +108,21 @@ public class PulsarClientException extends IOException {
         public InvalidServiceURL(Throwable t) {
             super(t);
         }
+
+        /**
+         * Constructs an {@code InvalidServiceURL} with the specified cause.
+         *
+         *@param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param t
+         *        The cause (which is saved for later retrieval by the
+         *        {@link #getCause()} method).  (A null value is permitted,
+         *        and indicates that the cause is nonexistent or unknown.)
+         */
+        public InvalidServiceURL(String msg, Throwable t) {
+            super(msg, t);
+        }
     }
 
     /**
@@ -123,6 +151,21 @@ public class PulsarClientException extends IOException {
         public InvalidConfigurationException(Throwable t) {
             super(t);
         }
+
+        /**
+         * Constructs an {@code InvalidConfigurationException} with the specified cause.
+         *
+         *@param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param t
+         *        The cause (which is saved for later retrieval by the
+         *        {@link #getCause()} method).  (A null value is permitted,
+         *        and indicates that the cause is nonexistent or unknown.)
+         */
+        public InvalidConfigurationException(String msg, Throwable t) {
+            super(msg, t);
+        }
     }
 
     /**
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 88f3b3f..75f80ee 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 
@@ -69,10 +70,17 @@ public class CmdClusters extends CmdBase {
         @Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false)
         private String brokerServiceUrlTls;
 
+        @Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false)
+        private String proxyServiceUrl;
+
+        @Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false)
+        private ProxyProtocol proxyProtocol;
+
         void run() throws PulsarAdminException {
             String cluster = getOneArgument(params);
             admin.clusters().createCluster(cluster,
-                    new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls));
+                    new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl,
+                            proxyProtocol));
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index b86bc79..0ba7227 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 
@@ -43,6 +44,12 @@ public class PulsarClientTool {
     @Parameter(names = { "--url" }, description = "Broker URL to which to connect.")
     String serviceURL = null;
 
+    @Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.")
+    String proxyServiceURL = null;
+
+    @Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.")
+    ProxyProtocol proxyProtocol = null;
+
     @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
     String authPluginClassName = null;
 
@@ -117,6 +124,13 @@ public class PulsarClientTool {
                 .tlsTrustStorePath(tlsTrustStorePath)
                 .tlsTrustStorePassword(tlsTrustStorePassword);
 
+        if (StringUtils.isNotBlank(proxyServiceURL)) {
+            if (proxyProtocol == null) {
+                System.out.println("proxy-protocol must be provided with proxy-url");
+                System.exit(-1);
+            }
+            clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol);
+        }
         this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
         this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 2c920e2..6b820b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
@@ -294,4 +295,14 @@ public class ClientBuilderImpl implements ClientBuilder {
         conf.setClock(clock);
         return this;
     }
+
+    @Override
+    public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) {
+        if (StringUtils.isNotBlank(proxyServiceUrl) && proxyProtocol == null) {
+            throw new IllegalArgumentException("proxyProtocol must be present with proxyServiceUrl");
+        }
+        conf.setProxyServiceUrl(proxyServiceUrl);
+        conf.setProxyProtocol(proxyProtocol);
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 30301bb..fff9858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
@@ -34,6 +35,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
@@ -43,9 +47,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +61,8 @@ public class ConnectionPool implements Closeable {
     protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
 
     private final Bootstrap bootstrap;
+    private final PulsarChannelInitializer channelInitializerHandler;
+    private final ClientConfigurationData clientConfig;
     private final EventLoopGroup eventLoopGroup;
     private final int maxConnectionsPerHosts;
 
@@ -66,6 +75,7 @@ public class ConnectionPool implements Closeable {
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
             Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
         this.eventLoopGroup = eventLoopGroup;
+        this.clientConfig = conf;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
 
         pool = new ConcurrentHashMap<>();
@@ -78,7 +88,8 @@ public class ConnectionPool implements Closeable {
         bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
 
         try {
-            bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
+            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
+            bootstrap.handler(channelInitializerHandler);
         } catch (Exception e) {
             log.error("Failed to create channel initializer");
             throw new PulsarClientException(e);
@@ -214,10 +225,17 @@ public class ConnectionPool implements Closeable {
     private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
         String hostname = unresolvedAddress.getHostString();
         int port = unresolvedAddress.getPort();
-
-        // Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
-        return resolveName(hostname)
-                .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
+        try {
+            // For non-sni-proxy: Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
+            CompletableFuture<List<InetAddress>> resolvedAddress = isSniProxy()
+                    ? CompletableFuture.completedFuture(Lists.newArrayList(InetAddress.getByName(hostname)))
+                    : resolveName(hostname);
+            return resolvedAddress
+                    .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
+        } catch (UnknownHostException e) {
+            log.error("Invalid remote url {}", hostname, e);
+            return FutureUtil.failedFuture(new InvalidServiceURL("Invalid url " + hostname, e));
+        }
     }
 
     /**
@@ -227,7 +245,7 @@ public class ConnectionPool implements Closeable {
     private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
-        connectToAddress(unresolvedAddresses.next(), port).thenAccept(channel -> {
+        connectToAddress(unresolvedAddresses.next(), port, false).thenAccept(channel -> {
             // Successfully connected to server
             future.complete(channel);
         }).exceptionally(exception -> {
@@ -266,9 +284,27 @@ public class ConnectionPool implements Closeable {
     /**
      * Attempt to establish a TCP connection to an already resolved single IP address
      */
-    private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port) {
+    private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, boolean ignoreProxyUrl) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
+        if (!ignoreProxyUrl && isSniProxy()) {
+            // client wants to connect to proxy and wants to pass 
+            // target connection host in sni header
+            channelInitializerHandler.setSniHostName(ipAddress.getHostName());
+            channelInitializerHandler.setSniHostPort(port);
+            // connect to proxy host
+            try {
+                URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
+                // resolve proxy host-address and try to connect again by passing flag ignoreProxyUrl because proxy-host
+                // will be already resolved
+                return resolveName(proxyURI.getHost())
+                        .thenCompose(inetAddresses -> connectToAddress(inetAddresses.iterator().next(), proxyURI.getPort(), true));
+            } catch (URISyntaxException e) {
+                log.error("Failed to parse proxy-service url {}", clientConfig.getProxyServiceUrl(), e);
+                future.completeExceptionally(e);
+                return future;
+            }
+        }
         bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
             if (channelFuture.isSuccess()) {
                 future.complete(channelFuture.channel());
@@ -307,5 +343,10 @@ public class ConnectionPool implements Closeable {
         return mod;
     }
 
+    private boolean isSniProxy() {
+        return channelInitializerHandler.isTlsEnabled() && clientConfig.getProxyProtocol() != null
+                && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl());
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 4a145e7..e418904 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -27,6 +27,10 @@ import java.security.cert.X509Certificate;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ObjectCache;
@@ -41,11 +45,16 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
     public static final String TLS_HANDLER = "tls";
 
     private final Supplier<ClientCnx> clientCnxSupplier;
+    @Getter
     private final boolean tlsEnabled;
     private final boolean tlsEnabledWithKeyStore;
 
     private final Supplier<SslContext> sslContextSupplier;
     private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
+    @Setter
+    private String sniHostName;
+    @Setter
+    private int sniHostPort;
 
     private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
 
@@ -99,9 +108,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
             if (tlsEnabledWithKeyStore) {
                 ch.pipeline().addLast(TLS_HANDLER,
                         new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
-            } else {
-                ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc()));
-            }
+			} else {
+				SslHandler handler = StringUtils.isNotBlank(sniHostName)
+						? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort)
+						: sslContextSupplier.get().newHandler(ch.alloc());
+				ch.pipeline().addLast(TLS_HANDLER, handler);
+			}
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
         } else {
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index ac08553..7bedc01 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -26,6 +26,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
@@ -87,6 +88,10 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private Set<String> tlsCiphers = Sets.newTreeSet();
     private Set<String> tlsProtocols = Sets.newTreeSet();
 
+    /** proxyServiceUrl and proxyProtocol must be mutually inclusive **/
+    private String proxyServiceUrl;
+    private ProxyProtocol proxyProtocol;
+    
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index e5b2859..35cecbb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -26,6 +26,8 @@ import io.swagger.annotations.ApiModelProperty;
 import java.util.LinkedHashSet;
 import java.util.Objects;
 
+import org.apache.pulsar.client.api.ProxyProtocol;
+
 /**
  * The configuration data for a cluster.
  */
@@ -58,6 +60,20 @@ public class ClusterData {
         example = "pulsar+ssl://pulsar.example.com:6651"
     )
     private String brokerServiceUrlTls;
+    @ApiModelProperty(
+        name = "proxyServiceUrl",
+        value = "Proxy-service url when client would like to connect to broker via proxy.",
+        example = "pulsar+ssl://ats-proxy.example.com:4443 or "
+                    + "pulsar://ats-proxy.example.com:4080"
+    )
+    private String proxyServiceUrl;
+    @ApiModelProperty(
+        name = "proxyProtocol",
+        value = "protocol to decide type of proxy routing eg: SNI-routing",
+        example = "SNI"
+    )
+    private ProxyProtocol proxyProtocol;
+
     // For given Cluster1(us-west1, us-east1) and Cluster2(us-west2, us-east2)
     // Peer: [us-west1 -> us-west2] and [us-east1 -> us-east2]
     @ApiModelProperty(
@@ -85,6 +101,16 @@ public class ClusterData {
         this.brokerServiceUrlTls = brokerServiceUrlTls;
     }
 
+    public ClusterData(String serviceUrl, String serviceUrlTls, String brokerServiceUrl, String brokerServiceUrlTls,
+            String proxyServiceUrl, ProxyProtocol proxyProtocol) {
+        this.serviceUrl = serviceUrl;
+        this.serviceUrlTls = serviceUrlTls;
+        this.brokerServiceUrl = brokerServiceUrl;
+        this.brokerServiceUrlTls = brokerServiceUrlTls;
+        this.proxyServiceUrl = proxyServiceUrl;
+        this.proxyProtocol = proxyProtocol;
+    }
+
     public void update(ClusterData other) {
         checkNotNull(other);
         this.serviceUrl = other.serviceUrl;
@@ -125,6 +151,22 @@ public class ClusterData {
         this.brokerServiceUrlTls = brokerServiceUrlTls;
     }
 
+    public String getProxyServiceUrl() {
+        return proxyServiceUrl;
+    }
+
+    public void setProxyServiceUrl(String proxyServiceUrl) {
+        this.proxyServiceUrl = proxyServiceUrl;
+    }
+
+    public ProxyProtocol getProxyProtocol() {
+        return proxyProtocol;
+    }
+
+    public void setProxyProtocol(ProxyProtocol proxyProtocol) {
+        this.proxyProtocol = proxyProtocol;
+    }
+
     public LinkedHashSet<String> getPeerClusterNames() {
         return peerClusterNames;
     }
@@ -139,7 +181,9 @@ public class ClusterData {
             ClusterData other = (ClusterData) obj;
             return Objects.equals(serviceUrl, other.serviceUrl) && Objects.equals(serviceUrlTls, other.serviceUrlTls)
                     && Objects.equals(brokerServiceUrl, other.brokerServiceUrl)
-                    && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls);
+                    && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls)
+                    && Objects.equals(proxyServiceUrl, other.proxyServiceUrl)
+                    && Objects.equals(proxyProtocol, other.proxyProtocol);
         }
 
         return false;
@@ -154,6 +198,7 @@ public class ClusterData {
     public String toString() {
         return MoreObjects.toStringHelper(this).add("serviceUrl", serviceUrl).add("serviceUrlTls", serviceUrlTls)
                 .add("brokerServiceUrl", brokerServiceUrl).add("brokerServiceUrlTls", brokerServiceUrlTls)
+                .add("proxyServiceUrl", proxyServiceUrl).add("proxyProtocol", proxyProtocol)
                 .add("peerClusterNames", peerClusterNames).toString();
     }