You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 07:41:31 UTC

[pulsar] branch master updated: Add Tls with keystore type config support (#6853)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 367ce78  Add Tls with keystore type config support (#6853)
367ce78 is described below

commit 367ce7829827e4b0853e1f5a50566192bb82bf54
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri May 8 15:41:19 2020 +0800

    Add Tls with keystore type config support (#6853)
    
    Fixes #6640
    
    ### Motivation
    
    Add Tls with keystore type config.
    
    ### Modifications
    
    Add Tls with keystore type config.
    
    ### Verifying this change
    
    - Unit test passed
---
 conf/broker.conf                                   |  56 ++++
 conf/client.conf                                   |  11 +
 conf/standalone.conf                               |  96 ++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  98 ++++++
 .../authentication/AuthenticationDataHttps.java    |   1 -
 .../OneStageAuthenticationState.java               |   2 +-
 .../java/org/apache/pulsar/PulsarStandalone.java   |  25 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  30 +-
 .../pulsar/broker/service/BrokerService.java       |  20 +-
 .../broker/service/PulsarChannelInitializer.java   |  62 ++--
 .../org/apache/pulsar/broker/web/WebService.java   |  45 +--
 .../pulsar/client/api/TlsProducerConsumerTest.java |   6 +-
 .../client/impl/AdminApiKeyStoreTlsAuthTest.java   | 229 +++++++++++++
 .../KeyStoreTlsProducerConsumerTestWithAuth.java   | 267 ++++++++++++++++
 ...KeyStoreTlsProducerConsumerTestWithoutAuth.java | 255 +++++++++++++++
 .../apache/pulsar/client/impl/KeyStoreTlsTest.java |  80 +++++
 .../authentication/keystoretls/broker.keystore.jks | Bin 0 -> 2767 bytes
 .../keystoretls/broker.truststore.jks              | Bin 0 -> 731 bytes
 .../keystoretls/brokerKeyStorePW.txt               |   1 +
 .../keystoretls/brokerTrustStorePW.txt             |   1 +
 .../authentication/keystoretls/client.keystore.jks | Bin 0 -> 2767 bytes
 .../keystoretls/client.truststore.jks              | Bin 0 -> 731 bytes
 .../keystoretls/clientKeyStorePW.txt               |   1 +
 .../keystoretls/clientTrustStorePW.txt             |   1 +
 .../pulsar/client/admin/PulsarAdminBuilder.java    |  58 ++++
 .../admin/internal/PulsarAdminBuilderImpl.java     |  43 +++
 .../admin/internal/http/AsyncHttpConnector.java    |  47 ++-
 .../client/api/AuthenticationDataProvider.java     |   9 +
 .../apache/pulsar/client/api/ClientBuilder.java    |  64 ++++
 .../apache/pulsar/client/api/KeyStoreParams.java   |  41 +--
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |  24 +-
 .../apache/pulsar/client/cli/PulsarClientTool.java |  18 ++
 .../pulsar/client/impl/ClientBuilderImpl.java      |  47 ++-
 .../org/apache/pulsar/client/impl/HttpClient.java  |  68 ++--
 .../pulsar/client/impl/HttpLookupService.java      |   3 +-
 .../client/impl/PulsarChannelInitializer.java      |  39 ++-
 .../impl/auth/AuthenticationDataKeyStoreTls.java   |  32 +-
 .../impl/auth/AuthenticationKeyStoreTls.java       | 136 ++++++++
 .../pulsar/client/impl/auth/AuthenticationTls.java |   4 +-
 .../client/impl/conf/ClientConfigurationData.java  |  12 +
 pulsar-common/pom.xml                              |   4 +
 .../common/util/ClientSslContextRefresher.java     |  67 ----
 .../common/util/DefaultSslContextBuilder.java      |  18 +-
 .../util/NettyClientSslContextRefresher.java       |  74 +++++
 ...lder.java => NettyServerSslContextBuilder.java} |  33 +-
 .../common/util/SslContextAutoRefreshBuilder.java  |  46 +--
 .../util/keystoretls/KeyStoreSSLContext.java       | 355 +++++++++++++++++++++
 .../util/keystoretls/NetSslContextBuilder.java     |  92 ++++++
 .../NettySSLContextAutoRefreshBuilder.java         | 144 +++++++++
 .../keystoretls/SSLContextValidatorEngine.java     | 176 ++++++++++
 .../SslContextFactoryWithAutoRefresh.java          |  63 ++++
 .../common/util/keystoretls/package-info.java      |  34 +-
 .../src/test/resources/broker.keystore.jks         | Bin 0 -> 2767 bytes
 .../src/test/resources/broker.truststore.jks       | Bin 0 -> 731 bytes
 .../src/test/resources/brokerKeyStorePW.txt        |   1 +
 .../src/test/resources/brokerTrustStorePW.txt      |   1 +
 pulsar-common/src/test/resources/ca-cert           |  16 +
 pulsar-common/src/test/resources/ca-cert.srl       |   1 +
 pulsar-common/src/test/resources/ca-key            |  30 ++
 pulsar-common/src/test/resources/cert-file         |  17 +
 pulsar-common/src/test/resources/cert-signed       |  22 ++
 .../src/test/resources/client.keystore.jks         | Bin 0 -> 2767 bytes
 .../src/test/resources/client.truststore.jks       | Bin 0 -> 731 bytes
 .../src/test/resources/clientKeyStorePW.txt        |   1 +
 .../src/test/resources/clientTrustStorePW.txt      |   1 +
 .../src/test/resources/old/broker.keystore.jks     | Bin 0 -> 2928 bytes
 .../src/test/resources/old/broker.truststore.jks   | Bin 0 -> 797 bytes
 .../src/test/resources/old/brokerKeyStorePW.txt    |   1 +
 .../src/test/resources/old/brokerTrustStorePW.txt  |   1 +
 .../src/test/resources/old/client.keystore.jks     | Bin 0 -> 2926 bytes
 .../src/test/resources/old/client.truststore.jks   | Bin 0 -> 797 bytes
 .../src/test/resources/old/clientKeyStorePW.txt    |   1 +
 .../src/test/resources/old/clientTrustStorePW.txt  |   1 +
 .../service/ServiceChannelInitializer.java         |  47 ++-
 .../discovery/service/server/ServerManager.java    |  36 ++-
 .../discovery/service/server/ServiceConfig.java    | 230 ++-----------
 .../pulsar/proxy/server/DirectProxyHandler.java    |  12 +-
 .../pulsar/proxy/server/ProxyConfiguration.java    | 104 +++++-
 .../pulsar/proxy/server/ProxyConnection.java       |  21 +-
 .../proxy/server/ServiceChannelInitializer.java    |  90 +++++-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  37 ++-
 .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java | 203 ++++++++++++
 .../server/ProxyKeyStoreTlsTestWithoutAuth.java    | 187 +++++++++++
 .../authentication/keystoretls/broker.keystore.jks | Bin 0 -> 2767 bytes
 .../keystoretls/broker.truststore.jks              | Bin 0 -> 731 bytes
 .../keystoretls/brokerKeyStorePW.txt               |   1 +
 .../keystoretls/brokerTrustStorePW.txt             |   1 +
 .../authentication/keystoretls/client.keystore.jks | Bin 0 -> 2767 bytes
 .../keystoretls/client.truststore.jks              | Bin 0 -> 731 bytes
 .../keystoretls/clientKeyStorePW.txt               |   1 +
 .../keystoretls/clientTrustStorePW.txt             |   1 +
 site2/docs/reference-configuration.md              |  12 +
 92 files changed, 3559 insertions(+), 556 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2c5d6b9..4a9c8ec 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -406,6 +406,62 @@ tlsCiphers=
 # authentication.
 tlsRequireTrustedClientCertOnConnect=false
 
+### --- KeyStore TLS config variables --- ###
+# Enable TLS with KeyStore type configuration in broker.
+tlsEnabledWithKeyStore=false
+
+# TLS Provider for KeyStore type
+tlsProvider=
+
+# TLS KeyStore type configuration in broker: JKS, PKCS12
+tlsKeyStoreType=JKS
+
+# TLS KeyStore path in broker
+tlsKeyStore=
+
+# TLS KeyStore password for broker
+tlsKeyStorePassword=
+
+# TLS TrustStore type configuration in broker: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path in broker
+tlsTrustStore=
+
+# TLS TrustStore password in broker
+tlsTrustStorePassword=
+
+# Whether internal client use KeyStore type to authenticate with Pulsar brokers
+brokerClientTlsEnabledWithKeyStore=false
+
+# The TLS Provider used by internal client to authenticate with other Pulsar brokers
+brokerClientSslProvider=
+
+# TLS TrustStore type configuration for internal client: JKS, PKCS12
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStoreType=JKS
+
+# TLS TrustStore path for internal client
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStore=
+
+# TLS TrustStore password for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStorePassword=
+
+# Specify the tls cipher the internal client will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers)
+# e.g.  [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsCiphers=
+
+# Specify the tls protocols the broker will use to negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# e.g.  [TLSv1.2, TLSv1.1, TLSv1]
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsProtocols=
+
+
 ### --- Authentication --- ###
 
 # Enable authentication
diff --git a/conf/client.conf b/conf/client.conf
index 887785a..597478e 100644
--- a/conf/client.conf
+++ b/conf/client.conf
@@ -56,3 +56,14 @@ tlsEnableHostnameVerification=false
 # fails, then the cert is untrusted and the connection is dropped.
 tlsTrustCertsFilePath=
 
+# Enable TLS with KeyStore type configuration in broker.
+useKeyStoreTls=false;
+
+# TLS KeyStore type configuration: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path
+tlsTrustStorePath=
+
+# TLS TrustStore password
+tlsTrustStorePassword=
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 0fd80a3..8a5773f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -225,6 +225,102 @@ maxConsumersPerSubscription=0
 # Use 0 or negative number to disable the check
 maxNumPartitionsPerPartitionedTopic=0
 
+### --- TLS --- ###
+# Deprecated - Use webServicePortTls and brokerServicePortTls instead
+tlsEnabled=false
+
+# Tls cert refresh duration in seconds (set 0 to check on every new connection)
+tlsCertRefreshCheckDurationSec=300
+
+# Path for the TLS certificate file
+tlsCertificateFilePath=
+
+# Path for the TLS private key file
+tlsKeyFilePath=
+
+# Path for the trusted TLS certificate file.
+# This cert is used to verify that any certs presented by connecting clients
+# are signed by a certificate authority. If this verification
+# fails, then the certs are untrusted and the connections are dropped.
+tlsTrustCertsFilePath=
+
+# Accept untrusted TLS certificate from client.
+# If true, a client with a cert which cannot be verified with the
+# 'tlsTrustCertsFilePath' cert will allowed to connect to the server,
+# though the cert will not be used for client authentication.
+tlsAllowInsecureConnection=false
+
+# Specify the tls protocols the broker will use to negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# Examples:- [TLSv1.2, TLSv1.1, TLSv1]
+tlsProtocols=
+
+# Specify the tls cipher the broker will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers).
+# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+tlsCiphers=
+
+# Trusted client certificates are required for to connect TLS
+# Reject the Connection if the Client Certificate is not trusted.
+# In effect, this requires that all connecting clients perform TLS client
+# authentication.
+tlsRequireTrustedClientCertOnConnect=false
+
+### --- KeyStore TLS config variables --- ###
+# Enable TLS with KeyStore type configuration in broker.
+tlsEnabledWithKeyStore=false
+
+# TLS Provider for KeyStore type
+tlsProvider=
+
+# TLS KeyStore type configuration in broker: JKS, PKCS12
+tlsKeyStoreType=JKS
+
+# TLS KeyStore path in broker
+tlsKeyStore=
+
+# TLS KeyStore password for broker
+tlsKeyStorePassword=
+
+# TLS TrustStore type configuration in broker: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path in broker
+tlsTrustStore=
+
+# TLS TrustStore password for broker
+tlsTrustStorePassword=
+
+# Whether internal client use KeyStore type to authenticate with Pulsar brokers
+brokerClientTlsEnabledWithKeyStore=false
+
+# The TLS Provider used by internal client to authenticate with other Pulsar brokers
+brokerClientSslProvider=
+
+# TLS TrustStore type configuration for internal client: JKS, PKCS12
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStoreType=JKS
+
+# TLS TrustStore path for internal client
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStore=
+
+# TLS TrustStore password for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStorePassword=
+
+# Specify the tls cipher the internal client will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers)
+# e.g.  [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsCiphers=
+
+# Specify the tls protocols the broker will use to negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# e.g.  [TLSv1.2, TLSv1.1, TLSv1]
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsProtocols=
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d7ecb13..1d10b8d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -77,6 +77,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @Category
     private static final String CATEGORY_TLS = "TLS";
     @Category
+    private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS";
+    @Category
     private static final String CATEGORY_AUTHENTICATION = "Authentication";
     @Category
     private static final String CATEGORY_AUTHORIZATION = "Authorization";
@@ -1581,6 +1583,102 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private String transactionMetadataStoreProviderClassName =
             "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider";
 
+    /**** --- KeyStore TLS config variables --- ****/
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Enable TLS with KeyStore type configuration in broker"
+    )
+    private boolean tlsEnabledWithKeyStore = false;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS Provider for KeyStore type"
+    )
+    private String tlsProvider = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore type configuration in broker: JKS, PKCS12"
+    )
+    private String tlsKeyStoreType = "JKS";
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore path in broker"
+    )
+    private String tlsKeyStore = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore password for broker"
+    )
+    private String tlsKeyStorePassword = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore type configuration in broker: JKS, PKCS12"
+    )
+    private String tlsTrustStoreType = "JKS";
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore path in broker"
+    )
+    private String tlsTrustStore = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore password for broker"
+    )
+    private String tlsTrustStorePassword = null;
+
+    /**** --- KeyStore TLS config variables used for internal client/admin to auth with other broker--- ****/
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Whether internal client use KeyStore type to authenticate with other Pulsar brokers"
+    )
+    private boolean brokerClientTlsEnabledWithKeyStore = false;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "The TLS Provider used by internal client to authenticate with other Pulsar brokers"
+    )
+    private String brokerClientSslProvider = null;
+    // needed when client auth is required
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore type configuration for internal client: JKS, PKCS12 "
+                  + " used by the internal client to authenticate with Pulsar brokers"
+    )
+    private String brokerClientTlsTrustStoreType = "JKS";
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore path for internal client, "
+                  + " used by the internal client to authenticate with Pulsar brokers"
+    )
+    private String brokerClientTlsTrustStore = null;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore password for internal client, "
+                  + " used by the internal client to authenticate with Pulsar brokers"
+    )
+    private String brokerClientTlsTrustStorePassword = null;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Specify the tls cipher the internal client will use to negotiate during TLS Handshake"
+                  + " (a comma-separated list of ciphers).\n\n"
+                  + "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].\n"
+                  + " used by the internal client to authenticate with Pulsar brokers"
+    )
+    private Set<String> brokerClientTlsCiphers = Sets.newTreeSet();
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Specify the tls protocols the broker will use to negotiate during TLS handshake"
+                  + " (a comma-separated list of protocol names).\n\n"
+                  + "Examples:- [TLSv1.2, TLSv1.1, TLSv1] \n"
+                  + " used by the internal client to authenticate with Pulsar brokers"
+    )
+    private Set<String> brokerClientTlsProtocols = Sets.newTreeSet();
+
     /**
      * @deprecated See {@link #getConfigurationStoreServers}
      */
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
index 03a9bd3..4e1d33b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
@@ -34,7 +34,6 @@ public class AuthenticationDataHttps extends AuthenticationDataHttp {
     /*
      * TLS
      */
-
     @Override
     public boolean hasDataFromTls() {
         return (certificates != null);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
index 06b1749..f2667c3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
@@ -42,7 +42,7 @@ public class OneStageAuthenticationState implements AuthenticationState {
                                        SSLSession sslSession,
                                        AuthenticationProvider provider) throws AuthenticationException {
         this.authenticationDataSource = new AuthenticationDataCommand(
-            new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);;
+            new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);
         this.authRole = provider.authenticate(authenticationDataSource);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 7c76257..be1b276 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -332,11 +333,29 @@ public class PulsarStandalone implements AutoCloseable {
             createSampleNameSpace(clusterData, cluster);
         } else {
             URL webServiceUrlTls = new URL(
-                    String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePortTls().get()));
+                    String.format("https://%s:%d", config.getAdvertisedAddress(), config.getWebServicePortTls().get()));
             String brokerServiceUrlTls = String.format("pulsar+ssl://%s:%d", config.getAdvertisedAddress(),
                     config.getBrokerServicePortTls().get());
-            admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrlTls.toString()).authentication(
-                    config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build();
+            PulsarAdminBuilder builder = PulsarAdmin.builder()
+                    .serviceHttpUrl(webServiceUrlTls.toString())
+                    .authentication(
+                            config.getBrokerClientAuthenticationPlugin(),
+                            config.getBrokerClientAuthenticationParameters());
+
+            // set trust store if needed.
+            if (config.isBrokerClientTlsEnabled()) {
+                if (config.isBrokerClientTlsEnabledWithKeyStore()) {
+                    builder.useKeyStoreTls(true)
+                            .tlsTrustStoreType(config.getBrokerClientTlsTrustStoreType())
+                            .tlsTrustStorePath(config.getBrokerClientTlsTrustStore())
+                            .tlsTrustStorePassword(config.getBrokerClientTlsTrustStorePassword());
+                } else {
+                    builder.tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath());
+                }
+                builder.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection());
+            }
+
+            admin = builder.build();
             ClusterData clusterData = new ClusterData(null, webServiceUrlTls.toString(), null, brokerServiceUrlTls);
             createSampleNameSpace(clusterData, cluster);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e3874ef..30d1dde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -524,9 +524,9 @@ public class PulsarService implements AutoCloseable {
 
             final String bootstrapMessage = "bootstrap service "
                     + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
-                    + (config.getWebServicePortTls().isPresent() ? "tls-port = " + config.getWebServicePortTls() : "")
-                    + (config.getBrokerServicePort().isPresent() ? "broker url= " + brokerServiceUrl : "")
-                    + (config.getBrokerServicePortTls().isPresent() ? "broker url= " + brokerServiceUrlTls : "");
+                    + (config.getWebServicePortTls().isPresent() ? ", tls-port = " + config.getWebServicePortTls() : "")
+                    + (config.getBrokerServicePort().isPresent() ? ", broker url= " + brokerServiceUrl : "")
+                    + (config.getBrokerServicePortTls().isPresent() ? ", broker tls url= " + brokerServiceUrlTls : "");
             LOG.info("messaging service is ready");
 
             LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
@@ -960,10 +960,17 @@ public class PulsarService implements AutoCloseable {
                     .tlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
 
                 if (this.getConfiguration().isBrokerClientTlsEnabled()) {
-                    builder.tlsTrustCertsFilePath(
-                        isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
-                            ? this.getConfiguration().getBrokerClientTrustCertsFilePath()
-                            : this.getConfiguration().getTlsCertificateFilePath());
+                    if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
+                        builder.useKeyStoreTls(true)
+                                .tlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType())
+                                .tlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore())
+                                .tlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
+                    } else {
+                        builder.tlsTrustCertsFilePath(
+                                isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
+                                        ? this.getConfiguration().getBrokerClientTrustCertsFilePath()
+                                        : this.getConfiguration().getTlsCertificateFilePath());
+                    }
                 }
 
                 if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
@@ -989,7 +996,14 @@ public class PulsarService implements AutoCloseable {
                                 conf.getBrokerClientAuthenticationParameters());
 
                 if (conf.isBrokerClientTlsEnabled()) {
-                    builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+                    if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
+                        builder.useKeyStoreTls(true)
+                                .tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
+                                .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
+                                .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
+                    } else {
+                        builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+                    }
                     builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
                 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7b88746..1734464 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -796,8 +796,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                             .serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
                                     : data.getServiceUrlTls())
                             .enableTls(true)
-                            .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
                             .allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
+                    if (pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
+                        clientBuilder.useKeyStoreTls(true)
+                                .tlsTrustStoreType(pulsar.getConfiguration().getBrokerClientTlsTrustStoreType())
+                                .tlsTrustStorePath(pulsar.getConfiguration().getBrokerClientTlsTrustStore())
+                                .tlsTrustStorePassword(pulsar.getConfiguration()
+                                        .getBrokerClientTlsTrustStorePassword());
+                    } else {
+                        clientBuilder.tlsTrustCertsFilePath(pulsar.getConfiguration()
+                                .getBrokerClientTrustCertsFilePath());
+                    }
                 } else {
                     clientBuilder.serviceUrl(
                             isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
@@ -833,8 +842,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                                 conf.getBrokerClientAuthenticationParameters());
 
                 if (isTlsUrl) {
-                    builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
                     builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
+                    if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
+                        builder.useKeyStoreTls(true)
+                                .tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
+                                .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
+                                .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
+                    } else {
+                        builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+                    }
                 }
 
                 // most of the admin request requires to make zk-call so, keep the max read-timeout based on
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index ce16a7e..2a2d3d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -20,23 +20,24 @@ package org.apache.pulsar.broker.service;
 
 import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 
-import java.net.SocketAddress;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.NettySslContextBuilder;
-
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.flow.FlowControlHandler;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 
 @Slf4j
 public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -45,8 +46,10 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
 
     private final PulsarService pulsar;
     private final boolean enableTls;
-    private final NettySslContextBuilder sslCtxRefresher;
+    private final boolean tlsEnabledWithKeyStore;
+    private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;
     private final ServiceConfiguration brokerConf;
+    private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
 
     // This cache is used to maintain a list of active connections to iterate over them
     // We keep weak references to have the cache to be auto cleaned up when the connections
@@ -66,13 +69,31 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
         super();
         this.pulsar = pulsar;
         this.enableTls = enableTLS;
+        ServiceConfiguration serviceConfig = pulsar.getConfiguration();
+        this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
         if (this.enableTls) {
-            ServiceConfiguration serviceConfig = pulsar.getConfiguration();
-            sslCtxRefresher = new NettySslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
-                    serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
-                    serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
-                    serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
-                    serviceConfig.getTlsCertRefreshCheckDurationSec());
+            if (tlsEnabledWithKeyStore) {
+                nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+                        serviceConfig.getTlsProvider(),
+                        serviceConfig.getTlsKeyStoreType(),
+                        serviceConfig.getTlsKeyStore(),
+                        serviceConfig.getTlsKeyStorePassword(),
+                        serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustStoreType(),
+                        serviceConfig.getTlsTrustStore(),
+                        serviceConfig.getTlsTrustStorePassword(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCiphers(),
+                        serviceConfig.getTlsProtocols(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            } else {
+                sslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
+                        serviceConfig.getTlsKeyFilePath(),
+                        serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            }
         } else {
             this.sslCtxRefresher = null;
         }
@@ -86,7 +107,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         if (this.enableTls) {
-            ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
+            if (this.tlsEnabledWithKeyStore) {
+                ch.pipeline().addLast(TLS_HANDLER,
+                        new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
+            } else {
+                ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
+            }
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
         } else {
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index e111fbc..feb0e49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -19,29 +19,19 @@
 package org.apache.pulsar.broker.web;
 
 import com.google.common.collect.Lists;
-
 import io.prometheus.client.jetty.JettyStatisticsCollector;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TimeZone;
-
 import javax.servlet.DispatcherType;
-import javax.servlet.Filter;
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServletResponse;
-
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -105,13 +95,30 @@ public class WebService implements AutoCloseable {
         Optional<Integer> tlsPort = pulsar.getConfiguration().getWebServicePortTls();
         if (tlsPort.isPresent()) {
             try {
-                SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
-                        pulsar.getConfiguration().isTlsAllowInsecureConnection(),
-                        pulsar.getConfiguration().getTlsTrustCertsFilePath(),
-                        pulsar.getConfiguration().getTlsCertificateFilePath(),
-                        pulsar.getConfiguration().getTlsKeyFilePath(),
-                        pulsar.getConfiguration().isTlsRequireTrustedClientCertOnConnect(), true,
-                        pulsar.getConfiguration().getTlsCertRefreshCheckDurationSec());
+                SslContextFactory sslCtxFactory;
+                ServiceConfiguration config = pulsar.getConfiguration();
+                if (config.isTlsEnabledWithKeyStore()) {
+                    sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+                            config.getTlsProvider(),
+                            config.getTlsKeyStoreType(),
+                            config.getTlsKeyStore(),
+                            config.getTlsKeyStorePassword(),
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustStoreType(),
+                            config.getTlsTrustStore(),
+                            config.getTlsTrustStorePassword(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            config.getTlsCertRefreshCheckDurationSec()
+                    );
+                } else {
+                    sslCtxFactory = SecurityUtility.createSslContextFactory(
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustCertsFilePath(),
+                            config.getTlsCertificateFilePath(),
+                            config.getTlsKeyFilePath(),
+                            config.isTlsRequireTrustedClientCertOnConnect(), true,
+                            config.getTlsCertRefreshCheckDurationSec());
+                }
                 httpsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
                 httpsConnector.setPort(tlsPort.get());
                 httpsConnector.setHost(pulsar.getBindAddress());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 55bc4a7..9f1eac8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -182,14 +182,14 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
 
     /**
      * It verifies that AuthenticationTls provides cert refresh functionality.
-     * 
+     *
      * <pre>
      *  a. Create Auth with invalid cert
      *  b. Consumer fails with invalid tls certs
      *  c. refresh cert in provider
      *  d. Consumer successfully gets created
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -234,5 +234,5 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
 
     private ByteArrayInputStream getStream(AtomicInteger index, ByteArrayInputStream... streams) {
         return streams[index.intValue()];
-    } 
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
new file mode 100644
index 0000000..ab2833d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls.mapToString;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.jackson.JacksonFeature;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
+    protected final String BROKER_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+    protected final String BROKER_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+    protected final String BROKER_KEYSTORE_PW = "111111";
+    protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+    protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+    protected final String CLIENT_KEYSTORE_PW = "111111";
+    protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_CN = "clientuser";
+    protected final String KEYSTORE_TYPE = "JKS";
+
+    private final String clusterName = "test";
+    Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setLoadBalancerEnabled(true);
+        conf.setBrokerServicePortTls(Optional.of(0));
+        conf.setWebServicePortTls(Optional.of(0));
+
+        conf.setTlsEnabledWithKeyStore(true);
+        conf.setTlsKeyStoreType(KEYSTORE_TYPE);
+        conf.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+        conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+
+        conf.setTlsTrustStoreType(KEYSTORE_TYPE);
+        conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+        conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+        conf.setClusterName(clusterName);
+        conf.setTlsRequireTrustedClientCertOnConnect(true);
+        tlsProtocols.add("TLSv1.2");
+        conf.setTlsProtocols(tlsProtocols);
+
+        // config for authentication and authorization.
+        conf.setSuperUserRoles(Sets.newHashSet(CLIENT_KEYSTORE_CN));
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setBrokerClientTlsEnabled(true);
+        conf.setBrokerClientTlsEnabledWithKeyStore(true);
+
+        // set broker client tls auth
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(mapToString(authParams));
+        conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
+        conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
+
+        super.init();
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    WebTarget buildWebClient() throws Exception {
+        ClientConfig httpConfig = new ClientConfig();
+        httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
+        httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
+        httpConfig.register(MultiPartFeature.class);
+
+        ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig)
+            .register(JacksonConfigurator.class).register(JacksonFeature.class);
+
+        SSLContext sslCtx = KeyStoreSSLContext.createClientSslContext(
+                KEYSTORE_TYPE,
+                CLIENT_KEYSTORE_FILE_PATH,
+                CLIENT_KEYSTORE_PW,
+                KEYSTORE_TYPE,
+                BROKER_TRUSTSTORE_FILE_PATH,
+                BROKER_TRUSTSTORE_PW);
+
+        clientBuilder.sslContext(sslCtx).hostnameVerifier(NoopHostnameVerifier.INSTANCE);
+        Client client = clientBuilder.build();
+
+        return client.target(brokerUrlTls.toString());
+    }
+
+    PulsarAdmin buildAdminClient() throws Exception {
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrlTls.toString())
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(false)
+                .authentication(AuthenticationKeyStoreTls.class.getName(), authParams)
+                .build();
+    }
+
+    @Test
+    public void testSuperUserCanListTenants() throws Exception {
+        try (PulsarAdmin admin = buildAdminClient()) {
+            admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+            admin.tenants().createTenant("tenant1",
+                                         new TenantInfo(ImmutableSet.of("foobar"),
+                                                        ImmutableSet.of("test")));
+            Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+        }
+    }
+
+    @Test
+    public void testSuperUserCantListNamespaces() throws Exception {
+        try (PulsarAdmin admin = buildAdminClient()) {
+            admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+            admin.tenants().createTenant("tenant1",
+                                         new TenantInfo(ImmutableSet.of("proxy"),
+                                                        ImmutableSet.of("test")));
+            admin.namespaces().createNamespace("tenant1/ns1");
+            admin.namespaces().getNamespaces("tenant1").contains("tenant1/ns1");
+        }
+    }
+
+    @Test
+    public void testAuthorizedUserAsOriginalPrincipal() throws Exception {
+        try (PulsarAdmin admin = buildAdminClient()) {
+            admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+            admin.tenants().createTenant("tenant1",
+                                         new TenantInfo(ImmutableSet.of("proxy", "user1"),
+                                                        ImmutableSet.of("test")));
+            admin.namespaces().createNamespace("tenant1/ns1");
+        }
+        WebTarget root = buildWebClient();
+        Assert.assertEquals(ImmutableSet.of("tenant1/ns1"),
+                            root.path("/admin/v2/namespaces").path("tenant1")
+                            .request(MediaType.APPLICATION_JSON)
+                            .header("X-Original-Principal", "user1")
+                            .get(new GenericType<List<String>>() {}));
+    }
+
+    @Test
+    public void testPersistentList() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        try (PulsarAdmin admin = buildAdminClient()) {
+            admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+            admin.tenants().createTenant("tenant1",
+                    new TenantInfo(ImmutableSet.of("foobar"),
+                            ImmutableSet.of("test")));
+            Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+
+            admin.namespaces().createNamespace("tenant1/ns1");
+
+            // this will calls internal admin to list nonpersist topics.
+            admin.topics().getList("tenant1/ns1");
+        } catch (PulsarAdminException ex) {
+            ex.printStackTrace();
+            fail("Should not have thrown an exception");
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
new file mode 100644
index 0000000..14177e0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+// TLS authentication and authorization based on KeyStore type config.
+@Slf4j
+public class KeyStoreTlsProducerConsumerTestWithAuth extends ProducerConsumerBase {
+    protected final String BROKER_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+    protected final String BROKER_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+    protected final String BROKER_KEYSTORE_PW = "111111";
+    protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+    protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+    protected final String CLIENT_KEYSTORE_PW = "111111";
+    protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_CN = "clientuser";
+    protected final String KEYSTORE_TYPE = "JKS";
+
+    private final String clusterName = "use";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // TLS configuration for Broker
+        internalSetUpForBroker();
+
+        // Start Broker
+
+        super.init();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    protected void internalSetUpForBroker() throws Exception {
+        conf.setBrokerServicePortTls(Optional.of(0));
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setTlsEnabledWithKeyStore(true);
+
+        conf.setTlsKeyStoreType(KEYSTORE_TYPE);
+        conf.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+        conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+
+        conf.setTlsTrustStoreType(KEYSTORE_TYPE);
+        conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+        conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+        conf.setClusterName(clusterName);
+        conf.setTlsRequireTrustedClientCertOnConnect(true);
+
+        // config for authentication and authorization.
+        conf.setSuperUserRoles(Sets.newHashSet(CLIENT_KEYSTORE_CN));
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        conf.setAuthenticationProviders(providers);
+    }
+
+    protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+        if (pulsarClient != null) {
+            pulsarClient.close();
+        }
+
+        Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+        tlsProtocols.add("TLSv1.2");
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(lookupUrl)
+                .enableTls(true)
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(false)
+                .tlsProtocols(tlsProtocols)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        if (addCertificates) {
+            Map<String, String> authParams = new HashMap<>();
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+            clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+        }
+        pulsarClient = clientBuilder.build();
+    }
+
+    protected void internalSetUpForNamespace() throws Exception {
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+
+        if (admin != null) {
+            admin.close();
+        }
+
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(false)
+                .authentication(AuthenticationKeyStoreTls.class.getName(), authParams).build());
+        admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+                pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+    }
+
+    /**
+     * verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be
+     * produced/consumed
+     *
+     * @throws Exception
+     */
+    @Test(timeOut = 30000)
+    public void testTlsLargeSizeMessage() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        String topicName = "persistent://my-property/use/my-ns/testTlsLargeSizeMessage"
+                           + System.currentTimeMillis();
+
+        internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+        internalSetUpForNamespace();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            byte[] message = new byte[MESSAGE_SIZE];
+            Arrays.fill(message, (byte) i);
+            producer.send(message);
+        }
+
+        Message<byte[]> msg = null;
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            byte[] expected = new byte[MESSAGE_SIZE];
+            Arrays.fill(expected, (byte) i);
+            Assert.assertEquals(expected, msg.getData());
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test(timeOut = 300000)
+    public void testTlsClientAuthOverBinaryProtocol() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverBinaryProtocol"
+                           + System.currentTimeMillis();
+
+        internalSetUpForNamespace();
+
+        // Test 1 - Using TLS on binary protocol without sending certs - expect failure
+        internalSetUpForClient(false, pulsar.getBrokerServiceUrlTls());
+
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+            Assert.fail("Server should have failed the TLS handshake since client didn't .");
+        } catch (Exception ex) {
+            // OK
+        }
+
+        // Using TLS on binary protocol - sending certs
+        internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+        } catch (Exception ex) {
+            Assert.fail("Should not fail since certs are sent.");
+        }
+    }
+
+    @Test(timeOut = 30000)
+    public void testTlsClientAuthOverHTTPProtocol() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverHTTPProtocol"
+                           + System.currentTimeMillis();
+
+        internalSetUpForNamespace();
+
+        // Test 1 - Using TLS on https without sending certs - expect failure
+        internalSetUpForClient(false, pulsar.getWebServiceAddressTls());
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+            Assert.fail("Server should have failed the TLS handshake since client didn't .");
+        } catch (Exception ex) {
+            // OK
+        }
+
+        // Test 2 - Using TLS on https - sending certs
+        internalSetUpForClient(true, pulsar.getWebServiceAddressTls());
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+        } catch (Exception ex) {
+            Assert.fail("Should not fail since certs are sent.");
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
new file mode 100644
index 0000000..c95f3df
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+// TLS test without authentication and authorization based on KeyStore type config.
+@Slf4j
+public class KeyStoreTlsProducerConsumerTestWithoutAuth extends ProducerConsumerBase {
+    protected final String BROKER_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+    protected final String BROKER_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+    protected final String BROKER_KEYSTORE_PW = "111111";
+    protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+    protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+    protected final String CLIENT_KEYSTORE_PW = "111111";
+    protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+    protected final String KEYSTORE_TYPE = "JKS";
+
+    private final String clusterName = "use";
+    Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // TLS configuration for Broker
+        internalSetUpForBroker();
+
+        // Start Broker
+        super.init();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    protected void internalSetUpForBroker() throws Exception {
+        conf.setBrokerServicePortTls(Optional.of(0));
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setTlsEnabledWithKeyStore(true);
+
+        conf.setTlsKeyStoreType(KEYSTORE_TYPE);
+        conf.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+        conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+
+        conf.setTlsTrustStoreType(KEYSTORE_TYPE);
+        conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+        conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+        conf.setClusterName(clusterName);
+        conf.setTlsRequireTrustedClientCertOnConnect(true);
+        tlsProtocols.add("TLSv1.2");
+        conf.setTlsProtocols(tlsProtocols);
+    }
+
+    protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+        if (pulsarClient != null) {
+            pulsarClient.close();
+        }
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(lookupUrl)
+                .enableTls(true)
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(false)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        if (addCertificates) {
+            Map<String, String> authParams = new HashMap<>();
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+            clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+        }
+        pulsarClient = clientBuilder.build();
+    }
+
+    protected void internalSetUpForNamespace() throws Exception {
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+
+        if (admin != null) {
+            admin.close();
+        }
+
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(true)
+                .authentication(AuthenticationKeyStoreTls.class.getName(), authParams).build());
+        admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+                pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+    }
+
+    /**
+     * verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be
+     * produced/consumed
+     *
+     * @throws Exception
+     */
+    @Test(timeOut = 30000)
+    public void testTlsLargeSizeMessage() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        String topicName = "persistent://my-property/use/my-ns/testTlsLargeSizeMessage"
+                           + System.currentTimeMillis();
+
+        internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+        internalSetUpForNamespace();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            byte[] message = new byte[MESSAGE_SIZE];
+            Arrays.fill(message, (byte) i);
+            producer.send(message);
+        }
+
+        Message<byte[]> msg = null;
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            byte[] expected = new byte[MESSAGE_SIZE];
+            Arrays.fill(expected, (byte) i);
+            Assert.assertEquals(expected, msg.getData());
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test(timeOut = 300000)
+    public void testTlsClientAuthOverBinaryProtocol() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverBinaryProtocol"
+                           + System.currentTimeMillis();
+
+        internalSetUpForNamespace();
+
+        // Test 1 - Using TLS on binary protocol without sending certs - expect failure
+        internalSetUpForClient(false, pulsar.getBrokerServiceUrlTls());
+
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+            Assert.fail("Server should have failed the TLS handshake since client didn't .");
+        } catch (Exception ex) {
+            // OK
+        }
+
+        // Test 2 - Using TLS on binary protocol - sending certs
+        internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+        } catch (Exception ex) {
+            Assert.fail("Should not fail since certs are sent.");
+        }
+    }
+
+    @Test(timeOut = 30000)
+    public void testTlsClientAuthOverHTTPProtocol() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverHTTPProtocol"
+                           + System.currentTimeMillis();
+
+        internalSetUpForNamespace();
+
+        // Test 1 - Using TLS on https without sending certs - expect failure
+        internalSetUpForClient(false, pulsar.getWebServiceAddressTls());
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+            Assert.fail("Server should have failed the TLS handshake since client didn't .");
+        } catch (Exception ex) {
+            // OK
+        }
+
+        // Test 2 - Using TLS on https - sending certs
+        internalSetUpForClient(true, pulsar.getWebServiceAddressTls());
+        try {
+            pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+        } catch (Exception ex) {
+            Assert.fail("Should not fail since certs are sent.");
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsTest.java
new file mode 100644
index 0000000..0f9993d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.common.util.SecurityUtility.getProvider;
+
+import java.security.Provider;
+import javax.net.ssl.SSLContext;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.apache.pulsar.common.util.keystoretls.SSLContextValidatorEngine;
+import org.testng.annotations.Test;
+
+public class KeyStoreTlsTest {
+
+    protected final String BROKER_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+    protected final String BROKER_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+    protected final String BROKER_KEYSTORE_PW = "111111";
+    protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+    protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+    protected final String CLIENT_KEYSTORE_PW = "111111";
+    protected final String CLIENT_TRUSTSTORE_PW = "111111";
+    protected final String KEYSTORE_TYPE = "JKS";
+
+    public static final Provider BC_PROVIDER = getProvider();
+
+    @Test(timeOut = 300000)
+    public void testValidate() throws Exception {
+        KeyStoreSSLContext serverSSLContext = new KeyStoreSSLContext(KeyStoreSSLContext.Mode.SERVER,
+                null,
+                KEYSTORE_TYPE,
+                BROKER_KEYSTORE_FILE_PATH,
+                BROKER_KEYSTORE_PW,
+                false,
+                KEYSTORE_TYPE,
+                BROKER_TRUSTSTORE_FILE_PATH,
+                BROKER_TRUSTSTORE_PW,
+                true,
+                null,
+                null);
+        SSLContext serverCnx = serverSSLContext.createSSLContext();
+
+        KeyStoreSSLContext clientSSLContext = new KeyStoreSSLContext(KeyStoreSSLContext.Mode.CLIENT,
+                null,
+                KEYSTORE_TYPE,
+                CLIENT_KEYSTORE_FILE_PATH,
+                CLIENT_KEYSTORE_PW,
+                false,
+                KEYSTORE_TYPE,
+                CLIENT_TRUSTSTORE_FILE_PATH,
+                CLIENT_TRUSTSTORE_PW,
+                false,
+                null,
+                null);
+        SSLContext clientCnx = clientSSLContext.createSSLContext();
+
+        SSLContextValidatorEngine.validate(clientCnx, serverCnx);
+    }
+}
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/broker.keystore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.keystore.jks
new file mode 100644
index 0000000..b4fec69
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.keystore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/broker.truststore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.truststore.jks
new file mode 100644
index 0000000..8ac03d8
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.truststore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/client.keystore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/client.keystore.jks
new file mode 100644
index 0000000..499c8be
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/client.keystore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/client.truststore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/client.truststore.jks
new file mode 100644
index 0000000..8eaa06b
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/client.truststore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 3f6dbf4..5bcc725 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.admin;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Authentication;
@@ -171,6 +172,63 @@ public interface PulsarAdminBuilder {
     PulsarAdminBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);
 
     /**
+     * If Tls is enabled, whether use KeyStore type as tls configuration parameter.
+     * False means use default pem type configuration.
+     *
+     * @param useKeyStoreTls
+     */
+    PulsarAdminBuilder useKeyStoreTls(boolean useKeyStoreTls);
+
+    /**
+     * The name of the security provider used for SSL connections.
+     * Default value is the default security provider of the JVM.
+     *
+     * @param sslProvider
+     */
+    PulsarAdminBuilder sslProvider(String sslProvider);
+
+    /**
+     * The file format of the trust store file.
+     *
+     * @param tlsTrustStoreType
+     */
+    PulsarAdminBuilder tlsTrustStoreType(String tlsTrustStoreType);
+
+    /**
+     * The location of the trust store file.
+     *
+     * @param tlsTrustStorePath
+     */
+    PulsarAdminBuilder tlsTrustStorePath(String tlsTrustStorePath);
+
+    /**
+     * The store password for the key store file.
+     *
+     * @param tlsTrustStorePassword
+     * @return the client builder instance
+     */
+    PulsarAdminBuilder tlsTrustStorePassword(String tlsTrustStorePassword);
+
+    /**
+     * A list of cipher suites.
+     * This is a named combination of authentication, encryption, MAC and key exchange algorithm
+     * used to negotiate the security settings for a network connection using TLS or SSL network protocol.
+     * By default all the available cipher suites are supported.
+     *
+     * @param tlsCiphers
+     */
+    PulsarAdminBuilder tlsCiphers(Set<String> tlsCiphers);
+
+    /**
+     * The SSL protocol used to generate the SSLContext.
+     * Default setting is TLS, which is fine for most cases.
+     * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2.
+     *
+     * @param tlsProtocols
+     */
+    PulsarAdminBuilder tlsProtocols(Set<String> tlsProtocols);
+
+    /**
      * This sets the connection time out for the pulsar admin client.
      *
      * @param connectionTimeout
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 6cc4d69..d62ac33 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.admin.internal;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -104,6 +105,48 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
     }
 
     @Override
+    public PulsarAdminBuilder useKeyStoreTls(boolean useKeyStoreTls) {
+        conf.setUseKeyStoreTls(useKeyStoreTls);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder sslProvider(String sslProvider) {
+        conf.setSslProvider(sslProvider);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsTrustStoreType(String tlsTrustStoreType) {
+        conf.setTlsTrustStoreType(tlsTrustStoreType);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsTrustStorePath(String tlsTrustStorePath) {
+        conf.setTlsTrustStorePath(tlsTrustStorePath);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsTrustStorePassword(String tlsTrustStorePassword) {
+        conf.setTlsTrustStorePassword(tlsTrustStorePassword);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsCiphers(Set<String> tlsCiphers) {
+        conf.setTlsCiphers(tlsCiphers);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder tlsProtocols(Set<String> tlsProtocols) {
+        conf.setTlsProtocols(tlsProtocols);
+        return this;
+    }
+
+    @Override
     public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) {
         this.connectTimeout = connectionTimeout;
         this.connectTimeoutUnit = connectionTimeoutUnit;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index dd4a83e..27bc25c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import javax.net.ssl.SSLContext;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.Status;
@@ -50,9 +51,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
@@ -60,6 +63,7 @@ import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.netty.ssl.JsseSslEngineFactory;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.client.ClientRequest;
 import org.glassfish.jersey.client.ClientResponse;
@@ -109,24 +113,41 @@ public class AsyncHttpConnector implements Connector {
         if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())) {
             serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
             if (conf.getServiceUrl().startsWith("https://")) {
-
-                SslContext sslCtx = null;
-
                 // Set client key and certificate if available
                 AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
-                if (authData.hasDataForTls()) {
-                    sslCtx = SecurityUtility.createNettySslContextForClient(
+
+                if (conf.isUseKeyStoreTls()) {
+                    KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
+
+                    final SSLContext sslCtx = KeyStoreSSLContext.createClientSslContext(
+                            conf.getSslProvider(),
+                            params != null ? params.getKeyStoreType() : null,
+                            params != null ? params.getKeyStorePath() : null,
+                            params != null ? params.getKeyStorePassword() : null,
                             conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
-                            conf.getTlsTrustCertsFilePath(),
-                            authData.getTlsCertificates(),
-                            authData.getTlsPrivateKey());
+                            conf.getTlsTrustStoreType(),
+                            conf.getTlsTrustStorePath(),
+                            conf.getTlsTrustStorePassword(),
+                            conf.getTlsCiphers(),
+                            conf.getTlsProtocols());
+
+                    JsseSslEngineFactory sslEngineFactory = new JsseSslEngineFactory(sslCtx);
+                    confBuilder.setSslEngineFactory(sslEngineFactory);
                 } else {
-                    sslCtx = SecurityUtility.createNettySslContextForClient(
-                            conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
-                            conf.getTlsTrustCertsFilePath());
+                    SslContext sslCtx = null;
+                    if (authData.hasDataForTls()) {
+                        sslCtx = SecurityUtility.createNettySslContextForClient(
+                                conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+                                conf.getTlsTrustCertsFilePath(),
+                                authData.getTlsCertificates(),
+                                authData.getTlsPrivateKey());
+                    } else {
+                        sslCtx = SecurityUtility.createNettySslContextForClient(
+                                conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+                                conf.getTlsTrustCertsFilePath());
+                    }
+                    confBuilder.setSslContext(sslCtx);
                 }
-
-                confBuilder.setSslContext(sslCtx);
             }
         }
         httpClient = new DefaultAsyncHttpClient(confBuilder.build());
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index 122dd5b..77eafe5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -63,6 +63,15 @@ public interface AuthenticationDataProvider extends Serializable {
         return null;
     }
 
+    /**
+     * Used for TLS authentication with keystore type.
+     *
+     * @return a KeyStoreParams for the client certificate chain, or null if the data are not available
+     */
+    default KeyStoreParams getTlsKeyStoreParams() {
+        return null;
+    }
+
     /*
      * HTTP
      */
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index addedaa..e84f8ba 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api;
 
 import java.time.Clock;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 
@@ -290,6 +291,69 @@ public interface ClientBuilder extends Cloneable {
     ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);
 
     /**
+     * If Tls is enabled, whether use KeyStore type as tls configuration parameter.
+     * False means use default pem type configuration.
+     *
+     * @param useKeyStoreTls
+     * @return the client builder instance
+     */
+    ClientBuilder useKeyStoreTls(boolean useKeyStoreTls);
+
+    /**
+     * The name of the security provider used for SSL connections.
+     * Default value is the default security provider of the JVM.
+     *
+     * @param sslProvider
+     * @return the client builder instance
+     */
+    ClientBuilder sslProvider(String sslProvider);
+
+    /**
+     * The file format of the trust store file.
+     *
+     * @param tlsTrustStoreType
+     * @return the client builder instance
+     */
+    ClientBuilder tlsTrustStoreType(String tlsTrustStoreType);
+
+    /**
+     * The location of the trust store file.
+     *
+     * @param tlsTrustStorePath
+     * @return the client builder instance
+     */
+    ClientBuilder tlsTrustStorePath(String tlsTrustStorePath);
+
+    /**
+     * The store password for the key store file.
+     *
+     * @param tlsTrustStorePassword
+     * @return the client builder instance
+     */
+    ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword);
+
+    /**
+     * A list of cipher suites.
+     * This is a named combination of authentication, encryption, MAC and key exchange algorithm
+     * used to negotiate the security settings for a network connection using TLS or SSL network protocol.
+     * By default all the available cipher suites are supported.
+     *
+     * @param tlsCiphers
+     * @return the client builder instance
+     */
+    ClientBuilder tlsCiphers(Set<String> tlsCiphers);
+
+    /**
+     * The SSL protocol used to generate the SSLContext.
+     * Default setting is TLS, which is fine for most cases.
+     * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2.
+     *
+     * @param tlsProtocols
+     * @return the client builder instance
+     */
+    ClientBuilder tlsProtocols(Set<String> tlsProtocols);
+
+    /**
      * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
      * statsInterval It should be set to at least 1 second.
      *
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeyStoreParams.java
similarity index 54%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeyStoreParams.java
index 03a9bd3..5759801 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeyStoreParams.java
@@ -16,33 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.authentication;
+package org.apache.pulsar.client.api;
 
-import java.security.cert.X509Certificate;
-
-import javax.servlet.http.HttpServletRequest;
-
-public class AuthenticationDataHttps extends AuthenticationDataHttp {
-
-    protected final X509Certificate[] certificates;
-
-    public AuthenticationDataHttps(HttpServletRequest request) {
-        super(request);
-        certificates = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
-    }
-
-    /*
-     * TLS
-     */
-
-    @Override
-    public boolean hasDataFromTls() {
-        return (certificates != null);
-    }
-
-    @Override
-    public X509Certificate[] getTlsCertificates() {
-        return certificates;
-    }
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
 
+/**
+ * KeyStore parameters used for tls authentication.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+public class KeyStoreParams{
+    private String keyStoreType;
+    private String keyStorePath;
+    private String keyStorePassword;
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 148fd6b..ccc2854 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -53,16 +53,22 @@ public class PulsarAdminTool {
 
     @Parameter(names = { "--tls-allow-insecure" }, description = "Allow TLS insecure connection")
     Boolean tlsAllowInsecureConnection;
-    
+
     @Parameter(names = { "--tls-trust-cert-path" }, description = "Allow TLS trust cert file path")
     String tlsTrustCertsFilePath;
-    
+
     @Parameter(names = { "--tls-enable-hostname-verification" }, description = "Enable TLS common name verification")
     Boolean tlsEnableHostnameVerification;
 
     @Parameter(names = { "-h", "--help", }, help = true, description = "Show this help.")
     boolean help;
 
+    // for tls with keystore type config
+    boolean useKeyStoreTls = false;
+    String tlsTrustStoreType = "JKS";
+    String tlsTrustStorePath = null;
+    String tlsTrustStorePassword = null;
+
     PulsarAdminTool(Properties properties) throws Exception {
         // fallback to previous-version serviceUrl property to maintain backward-compatibility
         serviceUrl = StringUtils.isNotBlank(properties.getProperty("webServiceUrl"))
@@ -80,9 +86,19 @@ public class PulsarAdminTool {
                 ? this.tlsTrustCertsFilePath
                 : properties.getProperty("tlsTrustCertsFilePath");
 
+        this.useKeyStoreTls = Boolean
+                .parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
+        this.tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
+        this.tlsTrustStorePath = properties.getProperty("tlsTrustStorePath");
+        this.tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword");
+
         adminBuilder = PulsarAdmin.builder().allowTlsInsecureConnection(tlsAllowInsecureConnection)
                 .enableTlsHostnameVerification(tlsEnableHostnameVerification)
-                .tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .useKeyStoreTls(useKeyStoreTls)
+                .tlsTrustStoreType(tlsTrustStoreType)
+                .tlsTrustStorePath(tlsTrustStorePath)
+                .tlsTrustStorePassword(tlsTrustStorePassword);
 
         jcommander = new JCommander();
         jcommander.setProgramName("pulsar-admin");
@@ -108,7 +124,7 @@ public class PulsarAdminTool {
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         // pulsar-proxy cli
         commandMap.put("proxy-stats", CmdProxyStats.class);
-        
+
         commandMap.put("functions", CmdFunctions.class);
         commandMap.put("functions-worker", CmdFunctionWorker.class);
         commandMap.put("sources", CmdSources.class);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 2c38b34..b86bc79 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -60,6 +60,12 @@ public class PulsarClientTool {
     boolean tlsEnableHostnameVerification = false;
     String tlsTrustCertsFilePath = null;
 
+    // for tls with keystore type config
+    boolean useKeyStoreTls = false;
+    String tlsTrustStoreType = "JKS";
+    String tlsTrustStorePath = null;
+    String tlsTrustStorePassword = null;
+
     JCommander commandParser;
     CmdProduce produceCommand;
     CmdConsume consumeCommand;
@@ -79,6 +85,12 @@ public class PulsarClientTool {
                 .parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
         this.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
 
+        this.useKeyStoreTls = Boolean
+                .parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
+        this.tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
+        this.tlsTrustStorePath = properties.getProperty("tlsTrustStorePath");
+        this.tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword");
+
         produceCommand = new CmdProduce();
         consumeCommand = new CmdConsume();
 
@@ -99,6 +111,12 @@ public class PulsarClientTool {
         clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
         clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
         clientBuilder.serviceUrl(serviceURL);
+
+        clientBuilder.useKeyStoreTls(useKeyStoreTls)
+                .tlsTrustStoreType(tlsTrustStoreType)
+                .tlsTrustStorePath(tlsTrustStorePath)
+                .tlsTrustStorePassword(tlsTrustStorePassword);
+
         this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
         this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 16ea688..3283166 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import java.time.Clock;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
@@ -174,6 +175,48 @@ public class ClientBuilderImpl implements ClientBuilder {
     }
 
     @Override
+    public ClientBuilder useKeyStoreTls(boolean useKeyStoreTls) {
+        conf.setUseKeyStoreTls(useKeyStoreTls);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder sslProvider(String sslProvider) {
+        conf.setSslProvider(sslProvider);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsTrustStoreType(String tlsTrustStoreType) {
+        conf.setTlsTrustStoreType(tlsTrustStoreType);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsTrustStorePath(String tlsTrustStorePath) {
+        conf.setTlsTrustStorePath(tlsTrustStorePath);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword) {
+        conf.setTlsTrustStorePassword(tlsTrustStorePassword);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsCiphers(Set<String> tlsCiphers) {
+        conf.setTlsCiphers(tlsCiphers);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsProtocols(Set<String> tlsProtocols) {
+        conf.setTlsProtocols(tlsProtocols);
+        return this;
+    }
+
+    @Override
     public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
         conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
         return this;
@@ -214,13 +257,13 @@ public class ClientBuilderImpl implements ClientBuilder {
     	conf.setInitialBackoffIntervalNanos(unit.toNanos(duration));
     	return this;
     }
-    
+
     @Override
     public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
     	conf.setMaxBackoffIntervalNanos(unit.toNanos(duration));
     	return this;
     }
-    
+
     public ClientConfigurationData getClientConfigurationData() {
         return conf;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 845c741..3e693ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -32,14 +32,18 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.ssl.SslContext;
+import javax.net.ssl.SSLContext;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.AsyncHttpClientConfig;
 import org.asynchttpclient.BoundRequestBuilder;
@@ -47,6 +51,7 @@ import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.netty.ssl.JsseSslEngineFactory;
 
 
 @Slf4j
@@ -59,24 +64,15 @@ public class HttpClient implements Closeable {
     protected final ServiceNameResolver serviceNameResolver;
     protected final Authentication authentication;
 
-    protected HttpClient(String serviceUrl, Authentication authentication,
-            EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath)
-            throws PulsarClientException {
-        this(serviceUrl, authentication, eventLoopGroup, tlsAllowInsecureConnection,
-                tlsTrustCertsFilePath, DEFAULT_CONNECT_TIMEOUT_IN_SECONDS, DEFAULT_READ_TIMEOUT_IN_SECONDS);
-    }
-
-    protected HttpClient(String serviceUrl, Authentication authentication,
-            EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath,
-            int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException {
-        this.authentication = authentication;
+    protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
+        this.authentication = conf.getAuthentication();
         this.serviceNameResolver = new PulsarServiceNameResolver();
-        this.serviceNameResolver.updateServiceUrl(serviceUrl);
+        this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
-        confBuilder.setConnectTimeout(connectTimeoutInSeconds * 1000);
-        confBuilder.setReadTimeout(readTimeoutInSeconds * 1000);
+        confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
+        confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
         confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
         confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
             @Override
@@ -88,19 +84,45 @@ public class HttpClient implements Closeable {
 
         if ("https".equals(serviceNameResolver.getServiceUri().getServiceName())) {
             try {
-                SslContext sslCtx = null;
-
                 // Set client key and certificate if available
                 AuthenticationDataProvider authData = authentication.getAuthData();
-                if (authData.hasDataForTls()) {
-                    sslCtx = SecurityUtility.createNettySslContextForClient(tlsAllowInsecureConnection, tlsTrustCertsFilePath,
-                            authData.getTlsCertificates(), authData.getTlsPrivateKey());
+
+                if (conf.isUseKeyStoreTls()) {
+                    SSLContext sslCtx = null;
+                    KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
+
+                    sslCtx = KeyStoreSSLContext.createClientSslContext(
+                            conf.getSslProvider(),
+                            params != null ? params.getKeyStoreType() : null,
+                            params != null ? params.getKeyStorePath() : null,
+                            params != null ? params.getKeyStorePassword() : null,
+                            conf.isTlsAllowInsecureConnection(),
+                            conf.getTlsTrustStoreType(),
+                            conf.getTlsTrustStorePath(),
+                            conf.getTlsTrustStorePassword(),
+                            conf.getTlsCiphers(),
+                            conf.getTlsProtocols());
+
+                    JsseSslEngineFactory sslEngineFactory = new JsseSslEngineFactory(sslCtx);
+                    confBuilder.setSslEngineFactory(sslEngineFactory);
                 } else {
-                    sslCtx = SecurityUtility.createNettySslContextForClient(tlsAllowInsecureConnection, tlsTrustCertsFilePath);
+                    SslContext sslCtx = null;
+                    if (authData.hasDataForTls()) {
+                        sslCtx = SecurityUtility.createNettySslContextForClient(
+                                conf.isTlsAllowInsecureConnection(),
+                                conf.getTlsTrustCertsFilePath(),
+                                authData.getTlsCertificates(),
+                                authData.getTlsPrivateKey());
+                    }
+                    else {
+                        sslCtx = SecurityUtility.createNettySslContextForClient(
+                                conf.isTlsAllowInsecureConnection(),
+                                conf.getTlsTrustCertsFilePath());
+                    }
+                    confBuilder.setSslContext(sslCtx);
                 }
 
-                confBuilder.setSslContext(sslCtx);
-                confBuilder.setUseInsecureTrustManager(tlsAllowInsecureConnection);
+                confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection());
             } catch (Exception e) {
                 throw new PulsarClientException.InvalidConfigurationException(e);
             }
@@ -109,7 +131,7 @@ public class HttpClient implements Closeable {
         AsyncHttpClientConfig config = confBuilder.build();
         httpClient = new DefaultAsyncHttpClient(config);
 
-        log.debug("Using HTTP url: {}", serviceUrl);
+        log.debug("Using HTTP url: {}", conf.getServiceUrl());
     }
 
     String getServiceUrl() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 602b7aa..da7f148 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -57,8 +57,7 @@ public class HttpLookupService implements LookupService {
 
     public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
-        this.httpClient = new HttpClient(conf.getServiceUrl(), conf.getAuthentication(),
-                eventLoopGroup, conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
+        this.httpClient = new HttpClient(conf, eventLoopGroup);
         this.useTls = conf.isUseTls();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index a932253..4a145e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -18,30 +18,34 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.security.cert.X509Certificate;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslContext;
-
+import io.netty.handler.ssl.SslHandler;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.ObjectCache;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 
+@Slf4j
 public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
 
     public static final String TLS_HANDLER = "tls";
 
     private final Supplier<ClientCnx> clientCnxSupplier;
     private final boolean tlsEnabled;
+    private final boolean tlsEnabledWithKeyStore;
 
     private final Supplier<SslContext> sslContextSupplier;
+    private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
 
     private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
 
@@ -50,8 +54,24 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
         super();
         this.clientCnxSupplier = clientCnxSupplier;
         this.tlsEnabled = conf.isUseTls();
+        this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
+
+        if (tlsEnabled) {
+            if (tlsEnabledWithKeyStore) {
+                AuthenticationDataProvider authData1 = conf.getAuthentication().getAuthData();
+
+                nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+                            conf.getSslProvider(),
+                            conf.isTlsAllowInsecureConnection(),
+                            conf.getTlsTrustStoreType(),
+                            conf.getTlsTrustStorePath(),
+                            conf.getTlsTrustStorePassword(),
+                            conf.getTlsCiphers(),
+                            conf.getTlsProtocols(),
+                            TLS_CERTIFICATE_CACHE_MILLIS,
+                            authData1);
+            }
 
-        if (conf.isUseTls()) {
             sslContextSupplier = new ObjectCache<SslContext>(() -> {
                 try {
                     // Set client certificate if available
@@ -76,7 +96,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         if (tlsEnabled) {
-            ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc()));
+            if (tlsEnabledWithKeyStore) {
+                ch.pipeline().addLast(TLS_HANDLER,
+                        new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
+            } else {
+                ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc()));
+            }
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
         } else {
             ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
similarity index 56%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
index 03a9bd3..6d78004 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
@@ -16,33 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.authentication;
+package org.apache.pulsar.client.impl.auth;
 
-import java.security.cert.X509Certificate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
 
-import javax.servlet.http.HttpServletRequest;
+@Slf4j
+public class AuthenticationDataKeyStoreTls implements AuthenticationDataProvider {
+    private final KeyStoreParams keyStoreParams;
 
-public class AuthenticationDataHttps extends AuthenticationDataHttp {
-
-    protected final X509Certificate[] certificates;
-
-    public AuthenticationDataHttps(HttpServletRequest request) {
-        super(request);
-        certificates = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
+    public AuthenticationDataKeyStoreTls(KeyStoreParams keyStoreParams) throws Exception {
+        this.keyStoreParams = keyStoreParams;
     }
 
     /*
      * TLS
      */
-
     @Override
-    public boolean hasDataFromTls() {
-        return (certificates != null);
+    public boolean hasDataForTls() {
+        return true;
     }
 
     @Override
-    public X509Certificate[] getTlsCertificates() {
-        return certificates;
+    public KeyStoreParams getTlsKeyStoreParams() {
+        return this.keyStoreParams;
     }
 
+    @Override
+    public String getCommandData() {
+        return null;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java
new file mode 100644
index 0000000..e8c7764
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.KeyStoreParams;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.AuthenticationUtil;
+
+/**
+ * This plugin requires these parameters: keyStoreType, keyStorePath, and  keyStorePassword.
+ * This parameter will construct a AuthenticationDataProvider
+ */
+@Slf4j
+public class AuthenticationKeyStoreTls implements Authentication, EncodedAuthenticationParameterSupport {
+    private static final long serialVersionUID = 1L;
+
+    private final static String AUTH_NAME = "tls";
+
+    // parameter name
+    public final static String KEYSTORE_TYPE = "keyStoreType";
+    public final static String KEYSTORE_PATH= "keyStorePath";
+    public final static String KEYSTORE_PW = "keyStorePassword";
+    private final static String DEFAULT_KEYSTORE_TYPE = "JKS";
+
+    private KeyStoreParams keyStoreParams;
+
+    public AuthenticationKeyStoreTls() {
+    }
+
+    public AuthenticationKeyStoreTls(String keyStoreType, String keyStorePath, String keyStorePassword) {
+        this.keyStoreParams = KeyStoreParams.builder()
+                .keyStoreType(keyStoreType)
+                .keyStorePath(keyStorePath)
+                .keyStorePassword(keyStorePassword)
+                .build();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return AUTH_NAME;
+    }
+
+    @Override
+    public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        try {
+            return new AuthenticationDataKeyStoreTls(this.keyStoreParams);
+        } catch (Exception e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    // passed in KEYSTORE_TYPE/KEYSTORE_PATH/KEYSTORE_PW to construct parameters.
+    // e.g. {"keyStoreType":"JKS","keyStorePath":"/path/to/keystorefile","keyStorePassword":"keystorepw"}
+    //  or: "keyStoreType":"JKS","keyStorePath":"/path/to/keystorefile","keyStorePassword":"keystorepw"
+    @Override
+    public void configure(String paramsString) {
+        Map<String, String> params = null;
+        try {
+            params = AuthenticationUtil.configureFromJsonString(paramsString);
+        } catch (Exception e) {
+            // auth-param is not in json format
+            log.info("parameter not in Json format: {}", paramsString);
+        }
+
+        // in ":" "," format.
+        params = (params == null || params.isEmpty())
+                ? AuthenticationUtil.configureFromPulsar1AuthParamString(paramsString)
+                : params;
+
+        configure(params);
+    }
+
+    @Override
+    public void configure(Map<String, String> params) {
+        String keyStoreType = params.get(KEYSTORE_TYPE);
+        String keyStorePath = params.get(KEYSTORE_PATH);
+        String keyStorePassword = params.get(KEYSTORE_PW);
+
+        if (Strings.isNullOrEmpty(keyStorePath)
+            || Strings.isNullOrEmpty(keyStorePassword)) {
+            throw new IllegalArgumentException("Passed in parameter empty. "
+                                               + KEYSTORE_PATH + ": " + keyStorePath
+                                               + " " + KEYSTORE_PW + ": " + keyStorePassword);
+        }
+
+        if (Strings.isNullOrEmpty(keyStoreType)) {
+            keyStoreType = DEFAULT_KEYSTORE_TYPE;
+        }
+
+        this.keyStoreParams = KeyStoreParams.builder()
+                .keyStoreType(keyStoreType)
+                .keyStorePath(keyStorePath)
+                .keyStorePassword(keyStorePassword)
+                .build();
+    }
+
+    @Override
+    public void start() throws PulsarClientException {
+        // noop
+    }
+
+    // return strings like : "key1":"value1", "key2":"value2", ...
+    public static String mapToString(Map<String, String> map) {
+        return Joiner.on(',').withKeyValueSeparator(':').join(map);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index 22cd2f5..d899146 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -40,7 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
  *
  */
 public class AuthenticationTls implements Authentication, EncodedAuthenticationParameterSupport {
-
+    private final static String AUTH_NAME = "tls";
     private static final long serialVersionUID = 1L;
 
     private String certFilePath;
@@ -67,7 +67,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
 
     @Override
     public String getAuthMethodName() {
-        return "tls";
+        return AUTH_NAME;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index af6ad8d..0cf496a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.impl.conf;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Sets;
 import java.time.Clock;
+import java.util.Set;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -70,6 +72,16 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
     private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
 
+    // set TLS using KeyStore way.
+    private boolean useKeyStoreTls = false;
+    private String sslProvider = null;
+    // needed when client auth is required
+    private String tlsTrustStoreType = "JKS";
+    private String tlsTrustStorePath = null;
+    private String tlsTrustStorePassword = null;
+    private Set<String> tlsCiphers = Sets.newTreeSet();
+    private Set<String> tlsProtocols = Sets.newTreeSet();
+
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
 
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 9208cfe..3bc1500 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -151,6 +151,10 @@
       <artifactId>javax.ws.rs-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
deleted file mode 100644
index 48ac937..0000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util;
-
-import io.netty.handler.ssl.SslContext;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.security.cert.X509Certificate;
-import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("checkstyle:JavadocType")
-public class ClientSslContextRefresher {
-    private volatile SslContext sslContext;
-    private boolean tlsAllowInsecureConnection;
-    private String tlsTrustCertsFilePath;
-    private AuthenticationDataProvider authData;
-
-    public ClientSslContextRefresher(boolean allowInsecure, String trustCertsFilePath,
-            AuthenticationDataProvider authData) throws IOException, GeneralSecurityException {
-        this.tlsAllowInsecureConnection = allowInsecure;
-        this.tlsTrustCertsFilePath = trustCertsFilePath;
-        this.authData = authData;
-
-        if (authData != null && authData.hasDataForTls()) {
-            this.sslContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
-                    this.tlsTrustCertsFilePath, (X509Certificate[]) authData.getTlsCertificates(),
-                    authData.getTlsPrivateKey());
-        } else {
-            this.sslContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
-                    this.tlsTrustCertsFilePath);
-        }
-    }
-
-    public SslContext get() {
-        if (authData != null && authData.hasDataForTls()) {
-            try {
-                this.sslContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
-                        this.tlsTrustCertsFilePath, (X509Certificate[]) authData.getTlsCertificates(),
-                        authData.getTlsPrivateKey());
-            } catch (GeneralSecurityException | IOException e) {
-                LOG.error("Exception occured while trying to refresh sslContext: ", e);
-            }
-
-        }
-        return sslContext;
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(ClientSslContextRefresher.class);
-}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
index 3e888f4..c49bdd6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
@@ -29,11 +29,19 @@ import javax.net.ssl.SSLException;
 public class DefaultSslContextBuilder extends SslContextAutoRefreshBuilder<SSLContext> {
     private volatile SSLContext sslContext;
 
+    protected final boolean tlsAllowInsecureConnection;
+    protected final FileModifiedTimeUpdater tlsTrustCertsFilePath, tlsCertificateFilePath, tlsKeyFilePath;
+    protected final boolean tlsRequireTrustedClientCertOnConnect;
+
     public DefaultSslContextBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
             String keyFilePath, boolean requireTrustedClientCertOnConnect, long certRefreshInSec)
             throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
-        super(allowInsecure, trustCertsFilePath, certificateFilePath, keyFilePath, null, null,
-                requireTrustedClientCertOnConnect, certRefreshInSec);
+        super(certRefreshInSec);
+        this.tlsAllowInsecureConnection = allowInsecure;
+        this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
+        this.tlsCertificateFilePath = new FileModifiedTimeUpdater(certificateFilePath);
+        this.tlsKeyFilePath = new FileModifiedTimeUpdater(keyFilePath);
+        this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
     }
 
     @Override
@@ -49,4 +57,10 @@ public class DefaultSslContextBuilder extends SslContextAutoRefreshBuilder<SSLCo
         return this.sslContext;
     }
 
+    @Override
+    public boolean needUpdate() {
+        return  tlsTrustCertsFilePath.checkAndRefresh()
+                || tlsCertificateFilePath.checkAndRefresh()
+                || tlsKeyFilePath.checkAndRefresh();
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
new file mode 100644
index 0000000..48cf992
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import io.netty.handler.ssl.SslContext;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+/**
+ * SSL context builder for Netty Client side.
+ */
+@Slf4j
+public class NettyClientSslContextRefresher extends SslContextAutoRefreshBuilder<SslContext> {
+    private volatile SslContext sslNettyContext;
+    private boolean tlsAllowInsecureConnection;
+    protected final FileModifiedTimeUpdater tlsTrustCertsFilePath;
+    private AuthenticationDataProvider authData;
+
+    public NettyClientSslContextRefresher(boolean allowInsecure,
+                                          String trustCertsFilePath,
+                                          AuthenticationDataProvider authData,
+                                          long delayInSeconds)
+            throws IOException, GeneralSecurityException {
+        super(delayInSeconds);
+        this.tlsAllowInsecureConnection = allowInsecure;
+        this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
+        this.authData = authData;
+    }
+
+    @Override
+    public synchronized SslContext update()
+            throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+        if (authData != null && authData.hasDataForTls()) {
+            this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+                    this.tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
+                    authData.getTlsPrivateKey());
+        } else {
+            this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+                    this.tlsTrustCertsFilePath.getFileName());
+        }
+        return this.sslNettyContext;
+    }
+
+    @Override
+    public SslContext getSslContext() {
+        return this.sslNettyContext;
+    }
+
+    @Override
+    public boolean needUpdate() {
+        return  tlsTrustCertsFilePath.checkAndRefresh();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyServerSslContextBuilder.java
similarity index 52%
rename from pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyServerSslContextBuilder.java
index 713c52d..250e628 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyServerSslContextBuilder.java
@@ -26,16 +26,29 @@ import java.util.Set;
 import javax.net.ssl.SSLException;
 
 /**
- * SSL context builder for Netty.
+ * SSL context builder for Netty Server side.
  */
-public class NettySslContextBuilder extends SslContextAutoRefreshBuilder<SslContext> {
+public class NettyServerSslContextBuilder extends SslContextAutoRefreshBuilder<SslContext> {
     private volatile SslContext sslNettyContext;
 
-    public NettySslContextBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
-            String keyFilePath, Set<String> ciphers, Set<String> protocols, boolean requireTrustedClientCertOnConnect,
-            long delayInSeconds) throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
-        super(allowInsecure, trustCertsFilePath, certificateFilePath, keyFilePath, ciphers, protocols,
-                requireTrustedClientCertOnConnect, delayInSeconds);
+    protected final boolean tlsAllowInsecureConnection;
+    protected final FileModifiedTimeUpdater tlsTrustCertsFilePath, tlsCertificateFilePath, tlsKeyFilePath;
+    protected final Set<String> tlsCiphers;
+    protected final Set<String> tlsProtocols;
+    protected final boolean tlsRequireTrustedClientCertOnConnect;
+
+    public NettyServerSslContextBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
+                                        String keyFilePath, Set<String> ciphers, Set<String> protocols,
+                                        boolean requireTrustedClientCertOnConnect,
+                                        long delayInSeconds) {
+        super(delayInSeconds);
+        this.tlsAllowInsecureConnection = allowInsecure;
+        this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
+        this.tlsCertificateFilePath = new FileModifiedTimeUpdater(certificateFilePath);
+        this.tlsKeyFilePath = new FileModifiedTimeUpdater(keyFilePath);
+        this.tlsCiphers = ciphers;
+        this.tlsProtocols = protocols;
+        this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
     }
 
     @Override
@@ -52,4 +65,10 @@ public class NettySslContextBuilder extends SslContextAutoRefreshBuilder<SslCont
         return this.sslNettyContext;
     }
 
+    @Override
+    public boolean needUpdate() {
+        return  tlsTrustCertsFilePath.checkAndRefresh()
+                || tlsCertificateFilePath.checkAndRefresh()
+                || tlsKeyFilePath.checkAndRefresh();
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
index 5fa9c1b..a29f051 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
@@ -18,16 +18,10 @@
  */
 package org.apache.pulsar.common.util;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Auto refresher and builder of SSLContext.
@@ -35,30 +29,18 @@ import org.slf4j.LoggerFactory;
  * @param <T>
  *            type of SSLContext
  */
+@Slf4j
 public abstract class SslContextAutoRefreshBuilder<T> {
-    protected final boolean tlsAllowInsecureConnection;
-    protected final FileModifiedTimeUpdater tlsTrustCertsFilePath, tlsCertificateFilePath, tlsKeyFilePath;
-    protected final Set<String> tlsCiphers;
-    protected final Set<String> tlsProtocols;
-    protected final boolean tlsRequireTrustedClientCertOnConnect;
     protected final long refreshTime;
     protected long lastRefreshTime;
 
-    public SslContextAutoRefreshBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
-            String keyFilePath, Set<String> ciphers, Set<String> protocols, boolean requireTrustedClientCertOnConnect,
-            long certRefreshInSec) throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
-        this.tlsAllowInsecureConnection = allowInsecure;
-        this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
-        this.tlsCertificateFilePath = new FileModifiedTimeUpdater(certificateFilePath);
-        this.tlsKeyFilePath = new FileModifiedTimeUpdater(keyFilePath);
-        this.tlsCiphers = ciphers;
-        this.tlsProtocols = protocols;
-        this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
+    public SslContextAutoRefreshBuilder(
+            long certRefreshInSec) {
         this.refreshTime = TimeUnit.SECONDS.toMillis(certRefreshInSec);
         this.lastRefreshTime = -1;
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Certs will be refreshed every {} seconds", certRefreshInSec);
+        if (log.isDebugEnabled()) {
+            log.debug("Certs will be refreshed every {} seconds", certRefreshInSec);
         }
     }
 
@@ -79,6 +61,13 @@ public abstract class SslContextAutoRefreshBuilder<T> {
     protected abstract T getSslContext();
 
     /**
+     * Returns whether the key files modified after a refresh time, and context need update.
+     *
+     * @return true if files modified
+     */
+    protected abstract boolean needUpdate();
+
+    /**
      * It updates SSLContext at every configured refresh time and returns updated SSLContext.
      *
      * @return
@@ -91,24 +80,21 @@ public abstract class SslContextAutoRefreshBuilder<T> {
                 lastRefreshTime = System.currentTimeMillis();
                 return getSslContext();
             } catch (GeneralSecurityException | IOException e) {
-                LOG.error("Execption while trying to refresh ssl Context {}", e.getMessage(), e);
+                log.error("Exception while trying to refresh ssl Context {}", e.getMessage(), e);
             }
         } else {
             long now = System.currentTimeMillis();
             if (refreshTime <= 0 || now > (lastRefreshTime + refreshTime)) {
-                if (tlsTrustCertsFilePath.checkAndRefresh() || tlsCertificateFilePath.checkAndRefresh()
-                        || tlsKeyFilePath.checkAndRefresh()) {
+                if (needUpdate()) {
                     try {
                         ctx = update();
                         lastRefreshTime = now;
                     } catch (GeneralSecurityException | IOException e) {
-                        LOG.error("Execption while trying to refresh ssl Context {} ", e.getMessage(), e);
+                        log.error("Exception while trying to refresh ssl Context {} ", e.getMessage(), e);
                     }
                 }
             }
         }
         return ctx;
     }
-
-    private static final Logger LOG = LoggerFactory.getLogger(SslContextAutoRefreshBuilder.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
new file mode 100644
index 0000000..b9ad2e7
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import static org.apache.pulsar.common.util.SecurityUtility.getProvider;
+
+import com.google.common.base.Strings;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.Provider;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * KeyStoreSSLContext that mainly wrap a SSLContext to provide SSL context for both webservice and netty.
+ */
+@Slf4j
+public class KeyStoreSSLContext {
+    public static final String DEFAULT_KEYSTORE_TYPE = "JKS";
+    public static final String DEFAULT_SSL_PROTOCOL = "TLS";
+    public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
+    public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
+    public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
+
+    public static final Provider BC_PROVIDER = getProvider();
+
+    /**
+     * Connection Mode for TLS.
+     */
+    public enum Mode {
+        CLIENT,
+        SERVER
+    }
+
+    @Getter
+    private final Mode mode;
+
+    private String sslProviderString;
+    private String keyStoreTypeString;
+    private String keyStorePath;
+    private String keyStorePassword;
+    private boolean allowInsecureConnection;
+    private String trustStoreTypeString;
+    private String trustStorePath;
+    private String trustStorePassword;
+    private boolean needClientAuth;
+    private Set<String> ciphers;
+    private Set<String> protocols;
+    @Getter
+    private SSLContext sslContext;
+
+    private String protocol = DEFAULT_SSL_PROTOCOL;
+    private String kmfAlgorithm = DEFAULT_SSL_KEYMANGER_ALGORITHM;
+    private String tmfAlgorithm = DEFAULT_SSL_TRUSTMANAGER_ALGORITHM;
+
+    // only init vars, before using it, need to call createSSLContext to create ssl context.
+    public KeyStoreSSLContext(Mode mode,
+                              String sslProviderString,
+                              String keyStoreTypeString,
+                              String keyStorePath,
+                              String keyStorePassword,
+                              boolean allowInsecureConnection,
+                              String trustStoreTypeString,
+                              String trustStorePath,
+                              String trustStorePassword,
+                              boolean requireTrustedClientCertOnConnect,
+                              Set<String> ciphers,
+                              Set<String> protocols) {
+        this.mode = mode;
+        this.sslProviderString = sslProviderString;
+        this.keyStoreTypeString = Strings.isNullOrEmpty(keyStoreTypeString)
+                ? DEFAULT_KEYSTORE_TYPE
+                : keyStoreTypeString;
+        this.keyStorePath = keyStorePath;
+        this.keyStorePassword = keyStorePassword;
+        this.trustStoreTypeString = Strings.isNullOrEmpty(trustStoreTypeString)
+                ? DEFAULT_KEYSTORE_TYPE
+                : trustStoreTypeString;
+        this.trustStorePath = trustStorePath;
+        this.trustStorePassword = trustStorePassword;
+        this.needClientAuth = requireTrustedClientCertOnConnect;
+        this.ciphers = ciphers;
+        this.protocols = protocols;
+
+        if (protocols != null && protocols.size() > 0) {
+            this.protocols = protocols;
+        } else {
+            this.protocols = new HashSet<>(Arrays.asList(DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")));
+        }
+
+        if (ciphers != null && ciphers.size() > 0) {
+            this.ciphers = ciphers;
+        } else {
+            this.ciphers = null;
+        }
+
+        this.allowInsecureConnection = allowInsecureConnection;
+    }
+
+    public SSLContext createSSLContext() throws GeneralSecurityException, IOException {
+        SSLContext sslContext;
+        if (sslProviderString != null) {
+            sslContext = SSLContext.getInstance(protocol, sslProviderString);
+        } else {
+            sslContext = SSLContext.getInstance(protocol);
+        }
+
+        // key store
+        KeyManager[] keyManagers = null;
+        if (!Strings.isNullOrEmpty(keyStorePath)) {
+            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(kmfAlgorithm);
+            KeyStore keyStore = KeyStore.getInstance(keyStoreTypeString);
+            char[] passwordChars = keyStorePassword.toCharArray();
+            keyStore.load(new FileInputStream(keyStorePath), passwordChars);
+            keyManagerFactory.init(keyStore, passwordChars);
+            keyManagers = keyManagerFactory.getKeyManagers();
+        }
+
+        // trust store
+        TrustManagerFactory trustManagerFactory;
+        if (this.allowInsecureConnection) {
+            trustManagerFactory = InsecureTrustManagerFactory.INSTANCE;
+        } else {
+            trustManagerFactory = TrustManagerFactory.getInstance(tmfAlgorithm);
+            KeyStore trustStore = KeyStore.getInstance(trustStoreTypeString);
+            char[] passwordChars = trustStorePassword.toCharArray();
+            trustStore.load(new FileInputStream(trustStorePath), passwordChars);
+            trustManagerFactory.init(trustStore);
+        }
+
+        // init
+        sslContext.init(keyManagers, trustManagerFactory.getTrustManagers(), new SecureRandom());
+        this.sslContext = sslContext;
+        return sslContext;
+    }
+
+    public SSLEngine createSSLEngine() {
+        SSLEngine sslEngine = sslContext.createSSLEngine();
+
+        sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
+        sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
+
+        if (this.mode == Mode.SERVER) {
+            sslEngine.setNeedClientAuth(this.needClientAuth);
+            sslEngine.setUseClientMode(false);
+        } else {
+            sslEngine.setUseClientMode(true);
+        }
+
+        return sslEngine;
+    }
+
+    public static KeyStoreSSLContext createClientKeyStoreSslContext(String sslProviderString,
+                                                            String keyStoreTypeString,
+                                                            String keyStorePath,
+                                                            String keyStorePassword,
+                                                            boolean allowInsecureConnection,
+                                                            String trustStoreTypeString,
+                                                            String trustStorePath,
+                                                            String trustStorePassword,
+                                                            Set<String> ciphers,
+                                                            Set<String> protocols)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+        KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.CLIENT,
+                sslProviderString,
+                keyStoreTypeString,
+                keyStorePath,
+                keyStorePassword,
+                allowInsecureConnection,
+                trustStoreTypeString,
+                trustStorePath,
+                trustStorePassword,
+                false,
+                ciphers,
+                protocols);
+
+        keyStoreSSLContext.createSSLContext();
+        return keyStoreSSLContext;
+    }
+
+
+    public static KeyStoreSSLContext createServerKeyStoreSslContext(String sslProviderString,
+                                                    String keyStoreTypeString,
+                                                    String keyStorePath,
+                                                    String keyStorePassword,
+                                                    boolean allowInsecureConnection,
+                                                    String trustStoreTypeString,
+                                                    String trustStorePath,
+                                                    String trustStorePassword,
+                                                    boolean requireTrustedClientCertOnConnect,
+                                                    Set<String> ciphers,
+                                                    Set<String> protocols)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+        KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.SERVER,
+                sslProviderString,
+                keyStoreTypeString,
+                keyStorePath,
+                keyStorePassword,
+                allowInsecureConnection,
+                trustStoreTypeString,
+                trustStorePath,
+                trustStorePassword,
+                requireTrustedClientCertOnConnect,
+                ciphers,
+                protocols);
+
+        keyStoreSSLContext.createSSLContext();
+        return keyStoreSSLContext;
+    }
+
+    // for web server use case, no need ciphers and protocols
+    public static SSLContext createServerSslContext(String sslProviderString,
+                                                    String keyStoreTypeString,
+                                                    String keyStorePath,
+                                                    String keyStorePassword,
+                                                    boolean allowInsecureConnection,
+                                                    String trustStoreTypeString,
+                                                    String trustStorePath,
+                                                    String trustStorePassword,
+                                                    boolean requireTrustedClientCertOnConnect)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+
+        return createServerKeyStoreSslContext(
+                sslProviderString,
+                keyStoreTypeString,
+                keyStorePath,
+                keyStorePassword,
+                allowInsecureConnection,
+                trustStoreTypeString,
+                trustStorePath,
+                trustStorePassword,
+                requireTrustedClientCertOnConnect,
+                null,
+                null).getSslContext();
+    }
+
+    // for web client
+    public static SSLContext createClientSslContext(String sslProviderString,
+                                                    String keyStoreTypeString,
+                                                    String keyStorePath,
+                                                    String keyStorePassword,
+                                                    boolean allowInsecureConnection,
+                                                    String trustStoreTypeString,
+                                                    String trustStorePath,
+                                                    String trustStorePassword,
+                                                    Set<String> ciphers,
+                                                    Set<String> protocol)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+        KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.CLIENT,
+                sslProviderString,
+                keyStoreTypeString,
+                keyStorePath,
+                keyStorePassword,
+                allowInsecureConnection,
+                trustStoreTypeString,
+                trustStorePath,
+                trustStorePassword,
+                false,
+                ciphers,
+                protocol);
+
+        return keyStoreSSLContext.createSSLContext();
+    }
+
+    // for web client
+    public static SSLContext createClientSslContext(String keyStoreTypeString,
+                                                    String keyStorePath,
+                                                    String keyStorePassword,
+                                                    String trustStoreTypeString,
+                                                    String trustStorePath,
+                                                    String trustStorePassword)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+        KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.CLIENT,
+                null,
+                keyStoreTypeString,
+                keyStorePath,
+                keyStorePassword,
+                false,
+                trustStoreTypeString,
+                trustStorePath,
+                trustStorePassword,
+                false,
+                null,
+                null);
+
+        return keyStoreSSLContext.createSSLContext();
+    }
+
+    // for web server. autoRefresh is default true.
+    public static SslContextFactory createSslContextFactory(String sslProviderString,
+                                                            String keyStoreTypeString,
+                                                            String keyStore,
+                                                            String keyStorePassword,
+                                                            boolean allowInsecureConnection,
+                                                            String trustStoreTypeString,
+                                                            String trustStore,
+                                                            String trustStorePassword,
+                                                            boolean requireTrustedClientCertOnConnect,
+                                                            long certRefreshInSec)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+        SslContextFactory sslCtxFactory;
+
+        sslCtxFactory = new SslContextFactoryWithAutoRefresh(
+                sslProviderString,
+                keyStoreTypeString,
+                keyStore,
+                keyStorePassword,
+                allowInsecureConnection,
+                trustStoreTypeString,
+                trustStore,
+                trustStorePassword,
+                requireTrustedClientCertOnConnect,
+                certRefreshInSec);
+
+        if (requireTrustedClientCertOnConnect) {
+            sslCtxFactory.setNeedClientAuth(true);
+        } else {
+            sslCtxFactory.setWantClientAuth(true);
+        }
+        sslCtxFactory.setTrustAll(true);
+
+        return sslCtxFactory;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NetSslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NetSslContextBuilder.java
new file mode 100644
index 0000000..38ebdb4
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NetSslContextBuilder.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+
+/**
+ * Similar to `DefaultSslContextBuilder`, which build `javax.net.ssl.SSLContext` for web service.
+ */
+public class NetSslContextBuilder extends SslContextAutoRefreshBuilder<SSLContext> {
+    private volatile SSLContext sslContext;
+
+    protected final boolean tlsAllowInsecureConnection;
+    protected final boolean tlsRequireTrustedClientCertOnConnect;
+
+    protected final String tlsProvider;
+    protected final String tlsKeyStoreType;
+    protected final String tlsKeyStorePassword;
+    protected final FileModifiedTimeUpdater tlsKeyStore;
+    protected final String tlsTrustStoreType;
+    protected final String tlsTrustStorePassword;
+    protected final FileModifiedTimeUpdater tlsTrustStore;
+
+    public NetSslContextBuilder(String sslProviderString,
+                                String keyStoreTypeString,
+                                String keyStore,
+                                String keyStorePasswordPath,
+                                boolean allowInsecureConnection,
+                                String trustStoreTypeString,
+                                String trustStore,
+                                String trustStorePasswordPath,
+                                boolean requireTrustedClientCertOnConnect,
+                                long certRefreshInSec) {
+        super(certRefreshInSec);
+
+        this.tlsAllowInsecureConnection = allowInsecureConnection;
+        this.tlsProvider = sslProviderString;
+        this.tlsKeyStoreType = keyStoreTypeString;
+        this.tlsKeyStore = new FileModifiedTimeUpdater(keyStore);
+        this.tlsKeyStorePassword = keyStorePasswordPath;
+
+        this.tlsTrustStoreType = trustStoreTypeString;
+        this.tlsTrustStore = new FileModifiedTimeUpdater(trustStore);
+        this.tlsTrustStorePassword = trustStorePasswordPath;
+
+        this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
+    }
+
+    @Override
+    public synchronized SSLContext update()
+            throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+        this.sslContext = KeyStoreSSLContext.createServerSslContext(tlsProvider,
+                tlsKeyStoreType, tlsKeyStore.getFileName(), tlsKeyStorePassword,
+                tlsAllowInsecureConnection,
+                tlsTrustStoreType, tlsTrustStore.getFileName(), tlsTrustStorePassword,
+                tlsRequireTrustedClientCertOnConnect);
+        return this.sslContext;
+    }
+
+    @Override
+    public SSLContext getSslContext() {
+        return this.sslContext;
+    }
+
+    @Override
+    public boolean needUpdate() {
+        return  tlsKeyStore.checkAndRefresh()
+                || tlsTrustStore.checkAndRefresh();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
new file mode 100644
index 0000000..363fe1e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Set;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
+import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+
+/**
+ * SSL context builder for Netty.
+ */
+public class NettySSLContextAutoRefreshBuilder extends SslContextAutoRefreshBuilder<KeyStoreSSLContext> {
+    private volatile KeyStoreSSLContext keyStoreSSLContext;
+
+    protected final boolean tlsAllowInsecureConnection;
+    protected final Set<String> tlsCiphers;
+    protected final Set<String> tlsProtocols;
+    protected boolean tlsRequireTrustedClientCertOnConnect;
+
+    protected final String tlsProvider;
+    protected final String tlsTrustStoreType;
+    protected final String tlsTrustStorePassword;
+    protected final FileModifiedTimeUpdater tlsTrustStore;
+
+    // client context not need keystore at start time, keyStore is passed in by authData.
+    protected String tlsKeyStoreType;
+    protected String tlsKeyStorePassword;
+    protected FileModifiedTimeUpdater tlsKeyStore;
+
+    protected AuthenticationDataProvider authData;
+    protected final boolean isServer;
+
+    // for server
+    public NettySSLContextAutoRefreshBuilder(String sslProviderString,
+                                             String keyStoreTypeString,
+                                             String keyStore,
+                                             String keyStorePassword,
+                                             boolean allowInsecureConnection,
+                                             String trustStoreTypeString,
+                                             String trustStore,
+                                             String trustStorePassword,
+                                             boolean requireTrustedClientCertOnConnect,
+                                             Set<String> ciphers,
+                                             Set<String> protocols,
+                                             long certRefreshInSec) {
+        super(certRefreshInSec);
+
+        this.tlsAllowInsecureConnection = allowInsecureConnection;
+        this.tlsProvider = sslProviderString;
+
+        this.tlsKeyStoreType = keyStoreTypeString;
+        this.tlsKeyStore = new FileModifiedTimeUpdater(keyStore);
+        this.tlsKeyStorePassword = keyStorePassword;
+
+        this.tlsTrustStoreType = trustStoreTypeString;
+        this.tlsTrustStore = new FileModifiedTimeUpdater(trustStore);
+        this.tlsTrustStorePassword = trustStorePassword;
+
+        this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
+        this.tlsCiphers = ciphers;
+        this.tlsProtocols = protocols;
+
+        this.isServer = true;
+    }
+
+    // for client
+    public NettySSLContextAutoRefreshBuilder(String sslProviderString,
+                                             boolean allowInsecureConnection,
+                                             String trustStoreTypeString,
+                                             String trustStore,
+                                             String trustStorePassword,
+                                             Set<String> ciphers,
+                                             Set<String> protocols,
+                                             long certRefreshInSec,
+                                             AuthenticationDataProvider authData) {
+        super(certRefreshInSec);
+
+        this.tlsAllowInsecureConnection = allowInsecureConnection;
+        this.tlsProvider = sslProviderString;
+
+        this.authData = authData;
+
+        this.tlsTrustStoreType = trustStoreTypeString;
+        this.tlsTrustStore = new FileModifiedTimeUpdater(trustStore);
+        this.tlsTrustStorePassword = trustStorePassword;
+
+        this.tlsCiphers = ciphers;
+        this.tlsProtocols = protocols;
+
+        this.isServer = false;
+    }
+
+    @Override
+    public synchronized KeyStoreSSLContext update() throws GeneralSecurityException, IOException {
+        if (isServer) {
+            this.keyStoreSSLContext = KeyStoreSSLContext.createServerKeyStoreSslContext(tlsProvider,
+                    tlsKeyStoreType, tlsKeyStore.getFileName(), tlsKeyStorePassword,
+                    tlsAllowInsecureConnection,
+                    tlsTrustStoreType, tlsTrustStore.getFileName(), tlsTrustStorePassword,
+                    tlsRequireTrustedClientCertOnConnect, tlsCiphers, tlsProtocols);
+        } else {
+            KeyStoreParams authParams = authData.getTlsKeyStoreParams();
+            this.keyStoreSSLContext = KeyStoreSSLContext.createClientKeyStoreSslContext(tlsProvider,
+                    authParams != null ? authParams.getKeyStoreType() : null,
+                    authParams != null ? authParams.getKeyStorePath() : null,
+                    authParams != null ? authParams.getKeyStorePassword() : null,
+                    tlsAllowInsecureConnection,
+                    tlsTrustStoreType, tlsTrustStore.getFileName(), tlsTrustStorePassword,
+                    tlsCiphers, tlsProtocols);
+        }
+        return this.keyStoreSSLContext;
+    }
+
+    @Override
+    public KeyStoreSSLContext getSslContext() {
+        return this.keyStoreSSLContext;
+    }
+
+    @Override
+    public boolean needUpdate() {
+        return  (tlsKeyStore != null && tlsKeyStore.checkAndRefresh())
+                || (tlsTrustStore != null && tlsTrustStore.checkAndRefresh());
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
new file mode 100644
index 0000000..555d96e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.nio.ByteBuffer;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLParameters;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * SSLContextValidatorEngine to validate 2 SSlContext.
+ */
+@Slf4j
+public class SSLContextValidatorEngine {
+    /**
+     * Mode of peer.
+     */
+    public enum Mode {
+        CLIENT,
+        SERVER
+    }
+
+    private static final ByteBuffer EMPTY_BUF = ByteBuffer.allocate(0);
+    private final SSLEngine sslEngine;
+    private SSLEngineResult handshakeResult;
+    private ByteBuffer appBuffer;
+    private ByteBuffer netBuffer;
+    private Mode mode;
+
+    public static void validate(SSLContext clientSslContext, SSLContext serverSslContext) throws SSLException {
+        SSLContextValidatorEngine clientEngine = new SSLContextValidatorEngine(clientSslContext, Mode.CLIENT);
+        SSLContextValidatorEngine serverEngine = new SSLContextValidatorEngine(serverSslContext, Mode.SERVER);
+        try {
+            clientEngine.beginHandshake();
+            serverEngine.beginHandshake();
+            while (!serverEngine.complete() || !clientEngine.complete()) {
+                clientEngine.handshake(serverEngine);
+                serverEngine.handshake(clientEngine);
+            }
+        } finally {
+            clientEngine.close();
+            serverEngine.close();
+        }
+    }
+
+    private SSLContextValidatorEngine(SSLContext sslContext, Mode mode) {
+        this.mode = mode;
+        this.sslEngine = createSslEngine(sslContext, "localhost", 0); // these hints are not used for validation
+        sslEngine.setUseClientMode(mode == Mode.CLIENT);
+        appBuffer = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
+        netBuffer = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+    }
+
+    private SSLEngine createSslEngine(SSLContext sslContext, String peerHost, int peerPort) {
+        SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
+
+        if (mode == Mode.SERVER) {
+            sslEngine.setNeedClientAuth(true);
+        } else {
+            sslEngine.setUseClientMode(true);
+            SSLParameters sslParams = sslEngine.getSSLParameters();
+            sslEngine.setSSLParameters(sslParams);
+        }
+        return sslEngine;
+    }
+
+    void beginHandshake() throws SSLException {
+        sslEngine.beginHandshake();
+    }
+
+    void handshake(SSLContextValidatorEngine peerEngine) throws SSLException {
+        SSLEngineResult.HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+        while (true) {
+            switch (handshakeStatus) {
+                case NEED_WRAP:
+                    handshakeResult = sslEngine.wrap(EMPTY_BUF, netBuffer);
+                    switch (handshakeResult.getStatus()) {
+                        case OK: break;
+                        case BUFFER_OVERFLOW:
+                            netBuffer.compact();
+                            netBuffer = ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+                            netBuffer.flip();
+                            break;
+                        case BUFFER_UNDERFLOW:
+                        case CLOSED:
+                        default:
+                            throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+                    }
+                    return;
+                case NEED_UNWRAP:
+                    if (peerEngine.netBuffer.position() == 0) {
+                        return;
+                    }
+                    peerEngine.netBuffer.flip(); // unwrap the data from peer
+                    handshakeResult = sslEngine.unwrap(peerEngine.netBuffer, appBuffer);
+                    peerEngine.netBuffer.compact();
+                    handshakeStatus = handshakeResult.getHandshakeStatus();
+                    switch (handshakeResult.getStatus()) {
+                        case OK: break;
+                        case BUFFER_OVERFLOW:
+                            appBuffer = ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize());
+                            break;
+                        case BUFFER_UNDERFLOW:
+                            netBuffer = ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+                            break;
+                        case CLOSED:
+                        default:
+                            throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+                    }
+                    break;
+                case NEED_TASK:
+                    sslEngine.getDelegatedTask().run();
+                    handshakeStatus = sslEngine.getHandshakeStatus();
+                    break;
+                case FINISHED:
+                    return;
+                case NOT_HANDSHAKING:
+                    if (handshakeResult.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.FINISHED) {
+                        throw new SSLException("Did not finish handshake");
+                    }
+                    return;
+                default:
+                    throw new IllegalStateException("Unexpected handshake status " + handshakeStatus);
+            }
+        }
+    }
+
+    boolean complete() {
+        return sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED
+               || sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
+    }
+
+    void close() {
+        sslEngine.closeOutbound();
+        try {
+            sslEngine.closeInbound();
+        } catch (Exception e) {
+            // ignore
+        }
+    }
+
+    /**
+     * Check if the given ByteBuffer capacity.
+     * @param existingBuffer ByteBuffer capacity to check
+     * @param newLength new length for the ByteBuffer.
+     * returns ByteBuffer
+     */
+    public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength) {
+        if (newLength > existingBuffer.capacity()) {
+            ByteBuffer newBuffer = ByteBuffer.allocate(newLength);
+            existingBuffer.flip();
+            newBuffer.put(existingBuffer);
+            return newBuffer;
+        }
+        return existingBuffer;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SslContextFactoryWithAutoRefresh.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SslContextFactoryWithAutoRefresh.java
new file mode 100644
index 0000000..e18e8c6
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SslContextFactoryWithAutoRefresh.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * SslContextFactoryWithAutoRefresh that create SSLContext for web server, and refresh in time.
+ */
+public class SslContextFactoryWithAutoRefresh extends SslContextFactory {
+    private final NetSslContextBuilder sslCtxRefresher;
+
+    public SslContextFactoryWithAutoRefresh(String sslProviderString,
+                                            String keyStoreTypeString,
+                                            String keyStore,
+                                            String keyStorePassword,
+                                            boolean allowInsecureConnection,
+                                            String trustStoreTypeString,
+                                            String trustStore,
+                                            String trustStorePassword,
+                                            boolean requireTrustedClientCertOnConnect,
+                                            long certRefreshInSec)
+            throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+        super();
+        sslCtxRefresher = new NetSslContextBuilder(
+                sslProviderString,
+                keyStoreTypeString,
+                keyStore,
+                keyStorePassword,
+                allowInsecureConnection,
+                trustStoreTypeString,
+                trustStore,
+                trustStorePassword,
+                requireTrustedClientCertOnConnect,
+                certRefreshInSec);
+    }
+
+    @Override
+    public SSLContext getSslContext() {
+        return sslCtxRefresher.get();
+    }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/package-info.java
similarity index 53%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/package-info.java
index 03a9bd3..11a8db4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/package-info.java
@@ -16,33 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.authentication;
-
-import java.security.cert.X509Certificate;
-
-import javax.servlet.http.HttpServletRequest;
-
-public class AuthenticationDataHttps extends AuthenticationDataHttp {
-
-    protected final X509Certificate[] certificates;
-
-    public AuthenticationDataHttps(HttpServletRequest request) {
-        super(request);
-        certificates = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
-    }
-
-    /*
-     * TLS
-     */
-
-    @Override
-    public boolean hasDataFromTls() {
-        return (certificates != null);
-    }
-
-    @Override
-    public X509Certificate[] getTlsCertificates() {
-        return certificates;
-    }
-
-}
+/**
+ * Helpers to work with events from the non-blocking I/O client-server framework.
+ */
+package org.apache.pulsar.common.util.keystoretls;
diff --git a/pulsar-common/src/test/resources/broker.keystore.jks b/pulsar-common/src/test/resources/broker.keystore.jks
new file mode 100644
index 0000000..b4fec69
Binary files /dev/null and b/pulsar-common/src/test/resources/broker.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/broker.truststore.jks b/pulsar-common/src/test/resources/broker.truststore.jks
new file mode 100644
index 0000000..8ac03d8
Binary files /dev/null and b/pulsar-common/src/test/resources/broker.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/brokerKeyStorePW.txt b/pulsar-common/src/test/resources/brokerKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/brokerTrustStorePW.txt b/pulsar-common/src/test/resources/brokerTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/ca-cert b/pulsar-common/src/test/resources/ca-cert
new file mode 100644
index 0000000..32c8d92
--- /dev/null
+++ b/pulsar-common/src/test/resources/ca-cert
@@ -0,0 +1,16 @@
+-----BEGIN CERTIFICATE-----
+MIICmDCCAYACCQCYZHWHBQWWnTANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJj
+bjAgFw0yMDA0MjgxMzIxMDBaGA8yMTIwMDQwNDEzMjEwMFowDTELMAkGA1UEBhMC
+Y24wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC5CuNhU2OYdwlrd/04
+FOgXbzGLs/RoWToWq39Q4WSEVr7uaEBktFEyaoCcLYwsWm9ziF0ms5SLqNDptFiS
+c8Ftt1YJdTS+SfFRg+CWUyq8LmA9NsT/X6/Oy3Q/4398stzWdU1X6KXnWr4UIkDo
+wLDJEFCBkJCqK6w0mg92rzu8pPfIzQ8kvTy0ECK0DfRqe7+9YVPVCrR5ZItln/nu
+MnccQ9fQPL0pvTUXkWFAh/GupaUJkvA69SiAfzXJohtdCdrKrxV3m76kHHxoRlcf
+z1MRHq/r1DWBHLuV6c8p22TW4fAke0i/qwjEroiDRGX2MohCaHTJT3xxIbopznBF
+7GfJAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAJOz1oyP6TmRc2t7LcUzfEBFdKGZ
+PRyF7cin8o+c/IBl8svViTFk4dmdGAoeGpRQsn1i+J+AOKNBSRdyydgEFvE8qopR
+h7c9DzfSssf00eRAgdg8oCz2fOXDtLPwBTMe52q8MdJRe4OelRjNFs4VRpyVgZVQ
+13+GMgcj1E8taGqqSBqLccujWNJW1bsoesLzb6bYQWe8WrCPTQxB1NLIoYTZvXoK
+AkHShUEJV502UOxeRhjDjYeearlILIoV82sczXmlFNrhuiYxgIYa9tCywsagenRe
+6/WANyQP5nmCky0odbK0Uh7XweppFdb76FrooWVcd94HZaJBV7PnNdLoj/8=
+-----END CERTIFICATE-----
diff --git a/pulsar-common/src/test/resources/ca-cert.srl b/pulsar-common/src/test/resources/ca-cert.srl
new file mode 100644
index 0000000..aee9981
--- /dev/null
+++ b/pulsar-common/src/test/resources/ca-cert.srl
@@ -0,0 +1 @@
+A30DEADB8FD23BED
diff --git a/pulsar-common/src/test/resources/ca-key b/pulsar-common/src/test/resources/ca-key
new file mode 100644
index 0000000..485d220
--- /dev/null
+++ b/pulsar-common/src/test/resources/ca-key
@@ -0,0 +1,30 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQII+lV8LqZ9n4CAggA
+MB0GCWCGSAFlAwQBKgQQOGncQpnXEogdhApthe09awSCBNCErYMzOQEblSjp2HUq
+whuE1l7EUp4Et3cCtMenXoGlNfzMG0llnCmbIJA4j13X2IfGGpYRKNEbkeUUAuX5
+Er8nwWBZw3ux3iD4zYUl2Q69tcnC62eQaA5+Zj5T58i0ptxYXNTZ0p+q9ytMSP30
+9gb8KwJQdXiKZU5UxflLb9TrU7OWi3Ucnjbw4YYmRkwRZGZ6P+fUSC2FpixzI0J6
+73yBI36zfTvFJR5bslIFw2CSHbIFZeJ535oiLVqzOzjJHEFZ+OTeuN+Vh8Cktz3Y
+KAOdD30knci64vugxo5iLLWc/IXQRcuTBNHskPsym5ahYVSM9/+JqMw4OL2xyDbR
++YCdx3iER4PD9ErCXSCdMLsx4izuuULQGAyONtnj9awUwmzKMqEbtr6lLFChgyEc
+TiWpm5sLZPP1SiyjGKnhesiWRyB25b6iC/fSktf10Nrl+Fb1YJeLLSufG+ZVpy5h
+sN2cPkAnymRw9WyEaitkUqNI52GfClOYPJ0H0bU1LssUs2+d8HRHz6GWb3oW2IX8
+046PE6y0kIuUYEryTQ1lzmLdREIOG2yfkcL7ywN1WBwtqPZBV69peA8P3M3T9VEz
+nnTtEL/5VU0M3Bsm6GM8j9fmJDBmBG+6E0hUD6JCuDeBBvTBuknZIbiBf704DjyI
+qPDZhAkkVfD7dylm96GJLn38PrPk8sQPa3IR4zAXga7YHkXvh7HuYc0V9tZNn7mI
+/f3XoDV2wk279TAr8NLDgLGQHK0K1tDTirJXf09KxWj4zZOC914SeASjEhzQZ6se
+K3PbG4ZQnJ4+dAsY9K6Qgc9BxjyeInCdsDoDROtjqNfKSkcSelKhkp8VEJBYinWT
+PcBHt6/1iQvB1fyp+OexBjq9CiUDg/Be2QTUZfVqCkyIVptPnSvPyTg67pvWl4M6
+uRHxsufQ35WsZAhqEr5eQ7mPAvem7XCUJ14hPz8/f5Qm2+llPUVIq9Rpa83GP/TI
+P9W9F6tRj0qpW+QpPlxmyISf6oHiE9IOGmRkdFV0m54JiSQsqGqXr/NCH423LzFd
+21TJVKH02+v0Tgu1A1+HT+dEKQOorBZ3/HQ2NuQuYi+rF3adNEcMVmfy6DlnHkYr
+lDIG0exVC6Bveuzs4BCupKz/g7CbNzAEOO9XwcD/crNRnjE+nzdu/SDNtw5vIlKA
+hSSzEcsgVkqGbeaV5fASgb3pAz50xJHwvX6O4cAOvbcemjUGyd16IxIdI5jPRVvh
+u1BiK3YwSsdtg8sQ54YVbirgQ6SWKIXdN+79luksimbUVnEu8VJS1fu9H5ojefAd
+J9hMeiGht+6LKvyPh6Sa++bCfYRjZmbkX4h6Afc3Wwibh7KnfpAUlt4QzqA7o+x4
+7rCaI/w/uK+EFaqtn67TowAg/iq6Lxd7i9l06JBSC/BA+Hsw6tS86f13qPg2OtTK
+GydNfxnGtfZIMsUUtfldp9mB+afRFqX49joEGGmb2vnm4Q09QaGP5tagpJboIAqO
+c/pxtGWzqokR8sdAwX0oAb1vsPrpY3sbUGqSYJfVR6s4SMXJdbSSiIKzuwrcO+HX
+TSiq2yGGfJBl5bh9E8cnH4NifAJC4kXsBERwy+Ahq/64MRps3EW2tFl6nPuA+HMl
+IXg4wepHtC7w7W9nJ5Tw3b6X4g==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/pulsar-common/src/test/resources/cert-file b/pulsar-common/src/test/resources/cert-file
new file mode 100644
index 0000000..7d94877
--- /dev/null
+++ b/pulsar-common/src/test/resources/cert-file
@@ -0,0 +1,17 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIICpTCCAmECAQAwbzEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93
+bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMH
+VW5rbm93bjETMBEGA1UEAxMKY2xpZW50dXNlcjCCAbcwggEsBgcqhkjOOAQBMIIB
+HwKBgQD9f1OBHXUSKVLfSpwu7OTn9hG3UjzvRADDHj+AtlEmaUVdQCJR+1k9jVj6
+v8X1ujD2y5tVbNeBO4AdNG/yZmC3a5lQpaSfn+gEexAiwk+7qdf+t8Yb+DtX58ao
+phUPBPuD9tPFHsMCNVQTWhaRMvZ1864rYdcq7/IiAxmd0UgBxwIVAJdgUI8VIwvM
+spK5gqLrhAvwWBz1AoGBAPfhoIXWmz3ey7yrXDa4V7l5lK+7+jrqgvlXTAs9B4Jn
+UVlXjrrUWU/mcQcQgYC0SRZxI+hMKBYTt88JMozIpuE8FnqLVHyNKOCjrh4rs6Z1
+kW6jfwv6ITVi8ftiegEkO8yk8b6oUZCJqIPf4VrlnwaSi2ZegHtVJWQBTDv+z0kq
+A4GEAAKBgBmF8WdZ9Yv1Sf2qjqF19DUSY3YB67B0azz+689y8lZw0tlnSuej0bBE
+NIP6lvgC/PIPFdxvkInZOgB3TsWwkpxHzKbFZTo2Yg2txZ1IH4KX1QggePeybi2m
+E2soysZ2/r3nX2ZSOTdzDLicVo3yyKAuM8u14N0zBeJR9NMdOG1NoDAwLgYJKoZI
+hvcNAQkOMSEwHzAdBgNVHQ4EFgQUXx44DNZ7cUAoduGpv/MC+d5noyIwCwYHKoZI
+zjgEAwUAAzEAMC4CFQCQ2BDtunGs9G0Ra+16OHPaWAI6+QIVAIrGtZWtGka43D+3
+GqOEI5+wGsbh
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/pulsar-common/src/test/resources/cert-signed b/pulsar-common/src/test/resources/cert-signed
new file mode 100644
index 0000000..20db5c0
--- /dev/null
+++ b/pulsar-common/src/test/resources/cert-signed
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDjzCCAncCCQCjDerbj9I77TANBgkqhkiG9w0BAQUFADANMQswCQYDVQQGEwJj
+bjAgFw0yMDA0MjgxMzI4NDJaGA8yMTIwMDQwNDEzMjg0MlowbzEQMA4GA1UEBhMH
+VW5rbm93bjEQMA4GA1UECBMHVW5rbm93bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4G
+A1UEChMHVW5rbm93bjEQMA4GA1UECxMHVW5rbm93bjETMBEGA1UEAxMKY2xpZW50
+dXNlcjCCAbcwggEsBgcqhkjOOAQBMIIBHwKBgQD9f1OBHXUSKVLfSpwu7OTn9hG3
+UjzvRADDHj+AtlEmaUVdQCJR+1k9jVj6v8X1ujD2y5tVbNeBO4AdNG/yZmC3a5lQ
+paSfn+gEexAiwk+7qdf+t8Yb+DtX58aophUPBPuD9tPFHsMCNVQTWhaRMvZ1864r
+Ydcq7/IiAxmd0UgBxwIVAJdgUI8VIwvMspK5gqLrhAvwWBz1AoGBAPfhoIXWmz3e
+y7yrXDa4V7l5lK+7+jrqgvlXTAs9B4JnUVlXjrrUWU/mcQcQgYC0SRZxI+hMKBYT
+t88JMozIpuE8FnqLVHyNKOCjrh4rs6Z1kW6jfwv6ITVi8ftiegEkO8yk8b6oUZCJ
+qIPf4VrlnwaSi2ZegHtVJWQBTDv+z0kqA4GEAAKBgBmF8WdZ9Yv1Sf2qjqF19DUS
+Y3YB67B0azz+689y8lZw0tlnSuej0bBENIP6lvgC/PIPFdxvkInZOgB3TsWwkpxH
+zKbFZTo2Yg2txZ1IH4KX1QggePeybi2mE2soysZ2/r3nX2ZSOTdzDLicVo3yyKAu
+M8u14N0zBeJR9NMdOG1NMA0GCSqGSIb3DQEBBQUAA4IBAQAlf/MlmkGXvOHi68LU
+FoRoDh0UMUVUMYcpf+LicZkOveD0r5J5z6igDQZ5qT7RMfkSkM8pSl7xcuPNzNkT
+teeH29QOaTiYax+T9yAT9p/i2/DwiLbrcSdPT8UKOy5CVPHEtlreHupiezaID0Op
+IFaeuvBaI/HSbRZQ2IdCXTnXSQ+8rkrcoxDyIi9wjaEnWKwAqphgq0C9icNVMleu
+Lz3Wz51Xn03DQTH9uOtZu6kQYzfAEi7Z0hKF98TQ3BmwEwCRf+h5kE3wFbuT9QFh
+uHLeCvNlJoaajT2Qud0YWkIN+z1yVKzT3NdndmNm5SuM2Mzec3b4PHSLpmW0Cnwv
+UW4t
+-----END CERTIFICATE-----
diff --git a/pulsar-common/src/test/resources/client.keystore.jks b/pulsar-common/src/test/resources/client.keystore.jks
new file mode 100644
index 0000000..499c8be
Binary files /dev/null and b/pulsar-common/src/test/resources/client.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/client.truststore.jks b/pulsar-common/src/test/resources/client.truststore.jks
new file mode 100644
index 0000000..8eaa06b
Binary files /dev/null and b/pulsar-common/src/test/resources/client.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/clientKeyStorePW.txt b/pulsar-common/src/test/resources/clientKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/clientKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/clientTrustStorePW.txt b/pulsar-common/src/test/resources/clientTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/clientTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/old/broker.keystore.jks b/pulsar-common/src/test/resources/old/broker.keystore.jks
new file mode 100644
index 0000000..d4526ac
Binary files /dev/null and b/pulsar-common/src/test/resources/old/broker.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/old/broker.truststore.jks b/pulsar-common/src/test/resources/old/broker.truststore.jks
new file mode 100644
index 0000000..0c0e694
Binary files /dev/null and b/pulsar-common/src/test/resources/old/broker.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/old/brokerKeyStorePW.txt b/pulsar-common/src/test/resources/old/brokerKeyStorePW.txt
new file mode 100644
index 0000000..bdc331e
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+broker
diff --git a/pulsar-common/src/test/resources/old/brokerTrustStorePW.txt b/pulsar-common/src/test/resources/old/brokerTrustStorePW.txt
new file mode 100644
index 0000000..bdc331e
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+broker
diff --git a/pulsar-common/src/test/resources/old/client.keystore.jks b/pulsar-common/src/test/resources/old/client.keystore.jks
new file mode 100644
index 0000000..e5d074e
Binary files /dev/null and b/pulsar-common/src/test/resources/old/client.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/old/client.truststore.jks b/pulsar-common/src/test/resources/old/client.truststore.jks
new file mode 100644
index 0000000..36f9d72
Binary files /dev/null and b/pulsar-common/src/test/resources/old/client.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/old/clientKeyStorePW.txt b/pulsar-common/src/test/resources/old/clientKeyStorePW.txt
new file mode 100644
index 0000000..b051c6c
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/clientKeyStorePW.txt
@@ -0,0 +1 @@
+client
diff --git a/pulsar-common/src/test/resources/old/clientTrustStorePW.txt b/pulsar-common/src/test/resources/old/clientTrustStorePW.txt
new file mode 100644
index 0000000..b051c6c
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/clientTrustStorePW.txt
@@ -0,0 +1 @@
+client
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
index 6052f2b..250259b 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.discovery.service;
 
+import io.netty.handler.ssl.SslHandler;
 import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.NettySslContextBuilder;
+import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 import org.apache.pulsar.discovery.service.server.ServiceConfig;
 
 import io.netty.channel.ChannelInitializer;
@@ -36,19 +39,38 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
     public static final String TLS_HANDLER = "tls";
     private final DiscoveryService discoveryService;
     private final boolean enableTls;
-    private final NettySslContextBuilder sslCtxRefresher;
+    private final boolean tlsEnabledWithKeyStore;
+    private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;
+    private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
 
     public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig, boolean e)
             throws Exception {
         super();
         this.discoveryService = discoveryService;
         this.enableTls = e;
+        this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
         if (this.enableTls) {
-            sslCtxRefresher = new NettySslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
-                    serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
-                    serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
-                    serviceConfig.getTlsRequireTrustedClientCertOnConnect(),
-                    serviceConfig.getTlsCertRefreshCheckDurationSec());
+            if (tlsEnabledWithKeyStore) {
+                nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+                        serviceConfig.getTlsProvider(),
+                        serviceConfig.getTlsKeyStoreType(),
+                        serviceConfig.getTlsKeyStore(),
+                        serviceConfig.getTlsKeyStorePassword(),
+                        serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustStoreType(),
+                        serviceConfig.getTlsTrustStore(),
+                        serviceConfig.getTlsTrustStorePassword(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCiphers(),
+                        serviceConfig.getTlsProtocols(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            } else {
+                sslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
+                        serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            }
         } else {
             this.sslCtxRefresher = null;
         }
@@ -57,9 +79,14 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         if (sslCtxRefresher != null && this.enableTls) {
-            SslContext sslContext = sslCtxRefresher.get();
-            if (sslContext != null) {
-                ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
+            if (this.tlsEnabledWithKeyStore) {
+                ch.pipeline().addLast(TLS_HANDLER,
+                        new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
+            } else{
+                SslContext sslContext = sslCtxRefresher.get();
+                if (sslContext != null) {
+                    ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
+                }
             }
         }
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
index fa4761c..0c0bb95 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
@@ -19,17 +19,15 @@
 package org.apache.pulsar.discovery.service.server;
 
 import com.google.common.collect.Lists;
-
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TimeZone;
-
 import javax.servlet.Servlet;
-
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -74,14 +72,30 @@ public class ServerManager {
 
         if (config.getWebServicePortTls().isPresent()) {
             try {
-                SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
-                        config.isTlsAllowInsecureConnection(),
-                        config.getTlsTrustCertsFilePath(),
-                        config.getTlsCertificateFilePath(),
-                        config.getTlsKeyFilePath(),
-                        config.getTlsRequireTrustedClientCertOnConnect(),
-                        true,
-                        config.getTlsCertRefreshCheckDurationSec());
+                SslContextFactory sslCtxFactory;
+                if (config.isTlsEnabledWithKeyStore()) {
+                    sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+                            config.getTlsProvider(),
+                            config.getTlsKeyStoreType(),
+                            config.getTlsKeyStore(),
+                            config.getTlsKeyStorePassword(),
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustStoreType(),
+                            config.getTlsTrustStore(),
+                            config.getTlsTrustStorePassword(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            config.getTlsCertRefreshCheckDurationSec()
+                    );
+                } else {
+                    sslCtxFactory = SecurityUtility.createSslContextFactory(
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustCertsFilePath(),
+                            config.getTlsCertificateFilePath(),
+                            config.getTlsKeyFilePath(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            true,
+                            config.getTlsCertRefreshCheckDurationSec());
+                }
                 connectorTls = new ServerConnector(server, 1, 1, sslCtxFactory);
                 connectorTls.setPort(config.getWebServicePortTls().get());
                 connectors.add(connectorTls);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index 30269e4..57e18fa 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import lombok.Data;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 import org.apache.pulsar.discovery.service.web.DiscoveryServiceServlet;
@@ -32,6 +33,7 @@ import com.google.common.collect.Sets;
  * Service Configuration to start :{@link DiscoveryServiceServlet}
  *
  */
+@Data
 public class ServiceConfig implements PulsarConfiguration {
 
     // Local-Zookeeper quorum connection string
@@ -81,7 +83,7 @@ public class ServiceConfig implements PulsarConfiguration {
     /***** --- TLS --- ****/
     @Deprecated
     private boolean tlsEnabled = false;
-    // Tls cert refresh duration in seconds (set 0 to check on every new connection) 
+    // Tls cert refresh duration in seconds (set 0 to check on every new connection)
     private long tlsCertRefreshCheckDurationSec = 300;
     // Path for the TLS certificate file
     private String tlsCertificateFilePath;
@@ -101,217 +103,27 @@ public class ServiceConfig implements PulsarConfiguration {
     // Reject the Connection if the Client Certificate is not trusted.
     private boolean tlsRequireTrustedClientCertOnConnect = false;
 
-    private Properties properties = new Properties();
-
-    public String getZookeeperServers() {
-        return zookeeperServers;
-    }
-
-    public void setZookeeperServers(String zookeeperServers) {
-        this.zookeeperServers = zookeeperServers;
-    }
-
-    @Deprecated
-    public String getGlobalZookeeperServers() {
-        return globalZookeeperServers;
-    }
+    /***** --- TLS with KeyStore--- ****/
+    // Enable TLS with KeyStore type configuration in broker
+    private boolean tlsEnabledWithKeyStore = false;
+    // TLS Provider
+    private String tlsProvider = null;
+    // TLS KeyStore type configuration in broker: JKS, PKCS12
+    private String tlsKeyStoreType = "JKS";
+    // TLS KeyStore path in broker
+    private String tlsKeyStore = null;
+    // TLS KeyStore password in broker
+    private String tlsKeyStorePassword = null;
+    // TLS TrustStore type configuration in broker: JKS, PKCS12
+    private String tlsTrustStoreType = "JKS";
+    // TLS TrustStore path in broker
+    private String tlsTrustStore = null;
+    // TLS TrustStore password in broker"
+    private String tlsTrustStorePassword = null;
 
-    @Deprecated
-    public void setGlobalZookeeperServers(String globalZookeeperServers) {
-        this.globalZookeeperServers = globalZookeeperServers;
-    }
+    private Properties properties = new Properties();
 
     public String getConfigurationStoreServers() {
         return null == configurationStoreServers ? getGlobalZookeeperServers() : configurationStoreServers;
     }
-
-    public void setConfigurationStoreServers(String configurationStoreServers) {
-        this.configurationStoreServers = configurationStoreServers;
-    }
-
-    public int getZookeeperSessionTimeoutMs() {
-        return zookeeperSessionTimeoutMs;
-    }
-
-    public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
-        this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
-    }
-
-    public int getZooKeeperCacheExpirySeconds() {
-        return zooKeeperCacheExpirySeconds;
-    }
-
-    public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
-        this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
-    }
-
-    public Optional<Integer> getServicePort() {
-        return servicePort;
-    }
-
-    public void setServicePort(Optional<Integer> servicePort) {
-        this.servicePort = servicePort;
-    }
-
-    public Optional<Integer> getServicePortTls() {
-        return servicePortTls;
-    }
-
-    public void setServicePortTls(Optional<Integer> servicePortTls) {
-        this.servicePortTls = servicePortTls;
-    }
-
-    public Optional<Integer> getWebServicePort() {
-        return webServicePort;
-    }
-
-    public void setWebServicePort(Optional<Integer> webServicePort) {
-        this.webServicePort = webServicePort;
-    }
-
-    public Optional<Integer> getWebServicePortTls() {
-        return webServicePortTls;
-    }
-
-    public void setWebServicePortTls(Optional<Integer> webServicePortTls) {
-        this.webServicePortTls = webServicePortTls;
-    }
-
-    @Deprecated
-    public boolean isTlsEnabled() {
-        return tlsEnabled || webServicePortTls.isPresent() || servicePortTls.isPresent();
-    }
-
-    @Deprecated
-    public void setTlsEnabled(boolean tlsEnabled) {
-        this.tlsEnabled = tlsEnabled;
-    }
-
-    public String getTlsCertificateFilePath() {
-        return tlsCertificateFilePath;
-    }
-
-    public void setTlsCertificateFilePath(String tlsCertificateFilePath) {
-        this.tlsCertificateFilePath = tlsCertificateFilePath;
-    }
-
-    public String getTlsKeyFilePath() {
-        return tlsKeyFilePath;
-    }
-
-    public void setTlsKeyFilePath(String tlsKeyFilePath) {
-        this.tlsKeyFilePath = tlsKeyFilePath;
-    }
-
-    public String getTlsTrustCertsFilePath() {
-        return tlsTrustCertsFilePath;
-    }
-
-    public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
-        this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
-    }
-
-    public boolean isTlsAllowInsecureConnection() {
-        return tlsAllowInsecureConnection;
-    }
-
-    public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
-        this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
-    }
-
-    public boolean isBindOnLocalhost() {
-        return bindOnLocalhost;
-    }
-
-    public void setBindOnLocalhost(boolean bindOnLocalhost) {
-        this.bindOnLocalhost = bindOnLocalhost;
-    }
-
-    public boolean isAuthenticationEnabled() {
-        return authenticationEnabled;
-    }
-
-    public void setAuthenticationEnabled(boolean authenticationEnabled) {
-        this.authenticationEnabled = authenticationEnabled;
-    }
-
-    public Set<String> getAuthenticationProviders() {
-        return authenticationProviders;
-    }
-
-    public void setAuthenticationProviders(Set<String> authenticationProviders) {
-        this.authenticationProviders = authenticationProviders;
-    }
-
-    public boolean isAuthorizationEnabled() {
-        return authorizationEnabled;
-    }
-
-    public void setAuthorizationEnabled(boolean authorizationEnabled) {
-        this.authorizationEnabled = authorizationEnabled;
-    }
-
-    public String getAuthorizationProvider() {
-        return authorizationProvider;
-    }
-
-    public void setAuthorizationProvider(String authorizationProvider) {
-        this.authorizationProvider = authorizationProvider;
-    }
-
-    public Set<String> getSuperUserRoles() {
-        return superUserRoles;
-    }
-
-    public void setSuperUserRoles(Set<String> superUserRoles) {
-        this.superUserRoles = superUserRoles;
-    }
-
-    public boolean getAuthorizationAllowWildcardsMatching() {
-        return authorizationAllowWildcardsMatching;
-    }
-
-    public void setAuthorizationAllowWildcardsMatching(boolean authorizationAllowWildcardsMatching) {
-        this.authorizationAllowWildcardsMatching = authorizationAllowWildcardsMatching;
-    }
-
-    public Properties getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Properties properties) {
-        this.properties = properties;
-    }
-
-    public Set<String> getTlsProtocols() {
-        return tlsProtocols;
-    }
-
-    public void setTlsProtocols(Set<String> tlsProtocols) {
-        this.tlsProtocols = tlsProtocols;
-    }
-
-    public long getTlsCertRefreshCheckDurationSec() {
-        return tlsCertRefreshCheckDurationSec;
-    }
-
-    public void setTlsCertRefreshCheckDurationSec(long tlsCertRefreshCheckDurationSec) {
-        this.tlsCertRefreshCheckDurationSec = tlsCertRefreshCheckDurationSec;
-    }
-
-    public Set<String> getTlsCiphers() {
-        return tlsCiphers;
-    }
-
-    public void setTlsCiphers(Set<String> tlsCiphers) {
-        this.tlsCiphers = tlsCiphers;
-    }
-
-    public boolean getTlsRequireTrustedClientCertOnConnect() {
-        return tlsRequireTrustedClientCertOnConnect;
-    }
-
-    public void setTlsRequireTrustedClientCertOnConnect(boolean tlsRequireTrustedClientCertOnConnect) {
-        this.tlsRequireTrustedClientCertOnConnect = tlsRequireTrustedClientCertOnConnect;
-    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 989b726..e786ba8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -33,10 +33,10 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
+import java.util.function.Supplier;
 import lombok.Getter;
 
 import java.net.URI;
@@ -76,12 +76,12 @@ public class DirectProxyHandler {
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
-    private final SslContext sslCtx;
+    private final Supplier<SslHandler> sslHandlerSupplier;
     private AuthenticationDataProvider authenticationDataProvider;
     private ProxyService service;
 
     public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
-            int protocolVersion, SslContext sslCtx) {
+            int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -90,7 +90,7 @@ public class DirectProxyHandler {
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         this.protocolVersion = protocolVersion;
-        this.sslCtx = sslCtx;
+        this.sslHandlerSupplier = sslHandlerSupplier;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -103,8 +103,8 @@ public class DirectProxyHandler {
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) throws Exception {
-                if (sslCtx != null) {
-                    ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+                if (sslHandlerSupplier != null) {
+                    ch.pipeline().addLast(TLS_HANDLER, sslHandlerSupplier.get());
                 }
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                     Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e2f4644..7eac236 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -60,6 +60,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
     @Category
     private static final String CATEGORY_TLS = "TLS";
     @Category
+    private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS";
+    @Category
     private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider";
     @Category
     private static final String CATEGORY_HTTP = "HTTP";
@@ -319,7 +321,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
     private Set<String> tlsProtocols = Sets.newTreeSet();
     @FieldContext(
         category = CATEGORY_TLS,
-        doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake"
+        doc = "Specify the tls cipher the proxy will use to negotiate during TLS Handshake"
             + " (a comma-separated list of ciphers).\n\n"
             + "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
     )
@@ -331,6 +333,106 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private boolean tlsRequireTrustedClientCertOnConnect = false;
 
+    /**** --- KeyStore TLS config variables --- ****/
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Enable TLS with KeyStore type configuration for proxy"
+    )
+    private boolean tlsEnabledWithKeyStore = false;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS Provider"
+    )
+    private String tlsProvider = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore type configuration for proxy: JKS, PKCS12"
+    )
+    private String tlsKeyStoreType = "JKS";
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore path for proxy"
+    )
+    private String tlsKeyStore = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore password for proxy"
+    )
+    private String tlsKeyStorePassword = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore type configuration for proxy: JKS, PKCS12"
+    )
+    private String tlsTrustStoreType = "JKS";
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore path for proxy"
+    )
+    private String tlsTrustStore = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore password for proxy"
+    )
+    private String tlsTrustStorePassword = null;
+
+    /**** --- KeyStore TLS config variables used for proxy to auth with broker--- ****/
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Whether the Pulsar proxy use KeyStore type to authenticate with Pulsar brokers"
+    )
+    private boolean brokerClientTlsEnabledWithKeyStore = false;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "The TLS Provider used by the Pulsar proxy to authenticate with Pulsar brokers"
+    )
+    private String brokerClientSslProvider = null;
+
+    // needed when client auth is required
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore type configuration for proxy: JKS, PKCS12 "
+                  + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+    )
+    private String brokerClientTlsTrustStoreType = "JKS";
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore path for proxy, "
+                  + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+    )
+    private String brokerClientTlsTrustStore = null;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore password for proxy, "
+                  + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+    )
+    private String brokerClientTlsTrustStorePassword = null;
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Specify the tls cipher the proxy will use to negotiate during TLS Handshake"
+                  + " (a comma-separated list of ciphers).\n\n"
+                  + "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].\n"
+                  + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+    )
+    private Set<String> brokerClientTlsCiphers = Sets.newTreeSet();
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Specify the tls protocols the broker will use to negotiate during TLS handshake"
+                  + " (a comma-separated list of protocol names).\n\n"
+                  + "Examples:- [TLSv1.2, TLSv1.1, TLSv1] \n"
+                  + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+    )
+    private Set<String> brokerClientTlsProtocols = Sets.newTreeSet();
+
+    /***** --- HTTP --- ****/
+
     @FieldContext(
         category = CATEGORY_HTTP,
         doc = "Http directs to redirect to non-pulsar services"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 19fd771..d0bc217 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 
+import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
@@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -72,7 +72,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     private Authentication clientAuthentication;
     AuthenticationDataSource authenticationData;
     private State state;
-    private final SslContext sslCtx;
+    private final Supplier<SslHandler> sslHandlerSupplier;
 
     private LookupProxyHandler lookupProxyHandler = null;
     @Getter
@@ -112,11 +112,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return client.getCnxPool();
     }
 
-    public ProxyConnection(ProxyService proxyService, SslContext sslCtx) {
+    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
         this.state = State.Init;
-        this.sslCtx = sslCtx;
+        this.sslHandlerSupplier = sslHandlerSupplier;
     }
 
     @Override
@@ -214,7 +214,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             // connection there and just pass bytes in both directions
             state = State.ProxyConnectionToBroker;
             directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
-                protocolVersionToAdvertise, sslCtx);
+                protocolVersionToAdvertise, sslHandlerSupplier);
             cancelKeepAliveTask();
         } else {
             // Client is doing a lookup, we can consider the handshake complete
@@ -413,8 +413,15 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
         if (proxyConfig.isTlsEnabledWithBroker()) {
             clientConf.setUseTls(true);
-            clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
-            clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+            if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
+                clientConf.setUseKeyStoreTls(true);
+                clientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
+                clientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
+                clientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
+            } else {
+                clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
+                clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+            }
         }
         return clientConf;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 156492a..42a5c07 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -20,16 +20,20 @@ package org.apache.pulsar.proxy.server;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
+import io.netty.handler.ssl.SslHandler;
+import java.util.function.Supplier;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.ClientSslContextRefresher;
-import org.apache.pulsar.common.util.NettySslContextBuilder;
+import org.apache.pulsar.common.util.NettyClientSslContextRefresher;
+import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslContext;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 
 /**
  * Initialize service channel handlers.
@@ -39,22 +43,43 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
 
     public static final String TLS_HANDLER = "tls";
     private final ProxyService proxyService;
-    private final NettySslContextBuilder serverSslCtxRefresher;
-    private final ClientSslContextRefresher clientSslCtxRefresher;
     private final boolean enableTls;
+    private final boolean tlsEnabledWithKeyStore;
+
+    private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
+    private SslContextAutoRefreshBuilder<SslContext> clientSslCtxRefresher;
+    private NettySSLContextAutoRefreshBuilder serverSSLContextAutoRefreshBuilder;
+    private NettySSLContextAutoRefreshBuilder clientSSLContextAutoRefreshBuilder;
 
     public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration serviceConfig, boolean enableTls)
             throws Exception {
         super();
         this.proxyService = proxyService;
         this.enableTls = enableTls;
+        this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
 
         if (enableTls) {
-            serverSslCtxRefresher = new NettySslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
-                    serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
-                    serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
-                    serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
-                    serviceConfig.getTlsCertRefreshCheckDurationSec());
+            if (tlsEnabledWithKeyStore) {
+                serverSSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+                        serviceConfig.getTlsProvider(),
+                        serviceConfig.getTlsKeyStoreType(),
+                        serviceConfig.getTlsKeyStore(),
+                        serviceConfig.getTlsKeyStorePassword(),
+                        serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustStoreType(),
+                        serviceConfig.getTlsTrustStore(),
+                        serviceConfig.getTlsTrustStorePassword(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCiphers(),
+                        serviceConfig.getTlsProtocols(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            } else {
+                serverSslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
+                        serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            }
         } else {
             this.serverSslCtxRefresher = null;
         }
@@ -67,9 +92,24 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
                         serviceConfig.getBrokerClientAuthenticationParameters()).getAuthData();
             }
 
-            clientSslCtxRefresher = new ClientSslContextRefresher(serviceConfig.isTlsAllowInsecureConnection(),
-                    serviceConfig.getBrokerClientTrustCertsFilePath(), authData);
-
+            if (tlsEnabledWithKeyStore) {
+                clientSSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+                        serviceConfig.getBrokerClientSslProvider(),
+                        serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getBrokerClientTlsTrustStoreType(),
+                        serviceConfig.getBrokerClientTlsTrustStore(),
+                        serviceConfig.getBrokerClientTlsTrustStorePassword(),
+                        serviceConfig.getBrokerClientTlsCiphers(),
+                        serviceConfig.getBrokerClientTlsProtocols(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec(),
+                        authData);
+            } else {
+                clientSslCtxRefresher = new NettyClientSslContextRefresher(
+                        serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getBrokerClientTrustCertsFilePath(),
+                        authData,
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            }
         } else {
             this.clientSslCtxRefresher = null;
         }
@@ -82,11 +122,33 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
             if (sslContext != null) {
                 ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
             }
+        } else if (this.tlsEnabledWithKeyStore && serverSSLContextAutoRefreshBuilder != null) {
+            ch.pipeline().addLast(TLS_HANDLER,
+                    new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
         }
 
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-            Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+                Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+
+        Supplier<SslHandler> sslHandlerSupplier = null;
+        if (clientSslCtxRefresher != null) {
+            sslHandlerSupplier = new Supplier<SslHandler>() {
+                @Override
+                public SslHandler get() {
+                    return clientSslCtxRefresher.get().newHandler(ch.alloc());
+                }
+            };
+        } else if (clientSSLContextAutoRefreshBuilder != null) {
+            sslHandlerSupplier = new Supplier<SslHandler>() {
+                @Override
+                public SslHandler get() {
+                    return new SslHandler(clientSSLContextAutoRefreshBuilder.get().createSSLEngine());
+                }
+            };
+        }
+
         ch.pipeline().addLast("handler",
-                new ProxyConnection(proxyService, clientSslCtxRefresher == null ? null : clientSslCtxRefresher.get()));
+                new ProxyConnection(proxyService, sslHandlerSupplier));
+
     }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 0b8dca2..3a45399 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -19,9 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import com.google.common.collect.Lists;
-
 import io.prometheus.client.jetty.JettyStatisticsCollector;
-
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -31,15 +29,14 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.TimeZone;
-
 import javax.servlet.DispatcherType;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.HttpConfiguration;
@@ -99,14 +96,30 @@ public class WebServer {
         }
         if (config.getWebServicePortTls().isPresent()) {
             try {
-                SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
-                        config.isTlsAllowInsecureConnection(),
-                        config.getTlsTrustCertsFilePath(),
-                        config.getTlsCertificateFilePath(),
-                        config.getTlsKeyFilePath(),
-                        config.isTlsRequireTrustedClientCertOnConnect(),
-                        true,
-                        config.getTlsCertRefreshCheckDurationSec());
+                SslContextFactory sslCtxFactory;
+                if (config.isTlsEnabledWithKeyStore()) {
+                    sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+                            config.getTlsProvider(),
+                            config.getTlsKeyStoreType(),
+                            config.getTlsKeyStore(),
+                            config.getTlsKeyStorePassword(),
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustStoreType(),
+                            config.getTlsTrustStore(),
+                            config.getTlsTrustStorePassword(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            config.getTlsCertRefreshCheckDurationSec()
+                    );
+                } else {
+                    sslCtxFactory = SecurityUtility.createSslContextFactory(
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustCertsFilePath(),
+                            config.getTlsCertificateFilePath(),
+                            config.getTlsKeyFilePath(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            true,
+                            config.getTlsCertRefreshCheckDurationSec());
+                }
                 connectorTls = new ServerConnector(server, 1, 1, sslCtxFactory);
                 connectorTls.setPort(config.getWebServicePortTls().get());
                 connectors.add(connectorTls);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
new file mode 100644
index 0000000..b082019
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.doReturn;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
+    protected final String BROKER_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+    protected final String BROKER_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+    protected final String BROKER_KEYSTORE_PW = "111111";
+    protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+    protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+    protected final String CLIENT_KEYSTORE_PW = "111111";
+    protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_CN = "clientuser";
+    protected final String KEYSTORE_TYPE = "JKS";
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setServicePortTls(Optional.of(0));
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setWebServicePortTls(Optional.of(0));
+        proxyConfig.setTlsEnabledWithBroker(false);
+
+        proxyConfig.setTlsEnabledWithKeyStore(true);
+        proxyConfig.setTlsKeyStoreType(KEYSTORE_TYPE);
+        proxyConfig.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+        proxyConfig.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+        proxyConfig.setTlsTrustStoreType(KEYSTORE_TYPE);
+        proxyConfig.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+        proxyConfig.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+
+
+        // config for authentication and authorization.
+        proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
+        proxyConfig.setSuperUserRoles(Sets.newHashSet(CLIENT_KEYSTORE_CN));
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(true);
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+                                                            PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    @Override
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        internalCleanup();
+
+        proxyService.close();
+    }
+
+    protected PulsarClient internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(lookupUrl)
+                .enableTls(true)
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(false)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        if (addCertificates) {
+            Map<String, String> authParams = new HashMap<>();
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+            clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+        }
+        return clientBuilder.build();
+    }
+
+    @Test
+    public void testProducer() throws Exception {
+        @Cleanup
+        PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+    }
+
+    @Test
+    public void testProducerFailed() throws Exception {
+        @Cleanup
+        PulsarClient client = internalSetUpForClient(false, proxyService.getServiceUrlTls());
+        try {
+            @Cleanup
+            Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                    .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+                    .create();
+            Assert.fail("Should failed since broker setTlsRequireTrustedClientCertOnConnect, "
+                        + "while client not set keystore");
+        } catch (Exception e) {
+            // expected
+            log.info("Expected failed since broker setTlsRequireTrustedClientCertOnConnect,"
+                     + " while client not set keystore");
+        }
+    }
+
+    @Test
+    public void testPartitions() throws Exception {
+        @Cleanup
+        PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+        String topicName = "persistent://sample/test/local/partitioned-topic" + System.currentTimeMillis();
+        TenantInfo tenantInfo = createDefaultTenantInfo();
+        admin.tenants().createTenant("sample", tenantInfo);
+        admin.topics().createPartitionedTopic(topicName, 2);
+
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic(topicName)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+
+        // Create a consumer directly attached to broker
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-sub").subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+            checkNotNull(msg);
+        }
+    }
+
+
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
new file mode 100644
index 0000000..b920ef4
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.doReturn;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest {
+    protected final String BROKER_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+    protected final String BROKER_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+    protected final String BROKER_KEYSTORE_PW = "111111";
+    protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+    protected final String CLIENT_KEYSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+    protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+            "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+    protected final String CLIENT_KEYSTORE_PW = "111111";
+    protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+    protected final String KEYSTORE_TYPE = "JKS";
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setServicePortTls(Optional.of(0));
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setWebServicePortTls(Optional.of(0));
+        proxyConfig.setTlsEnabledWithBroker(false);
+
+        proxyConfig.setTlsEnabledWithKeyStore(true);
+        proxyConfig.setTlsKeyStoreType(KEYSTORE_TYPE);
+        proxyConfig.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+        proxyConfig.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+        proxyConfig.setTlsTrustStoreType(KEYSTORE_TYPE);
+        proxyConfig.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+        proxyConfig.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+        proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
+
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+                                                            PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        proxyService.start();
+    }
+
+    protected PulsarClient internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(lookupUrl)
+                .enableTls(true)
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(false)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        if (addCertificates) {
+            Map<String, String> authParams = new HashMap<>();
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+            authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+            clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+        }
+        return clientBuilder.build();
+    }
+
+    @Override
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        proxyService.close();
+    }
+
+    @Test
+    public void testProducer() throws Exception {
+        @Cleanup
+        PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+    }
+
+    @Test
+    public void testProducerFailed() throws Exception {
+        @Cleanup
+        PulsarClient client = internalSetUpForClient(false, proxyService.getServiceUrlTls());
+        try {
+            @Cleanup
+            Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                    .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+                    .create();
+            Assert.fail("Should failed since broker setTlsRequireTrustedClientCertOnConnect, "
+                        + "while client not set keystore");
+        } catch (Exception e) {
+            // expected
+            log.info("Expected failed since broker setTlsRequireTrustedClientCertOnConnect,"
+                     + " while client not set keystore");
+        }
+    }
+
+    @Test
+    public void testPartitions() throws Exception {
+        @Cleanup
+        PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+        String topicName = "persistent://sample/test/local/partitioned-topic" + System.currentTimeMillis();
+        TenantInfo tenantInfo = createDefaultTenantInfo();
+        admin.tenants().createTenant("sample", tenantInfo);
+        admin.topics().createPartitionedTopic(topicName, 2);
+
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic(topicName)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+
+        // Create a consumer directly attached to broker
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-sub").subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+            checkNotNull(msg);
+        }
+    }
+
+}
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.keystore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.keystore.jks
new file mode 100644
index 0000000..b4fec69
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.keystore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.truststore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.truststore.jks
new file mode 100644
index 0000000..8ac03d8
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.truststore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/client.keystore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.keystore.jks
new file mode 100644
index 0000000..499c8be
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.keystore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/client.truststore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.truststore.jks
new file mode 100644
index 0000000..8eaa06b
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.truststore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 2ce29d5..b1e893f 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -151,6 +151,18 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |tlsAllowInsecureConnection|  Accept untrusted TLS certificate from client  |false|
 |tlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
 |tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tlsEnabledWithKeyStore| Enable TLS with KeyStore type configuration in broker |false|
+|tlsProvider| TLS Provider for KeyStore type ||
+|tlsKeyStoreType| LS KeyStore type configuration in broker: JKS, PKCS12 |JKS|
+|tlsKeyStore| TLS KeyStore path in broker ||
+|tlsKeyStorePassword| TLS KeyStore password for broker ||
+|brokerClientTlsEnabledWithKeyStore| Whether internal client use KeyStore type to authenticate with Pulsar brokers |false|
+|brokerClientSslProvider| The TLS Provider used by internal client to authenticate with other Pulsar brokers ||
+|brokerClientTlsTrustStoreType| TLS TrustStore type configuration for internal client: JKS, PKCS12, used by the internal client to authenticate with Pulsar brokers |JKS|
+|brokerClientTlsTrustStore| TLS TrustStore path for internal client, used by the internal client to authenticate with Pulsar brokers ||
+|brokerClientTlsTrustStorePassword| TLS TrustStore password for internal client, used by the internal client to authenticate with Pulsar brokers ||
+|brokerClientTlsCiphers| Specify the tls cipher the internal client will use to negotiate during TLS Handshake. (a comma-separated list of ciphers) e.g.  [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]||
+|brokerClientTlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS handshake. (a comma-separated list of protocol names). e.g.  [TLSv1.2, TLSv1.1, TLSv1] ||
 |ttlDurationDefaultInSeconds|  The default ttl for namespaces if ttl is not configured at namespace policies.  |0|
 |tokenSecretKey| Configure the secret key to be used to validate auth tokens. The key can be specified like: `tokenSecretKey=data:base64,xxxxxxxxx` or `tokenSecretKey=file:///my/secret.key`||
 |tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||