You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/10 01:43:29 UTC

[GitHub] rdhabalia closed pull request #1208: Add hostname-verification at client tls connection

rdhabalia closed pull request #1208: Add hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt
index 41f9000d6..a7e70defb 100644
--- a/all/src/assemble/LICENSE.bin.txt
+++ b/all/src/assemble/LICENSE.bin.txt
@@ -332,6 +332,8 @@ The Apache Software License, Version 2.0
  * Jetty - org.eclipse.jetty-*.jar
  * SnakeYaml -- org.yaml-snakeyaml-*.jar
  * RocksDB - org.rocksdb.*.jar
+ * HttpClient - org.apache.httpcomponents.httpclient.jar
+ * CommonsLogging - commons-logging-*.jar
 
 BSD 3-clause "New" or "Revised" License
  * EA Agent Loader -- com.ea.agentloader-*.jar -- licenses/LICENSE-EA-Agent-Loader.txt
diff --git a/pom.xml b/pom.xml
index 27ff691fb..7320661d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,6 +138,18 @@ flexible messaging model and an intuitive client API.</description>
         </exclusions>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>4.5.5</version>
+        <exclusions>
+          <exclusion>
+            <groupId>*</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      
       <dependency>
         <groupId>org.testng</groupId>
         <artifactId>testng</artifactId>
@@ -760,6 +772,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>**/*.crt</exclude>
             <exclude>**/*.key</exclude>
             <exclude>**/*.csr</exclude>
+            <exclude>**/*.pem</exclude>
             <exclude>**/*.json</exclude>
             <exclude>**/*.htpasswd</exclude>
             <exclude>src/test/resources/athenz.conf.test</exclude>
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index fd3ff68de..bda3037eb 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -104,6 +104,8 @@
                   <include>org.aspectj:*</include>
                   <include>com.ea.agentloader:*</include>
                   <include>com.wordnik:swagger-annotations</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -298,6 +300,10 @@
                   <pattern>com.wordnik</pattern>
                   <shadedPattern>org.apache.pulsar.shade.com.worknik</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index cd0415aab..31387696a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -19,8 +19,11 @@
 package org.apache.pulsar.broker.service;
 
 import java.io.File;
+import java.security.cert.X509Certificate;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.impl.auth.AuthenticationDataTls;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.PulsarDecoder;
 
@@ -68,6 +71,17 @@ protected void initChannel(SocketChannel ch) throws Exception {
                     builder.trustManager(trustCertCollection);
                 }
             }
+            
+            ServiceConfiguration config = brokerService.pulsar().getConfiguration();
+            String certFilePath = config.getTlsCertificateFilePath();
+            String keyFilePath = config.getTlsKeyFilePath();
+            if (StringUtils.isNotBlank(certFilePath) && StringUtils.isNotBlank(keyFilePath)) {
+                AuthenticationDataTls authTlsData = new AuthenticationDataTls(certFilePath, keyFilePath);
+                builder.keyManager(authTlsData.getTlsPrivateKey(),
+                        (X509Certificate[]) authTlsData.getTlsCertificates());
+            }
+            
+            
             SslContext sslCtx = builder.clientAuth(ClientAuth.OPTIONAL).build();
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
new file mode 100644
index 000000000..5ccfc142b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.util.PublicSuffixMatcher;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(AuthenticationTlsHostnameVerificationTest.class);
+
+    // Man-in-the-middle certificate: it tries to impersonate the broker by presenting its own valid certificate
+    private final String TLS_MIM_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/hn-verification/cacert.pem";
+    private final String TLS_MIM_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/hn-verification/broker-cert.pem";
+    private final String TLS_MIM_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/hn-verification/broker-key.pem";
+
+    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+
+    private final String BASIC_CONF_FILE_PATH = "./src/test/resources/authentication/basic/.htpasswd";
+
+    private final static String brokerHostName = "localhost";
+    private boolean hostnameVerificationEnabled = true;
+
+    protected void setup() throws Exception {
+        if (methodName.equals("testAnonymousSyncProducerAndConsumer")) {
+            conf.setAnonymousUserRole("anonymousUser");
+        }
+
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        conf.setTlsEnabled(true);
+        conf.setTlsAllowInsecureConnection(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("localhost");
+        superUserRoles.add("superUser");
+        superUserRoles.add("superUser2");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        providers.add(AuthenticationProviderBasic.class.getName());
+        System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("use");
+
+        super.init();
+
+        setupClient();
+    }
+
+    protected void setupClient() throws Exception {
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        clientConf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsAllowInsecureConnection(true);
+        clientConf.setAuthentication(authTls);
+        clientConf.setUseTls(true);
+        clientConf.setTlsHostnameVerificationEnable(hostnameVerificationEnabled);
+
+        admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
+        String lookupUrl;
+        lookupUrl = new URI("pulsar+ssl://" + brokerHostName + ":" + BROKER_PORT_TLS).toString();
+        pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        if (!methodName.equals("testDefaultHostVerifier")) {
+            super.internalCleanup();
+        }
+    }
+
+    @DataProvider(name = "hostnameVerification")
+    public Object[][] codecProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    /**
+     * It verifies that client performs host-verification in order to create producer/consumer.
+     * 
+     * <pre>
+     * 1. Client tries to connect to broker with hostname="localhost"
+     * 2. Broker sends x509 certificate with CN = "pulsar*.apache.com"
+     * 3. If verification is enabled, the client rejects the host name and consumer creation fails; otherwise it succeeds
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test(dataProvider = "hostnameVerification")
+    public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean hostnameVerificationEnabled)
+            throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        this.hostnameVerificationEnabled = hostnameVerificationEnabled;
+        // set up a broker cert whose CN = "pulsar*.apache.com" does not match the broker's hostname="localhost"
+        conf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_MIM_SERVER_KEY_FILE_PATH);
+
+        setup();
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        try {
+            Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic",
+                    "my-subscriber-name", conf);
+            if (hostnameVerificationEnabled) {
+                Assert.fail("Connection should be failed due to hostnameVerification enabled");
+            }
+        } catch (PulsarClientException e) {
+            if (!hostnameVerificationEnabled) {
+                Assert.fail("Consumer should be created because hostnameverification is disabled");
+            }
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * It verifies that client performs host-verification in order to create producer/consumer.
+     * 
+     * <pre>
+     * 1. Client tries to connect to broker with hostname="localhost"
+     * 2. Broker sends x509 certificates with CN = "localhost"
+     * 3. Client verifies the host-name and continues
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // setup broker cert which has CN = "localhost"
+        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+
+        setup();
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", "my-subscriber-name",
+                conf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic", producerConf);
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * This test verifies {@link DefaultHostnameVerifier} behavior and gives a fair idea of its host-matching results
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testDefaultHostVerifier() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        Method matchIdentityStrict = DefaultHostnameVerifier.class.getDeclaredMethod("matchIdentityStrict",
+                String.class, String.class, PublicSuffixMatcher.class);
+        matchIdentityStrict.setAccessible(true);
+        Assert.assertTrue((boolean) matchIdentityStrict.invoke(null, "pulsar", "pulsar", null));
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, "pulsar.com", "pulsar", null));
+        Assert.assertTrue((boolean) matchIdentityStrict.invoke(null, "pulsar-broker1.com", "pulsar*.com", null));
+        // unmatched remainder "-broker1." (the part covered by "*") must not contain "."
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, "pulsar-broker1.com", "pulsar*com", null));
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, "pulsar.com", "*", null));
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+}
diff --git a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
new file mode 100644
index 000000000..ac9d51be7
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
@@ -0,0 +1,82 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            d8:99:d5:ce:27:f5:be:50
+    Signature Algorithm: sha256WithRSAEncryption
+        Issuer: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Validity
+            Not Before: Feb  9 01:11:41 2018 GMT
+            Not After : Feb  9 01:11:41 2019 GMT
+        Subject: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=pulsar*.apache.com
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+                Public-Key: (2048 bit)
+                Modulus:
+                    00:e8:bb:b6:87:37:6b:68:44:c9:d6:01:ba:a5:93:
+                    e4:5f:b1:0e:64:23:a9:7b:bd:c1:a6:a8:b8:b9:2c:
+                    c9:73:57:5a:41:89:db:01:64:30:06:dc:5b:4e:01:
+                    d3:02:73:86:d1:f9:c2:a2:5f:8c:c1:4c:00:bc:b1:
+                    bd:67:18:f6:88:ee:b6:72:be:37:18:2f:5d:c2:a1:
+                    30:20:02:38:2b:5e:a9:50:f2:c4:f7:23:74:ef:ad:
+                    4e:b1:25:f7:49:5e:8d:98:cd:2d:71:88:2c:73:df:
+                    eb:5c:2e:f0:5e:e6:15:1e:82:1e:94:33:15:f5:7b:
+                    65:9e:b2:78:89:7a:7f:b7:c1:6a:a3:a9:34:3c:96:
+                    32:2a:26:1d:67:d1:0a:80:1f:7c:95:34:c6:fb:ea:
+                    11:1c:53:86:81:04:bb:90:45:2b:4f:99:9c:72:f5:
+                    ec:86:4b:2f:7e:c3:65:6c:ac:e0:74:5f:35:4e:ee:
+                    3f:d0:82:2b:20:bb:80:65:3f:fe:78:96:42:19:35:
+                    e1:46:bd:d9:4e:b7:b8:95:5f:25:6b:a6:f2:e3:87:
+                    13:d3:29:11:c5:a2:84:bb:12:81:ea:15:60:2f:16:
+                    7e:f9:86:bc:e3:93:ed:d7:ec:5a:34:ae:4c:cd:00:
+                    40:dc:c6:e7:f6:19:ed:63:7f:8f:d0:dd:c5:11:9d:
+                    95:2d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                06:DC:92:77:64:D3:21:AB:08:F6:E4:0C:9A:47:3F:3A:8B:CB:E8:D8
+            X509v3 Authority Key Identifier: 
+                keyid:62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+
+    Signature Algorithm: sha256WithRSAEncryption
+         70:0b:e4:07:45:98:d3:17:02:2f:44:ec:aa:41:2e:39:57:5e:
+         8a:e0:21:77:59:39:1d:66:c2:10:ea:ae:73:8a:50:94:5e:ad:
+         05:56:aa:8a:2f:87:44:09:cb:50:2c:5a:44:d1:99:fe:ee:5c:
+         82:fb:db:d4:5c:bd:56:dd:e6:37:87:0a:64:2c:85:19:dc:2d:
+         d1:22:00:91:53:5d:4c:f2:1c:4f:61:84:8e:77:e1:cc:9e:f8:
+         16:bb:15:b0:5a:f4:12:c7:b6:3b:28:cf:e3:95:9a:a8:68:ad:
+         02:7e:88:34:88:cd:31:d9:cd:17:8a:ef:5d:d5:40:c7:37:ca:
+         d0:38:35:46:d0:7d:f9:b6:85:f5:ef:9d:f3:05:9c:38:3f:67:
+         df:97:94:a8:81:5d:e3:70:ff:96:28:58:13:37:8a:3f:2a:b9:
+         6a:2a:c6:aa:89:16:91:9a:e7:9c:f3:72:36:74:de:46:7f:4f:
+         26:56:6e:05:47:99:ee:38:26:13:77:16:f5:07:cd:f1:69:6e:
+         08:c8:3b:ef:35:96:b3:b1:8e:87:eb:bd:da:02:b8:40:aa:e8:
+         16:11:80:98:81:77:5a:97:41:58:bd:01:50:4c:6c:c4:14:43:
+         d4:ac:c7:25:8b:df:a4:94:f5:29:12:72:56:8c:25:94:d8:8f:
+         c1:fa:4b:59
+-----BEGIN CERTIFICATE-----
+MIIDtjCCAp6gAwIBAgIJANiZ1c4n9b5QMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RDQTAeFw0xODAyMDkwMTExNDFa
+Fw0xOTAyMDkwMTExNDFaMGIxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0
+YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxGzAZBgNVBAMM
+EnB1bHNhciouYXBhY2hlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
+ggEBAOi7toc3a2hEydYBuqWT5F+xDmQjqXu9waaouLksyXNXWkGJ2wFkMAbcW04B
+0wJzhtH5wqJfjMFMALyxvWcY9ojutnK+NxgvXcKhMCACOCteqVDyxPcjdO+tTrEl
+90lejZjNLXGILHPf61wu8F7mFR6CHpQzFfV7ZZ6yeIl6f7fBaqOpNDyWMiomHWfR
+CoAffJU0xvvqERxThoEEu5BFK0+ZnHL17IZLL37DZWys4HRfNU7uP9CCKyC7gGU/
+/niWQhk14Ua92U63uJVfJWum8uOHE9MpEcWihLsSgeoVYC8WfvmGvOOT7dfsWjSu
+TM0AQNzG5/YZ7WN/j9DdxRGdlS0CAwEAAaN7MHkwCQYDVR0TBAIwADAsBglghkgB
+hvhCAQ0EHxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYE
+FAbckndk0yGrCPbkDJpHPzqLy+jYMB8GA1UdIwQYMBaAFGJv+KKFPFx+lMw+idas
+T2XyLgI5MA0GCSqGSIb3DQEBCwUAA4IBAQBwC+QHRZjTFwIvROyqQS45V16K4CF3
+WTkdZsIQ6q5zilCUXq0FVqqKL4dECctQLFpE0Zn+7lyC+9vUXL1W3eY3hwpkLIUZ
+3C3RIgCRU11M8hxPYYSOd+HMnvgWuxWwWvQSx7Y7KM/jlZqoaK0Cfog0iM0x2c0X
+iu9d1UDHN8rQODVG0H35toX1753zBZw4P2ffl5SogV3jcP+WKFgTN4o/KrlqKsaq
+iRaRmuec83I2dN5Gf08mVm4FR5nuOCYTdxb1B83xaW4IyDvvNZazsY6H673aArhA
+qugWEYCYgXdal0FYvQFQTGzEFEPUrMcli9+klPUpEnJWjCWU2I/B+ktZ
+-----END CERTIFICATE-----
diff --git a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
new file mode 100644
index 000000000..b6bde087f
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDou7aHN2toRMnW
+Abqlk+RfsQ5kI6l7vcGmqLi5LMlzV1pBidsBZDAG3FtOAdMCc4bR+cKiX4zBTAC8
+sb1nGPaI7rZyvjcYL13CoTAgAjgrXqlQ8sT3I3TvrU6xJfdJXo2YzS1xiCxz3+tc
+LvBe5hUegh6UMxX1e2WesniJen+3wWqjqTQ8ljIqJh1n0QqAH3yVNMb76hEcU4aB
+BLuQRStPmZxy9eyGSy9+w2VsrOB0XzVO7j/Qgisgu4BlP/54lkIZNeFGvdlOt7iV
+XyVrpvLjhxPTKRHFooS7EoHqFWAvFn75hrzjk+3X7Fo0rkzNAEDcxuf2Ge1jf4/Q
+3cURnZUtAgMBAAECggEBAKUj5V3HBlDDVtCjA3TQHyGDeim2YGGsgQen+wNyczOD
+zUhp8FvpYmbL34HXq4m2vfiql+AtmqviKTe7iyDnxq/datq6fE+N9KLRS1u7F242
+yj/lM7wFjckwGYF75h9Kl4DQPimsLZa/Ubtkly1PZ7bxL4+LPE6nE7FrBDrREGUq
+39bUGmMPXzLRxVSUdmLQIUsgLtuAOVfQB5qZ75zIUMmBhPhNhDgUv35cLxmgj5J8
+GPJxG21BBm88UYA+dhPLTAk+k3rLVKeZfXV75U0Zt04JHthhnFZ+/mJk8AD6c+jZ
+d2M1TdRSMkyTgd0DpN/bQiBvs+MK6dSkDJvYQOVGfQUCgYEA+7C1fNRQgeyJh5HJ
+waRr+9oKBLk1bTq5KaiMFF0SQo0rp5AShjG3ucTiKOBleUkiig/CpLH3CvToapq6
+uh8xLZm8Fz1AIwQ/qjRlVeNzNPCrstRk/BYgmQREr7kDH7RzvynJZYKdwpaJA3+4
+ICK/ES2FGcgNZahnm5brrCc/gxMCgYEA7LfnzWj0x5vCOlGSwo/LjFb9UgreJLQ9
+U1W/ACg9H5cp81AVTMRr9UsZOyaWJrdCTyfiQJOEZQ3YdwjBSr6f5vOxwqF68Mmi
+WG1PhP/kZsGI/cwlEA2odkoy/BGfxSMrfiCaxQNovG35agbRiJ5Awci2lOViPnvF
+HPKUULHpTr8CgYBbykVWAiReTcKWc5/OBEXxcsJmmJkYfesbe0GjB5JqPQvnr05i
+LG2hzWDhoXzAb+Ct0zOcVt8O2uSMRGPHDysjQ0bqfscOPjVtwHAYk7vnWcJ0lKtD
+mFpJE9ps759pB6mS1Q2C/NDGL5pGcWTYK3PdMumwzlm8cl9eyfqnLSUniwKBgQCO
+drfpJat7nkAsfP+IXKYyFgBrKeM7z8XAq7BB1fXDV2SF7MKE6wnWHJZYxQZE0rHz
+lZtTJfTeJJEMQpah90ug4TUwX6Lv20n7Uf4zmxXIyd06cWw01yN13X4Fuk2fhWUd
+iV3cCAs2rDEZIHVmdWefuL45qjuQQ0kD/PJKBmjVXQKBgQC2kaXVskAqZJwyfn5r
+g2hoRxjgv58UGyTsVwiwkSfoYQGdw1otO2zuyYbZZxGttMo1HkKTXFUNDELpiFXb
+5GcfT6xxssEH8zvh30M8rS0nF0AkMGZhxJxPdBnh5enwNg6glStcMY6ZaEDVz34k
+MAr7/FNPcrJt/EgvQ7PYj0/HVg==
+-----END PRIVATE KEY-----
diff --git a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
new file mode 100644
index 000000000..4c98286c9
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
@@ -0,0 +1,79 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            d8:99:d5:ce:27:f5:be:4f
+    Signature Algorithm: sha256WithRSAEncryption
+        Issuer: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Validity
+            Not Before: Feb  9 01:11:04 2018 GMT
+            Not After : Feb  8 01:11:04 2021 GMT
+        Subject: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+                Public-Key: (2048 bit)
+                Modulus:
+                    00:cc:50:cd:b6:68:b2:e0:5f:bd:a5:4a:5c:17:bc:
+                    d8:b9:43:e6:22:9a:8a:2e:1b:87:13:b6:ca:59:7e:
+                    d7:ee:50:fe:ef:bf:ae:4d:cc:26:70:b4:27:03:64:
+                    36:73:d5:fd:2e:08:37:b2:2d:36:26:c8:e3:d3:9e:
+                    d3:37:0d:56:fa:a9:78:55:db:09:b3:21:b7:ac:c8:
+                    12:35:16:21:ed:a8:5e:4a:a4:e3:11:a0:67:ae:4c:
+                    5b:a7:15:ff:72:b1:7a:77:2b:ea:bd:3c:89:5c:40:
+                    ae:58:4d:69:56:d6:d9:50:42:e7:d7:b1:58:cc:c8:
+                    2a:84:b0:16:7c:3a:82:38:46:78:cc:4b:8a:db:ac:
+                    cc:4c:e1:a8:c2:d4:8f:b0:d9:dc:79:f8:70:28:8a:
+                    76:4f:dc:b1:09:a2:15:65:33:de:2a:2f:8e:27:7a:
+                    0b:93:6b:66:4b:e2:53:33:97:a2:26:bf:f3:b2:8a:
+                    f2:6c:5c:41:5b:1a:bb:12:6c:2f:f3:14:35:c4:40:
+                    4b:16:65:64:72:16:bf:a3:d6:1b:4d:9b:e6:12:cb:
+                    0a:c7:a9:01:f8:63:2b:b7:22:7a:fd:ef:6b:03:9e:
+                    e5:06:87:1d:a5:d5:11:4c:11:ae:55:62:11:f5:57:
+                    7b:21:51:77:8e:b8:cf:2f:7d:86:d6:38:d3:af:28:
+                    bc:8d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Subject Key Identifier: 
+                62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+            X509v3 Authority Key Identifier: 
+                keyid:62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+
+            X509v3 Basic Constraints: 
+                CA:TRUE
+    Signature Algorithm: sha256WithRSAEncryption
+         01:5a:ff:b8:36:ff:0c:9c:12:cc:ad:b2:60:ac:3c:91:c1:04:
+         c0:6b:10:f6:e0:0b:1c:17:44:76:1b:5a:98:c5:33:a2:2c:c8:
+         bf:e7:f7:2b:b7:97:37:43:8c:e7:a4:77:5f:5d:48:f6:77:2d:
+         bb:e0:f9:02:9e:df:0b:71:63:fd:ff:63:f1:23:ec:ed:bc:ac:
+         ea:a8:52:60:a7:c8:b0:f9:f7:66:62:35:ab:72:32:9a:cf:7f:
+         cc:96:fe:3b:01:31:04:21:e9:da:76:d1:09:be:66:03:c8:14:
+         48:d0:ad:73:3a:16:98:72:d9:1e:98:57:9b:49:59:8b:9a:23:
+         a9:e6:66:e6:d0:bc:65:45:fa:eb:ce:5a:21:24:9c:15:99:b9:
+         f3:63:ef:0a:bb:68:4d:ee:2e:52:6a:a2:bc:77:79:be:36:b1:
+         b5:d8:01:c5:9b:37:b0:db:38:f0:0c:59:35:7f:0c:8b:bf:ec:
+         22:bc:dc:14:c8:01:31:4f:a1:0b:82:34:ba:0f:5b:93:2e:4c:
+         ee:20:72:31:30:b1:d9:2c:42:84:2a:4e:c5:ea:d8:af:f4:da:
+         dd:b5:c4:f2:b0:43:f1:c4:09:9f:3d:5e:44:9f:b3:52:9f:92:
+         fe:9d:e3:f4:5b:6f:38:7e:3a:11:5b:99:b8:22:fd:a7:72:5d:
+         40:7c:50:f8
+-----BEGIN CERTIFICATE-----
+MIIDfzCCAmegAwIBAgIJANiZ1c4n9b5PMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RDQTAeFw0xODAyMDkwMTExMDRa
+Fw0yMTAyMDgwMTExMDRaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0
+YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMM
+BnRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMxQzbZosuBf
+vaVKXBe82LlD5iKaii4bhxO2yll+1+5Q/u+/rk3MJnC0JwNkNnPV/S4IN7ItNibI
+49Oe0zcNVvqpeFXbCbMht6zIEjUWIe2oXkqk4xGgZ65MW6cV/3Kxencr6r08iVxA
+rlhNaVbW2VBC59exWMzIKoSwFnw6gjhGeMxLituszEzhqMLUj7DZ3Hn4cCiKdk/c
+sQmiFWUz3iovjid6C5NrZkviUzOXoia/87KK8mxcQVsauxJsL/MUNcRASxZlZHIW
+v6PWG02b5hLLCsepAfhjK7ciev3vawOe5QaHHaXVEUwRrlViEfVXeyFRd464zy99
+htY4068ovI0CAwEAAaNQME4wHQYDVR0OBBYEFGJv+KKFPFx+lMw+idasT2XyLgI5
+MB8GA1UdIwQYMBaAFGJv+KKFPFx+lMw+idasT2XyLgI5MAwGA1UdEwQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAAFa/7g2/wycEsytsmCsPJHBBMBrEPbgCxwXRHYb
+WpjFM6IsyL/n9yu3lzdDjOekd19dSPZ3Lbvg+QKe3wtxY/3/Y/Ej7O28rOqoUmCn
+yLD592ZiNatyMprPf8yW/jsBMQQh6dp20Qm+ZgPIFEjQrXM6Fphy2R6YV5tJWYua
+I6nmZubQvGVF+uvOWiEknBWZufNj7wq7aE3uLlJqorx3eb42sbXYAcWbN7DbOPAM
+WTV/DIu/7CK83BTIATFPoQuCNLoPW5MuTO4gcjEwsdksQoQqTsXq2K/02t21xPKw
+Q/HECZ89XkSfs1Kfkv6d4/Rbbzh+OhFbmbgi/adyXUB8UPg=
+-----END CERTIFICATE-----
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 986264db0..7212c3936 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -125,6 +125,8 @@
                   <include>org.apache.pulsar:pulsar-checksum</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -195,6 +197,10 @@
                   <pattern>com.yahoo.sketches</pattern>
                   <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
               <filters>
                 <filter>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index c3239cca8..3fbd95a1b 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -81,6 +81,8 @@
                   <include>org.apache.pulsar:pulsar-checksum</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -146,6 +148,10 @@
                   <pattern>com.yahoo.sketches</pattern>
                   <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 3c26a9e87..fcce6c0f5 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -74,6 +74,24 @@
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
     </dependency>
+    
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+         </exclusion>
+      </exclusions>
+    </dependency>
+    
+    <!-- httpclient uses it for logging --> 
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>1.1.1</version>
+    </dependency>
 
   </dependencies>
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index dd62728f7..9e4aecec7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -51,6 +51,7 @@
     private boolean useTls = false;
     private String tlsTrustCertsFilePath = "";
     private boolean tlsAllowInsecureConnection = false;
+    private boolean tlsHostnameVerificationEnable = false;
     private int concurrentLookupRequest = 50000;
     private int maxNumberOfRejectedRequestPerConnection = 50;
 
@@ -356,4 +357,21 @@ public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRe
         this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection;
     }
 
+    /** Returns whether the client verifies the broker's hostname against its TLS certificate. */
+    public boolean isTlsHostnameVerificationEnable() {
+        return tlsHostnameVerificationEnable;
+    }
+
+    /**
+     * Enables or disables hostname verification when the client connects to a broker over TLS. When enabled, the
+     * hostname (CN/SAN) presented in the broker's X.509 certificate is matched against the expected broker host
+     * name, following RFC 2818, section 3.1 (Server Identity).
+     *
+     * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+     * @param tlsHostnameVerificationEnable {@code true} to enable hostname verification, {@code false} to disable it
+     */
+    public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
+        this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable;
+    }
+    
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 3f2d176a7..38e96edd4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -29,11 +29,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import javax.net.ssl.SSLSession;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
@@ -51,16 +54,18 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
-import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 
 public class ClientCnx extends PulsarHandler {
 
@@ -87,6 +92,10 @@
     private final long operationTimeoutMs;
 
     private String proxyToTargetBrokerAddress = null;
+    // Remote hostName with which client is connected
+    private String remoteHostName = null;
+    private boolean isTlsHostnameVerificationEnable;
+    private DefaultHostnameVerifier hostnameVerifier;
 
     enum State {
         None, SentConnectFrame, Ready, Failed
@@ -100,6 +109,8 @@ public ClientCnx(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
         this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
         this.operationTimeoutMs = conf.getOperationTimeoutMs();
         this.state = State.None;
+        this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable();
+        this.hostnameVerifier = new DefaultHostnameVerifier();
     }
 
     @Override
@@ -179,6 +190,14 @@ public static boolean isKnownException(Throwable t) {
 
     @Override
     protected void handleConnected(CommandConnected connected) {
+        
+        if (isTlsHostnameVerificationEnable && remoteHostName != null && !verifyTlsHostName(remoteHostName, ctx)) {
+            // close the connection if host-verification failed with the broker
+            log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
+            ctx.close();
+            return;
+        }
+        
         checkArgument(state == State.SentConnectFrame);
 
         if (log.isDebugEnabled()) {
@@ -521,6 +540,35 @@ private void checkServerError(ServerError error, String errMsg) {
         }
     }
 
+    /**
+     * Verifies the given host name against the TLS session of this channel.
+     *
+     * Matching is delegated to {@link DefaultHostnameVerifier}; scenarios as listed by the original author:
+     *
+     * <pre>
+     *  1. Supports IPV4 and IPV6 host matching
+     *  2. Supports wild card matching for DNS-name
+     *  eg:
+     *     HostName                     CN           Result
+     * 1.  localhost                    localhost    PASS
+     * 2.  localhost                    local*       PASS
+     * 3.  pulsar1-broker.com           pulsar*.com  PASS
+     * </pre>
+     *
+     * @param hostname host name to match against the certificate presented in the TLS session
+     * @param ctx context whose channel pipeline is searched for the handler registered as "tls"
+     * @return true if hostname is verified, false otherwise (including when no "tls" {@link SslHandler} is installed)
+     */
+    private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
+        ChannelHandler tlsHandler = ctx.channel().pipeline().get("tls");
+        if (!(tlsHandler instanceof SslHandler)) {
+            // no handler (or an unexpected handler type) under "tls": fail closed instead of throwing
+            return false;
+        }
+        SSLSession sslSession = ((SslHandler) tlsHandler).engine().getSession();
+        return hostnameVerifier.verify(hostname, sslSession);
+    }
+
     void registerConsumer(final long consumerId, final ConsumerImpl consumer) {
         consumers.put(consumerId, consumer);
     }
@@ -542,6 +590,10 @@ void setTargetBroker(InetSocketAddress targetBrokerAddress) {
                 targetBrokerAddress.getPort());
     }
 
+     void setRemoteHostName(String remoteHostName) {
+        this.remoteHostName = remoteHostName; // host later checked against the TLS certificate in handleConnected
+    }
+    
     private PulsarClientException getPulsarClientException(ServerError error, String errorMsg) {
         switch (error) {
         case AuthenticationError:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index ed3c1845a..f598abe70 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -194,6 +194,8 @@ public void initChannel(SocketChannel ch) throws Exception {
                 cnx.setTargetBroker(logicalAddress);
             }
 
+            cnx.setRemoteHostName(physicalAddress.getHostName());
+            
             cnx.connectionFuture().thenRun(() -> {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Connection handshake completed", cnx.channel());
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 86ebd3741..e7b0be74b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.proxy.server;
 
 import java.io.File;
+import java.security.cert.X509Certificate;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.impl.auth.AuthenticationDataTls;
 import org.apache.pulsar.common.api.PulsarDecoder;
 
 import io.netty.channel.ChannelInitializer;
@@ -58,7 +62,17 @@ protected void initChannel(SocketChannel ch) throws Exception {
             builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
             SslContext sslCtx = builder.clientAuth(ClientAuth.OPTIONAL).build();
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+            
+            String certFilePath = serviceConfig.getTlsCertificateFilePath();
+            String keyFilePath = serviceConfig.getTlsKeyFilePath();
+            if (StringUtils.isNotBlank(certFilePath) && StringUtils.isNotBlank(keyFilePath)) {
+                AuthenticationDataTls authTlsData = new AuthenticationDataTls(certFilePath, keyFilePath);
+                builder.keyManager(authTlsData.getTlsPrivateKey(),
+                        (X509Certificate[]) authTlsData.getTlsCertificates());
+            }
+            
         }
+        
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ProxyConnection(proxyService));
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
index 558f5e0e8..1619e2564 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -42,7 +40,6 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -50,6 +47,7 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
@@ -75,6 +73,11 @@
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
+    @DataProvider(name = "hostnameVerification")
+    public Object[][] codecProvider() {
+        return new Object[][] { { true }, { false } }; // one run with hostname verification on, one with it off
+    }
+    
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -161,7 +164,7 @@ public void textProxyAuthorization() throws Exception {
         createAdminClient();
         final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
         // create a client which connects to proxy over tls and pass authData
-        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, false);
 
         String namespaceName = "my-property/proxy-authorization/my-ns";
         
@@ -205,6 +208,43 @@ public void textProxyAuthorization() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "hostnameVerification")
+    public void textProxyAuthorizationTlsHostVerification(boolean hostnameVerificationEnabled) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        createAdminClient();
+        // the client reaches the proxy over tls with authData; hostname verification follows the data set
+        final String tlsProxyUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
+        PulsarClient proxyClient = createPulsarClient(tlsProxyUrl, hostnameVerificationEnabled);
+
+        final String namespaceName = "my-property/proxy-authorization/my-ns";
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+        for (String role : new String[] { "Proxy", "Client" }) {
+            admin.namespaces().grantPermissionOnNamespace(namespaceName, role,
+                    Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        }
+        ConsumerConfiguration exclusiveConf = new ConsumerConfiguration();
+        exclusiveConf.setSubscriptionType(SubscriptionType.Exclusive);
+        boolean subscribed;
+        try {
+            proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1",
+                    "my-subscriber-name", exclusiveConf);
+            subscribed = true;
+        } catch (PulsarClientException e) {
+            subscribed = false;
+        }
+        // a successful subscribe is acceptable only when hostname verification is disabled, and vice versa
+        if (hostnameVerificationEnabled && subscribed) {
+            Assert.fail("Connection should be failed due to hostnameVerification enabled");
+        }
+        if (!hostnameVerificationEnabled && !subscribed) {
+            Assert.fail("Consumer should be created because hostnameverification is disabled");
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
     protected final void createAdminClient() throws Exception {
         Map<String, String> authParams = Maps.newHashMap();
         authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
@@ -221,7 +261,7 @@ protected final void createAdminClient() throws Exception {
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
     }
     
-    private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException {
+    private PulsarClient createPulsarClient(String proxyServiceUrl, boolean hosnameVerificationEnabled) throws PulsarClientException {
         Map<String, String> authParams = Maps.newHashMap();
         authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
         authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
@@ -233,6 +273,7 @@ private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarCli
         clientConf.setTlsAllowInsecureConnection(true);
         clientConf.setAuthentication(authTls);
         clientConf.setUseTls(true);
+        clientConf.setTlsHostnameVerificationEnable(hosnameVerificationEnabled);
         return PulsarClient.create(proxyServiceUrl, clientConf);
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services