You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/26 10:44:35 UTC

[pulsar] branch master updated: [PIP-158][improve][client] Split client TLS transport encryption from authentication (#15634)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f1f6aac566 [PIP-158][improve][client] Split client TLS transport encryption from authentication (#15634)
6f1f6aac566 is described below

commit 6f1f6aac56657098145446fce1e655fb246f19a2
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Tue Jul 26 18:44:25 2022 +0800

    [PIP-158][improve][client] Split client TLS transport encryption from authentication (#15634)
---
 .../api/AuthenticatedProducerConsumerTest.java     | 64 +++++++++++++++++++++-
 .../pulsar/client/api/TlsProducerConsumerTest.java | 45 ++++++++++++++-
 ...eyStoreTlsProducerConsumerTestWithAuthTest.java | 64 +++++++++++++++++++++-
 ...toreTlsProducerConsumerTestWithoutAuthTest.java | 50 ++++++++++++++++-
 .../apache/pulsar/client/api/ClientBuilder.java    | 40 ++++++++++++++
 .../pulsar/client/impl/ClientBuilderImpl.java      | 30 ++++++++++
 .../org/apache/pulsar/client/impl/HttpClient.java  | 16 ++++--
 .../client/impl/PulsarChannelInitializer.java      |  5 ++
 .../client/impl/conf/ClientConfigurationData.java  | 33 ++++++++++-
 .../NettySSLContextAutoRefreshBuilder.java         | 13 ++++-
 .../pulsar/common/util/netty/SslContextTest.java   |  1 +
 .../pulsar/proxy/server/DirectProxyHandler.java    |  3 +
 12 files changed, 349 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 046b26846e2..4cb8e71c1ab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -28,15 +28,23 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import javax.crypto.SecretKey;
 import javax.ws.rs.InternalServerErrorException;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -64,6 +72,10 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
 
     private final String BASIC_CONF_FILE_PATH = "./src/test/resources/authentication/basic/.htpasswd";
 
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private final String ADMIN_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "admin", Optional.empty());
+
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -95,8 +107,15 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
-        providers.add(AuthenticationProviderBasic.class.getName());
+
         System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
+        providers.add(AuthenticationProviderBasic.class.getName());
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+        providers.add(AuthenticationProviderToken.class.getName());
+
         conf.setAuthenticationProviders(providers);
 
         conf.setClusterName("test");
@@ -403,4 +422,47 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         admin.tenants().deleteTenant("p1");
         admin.clusters().deleteCluster("test");
     }
+
+    private final Authentication tlsAuth = new AuthenticationTls(TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH);
+    private final Authentication tokenAuth = new AuthenticationToken(ADMIN_TOKEN);
+
+    @DataProvider
+    public Object[][] tlsTransportWithAuth() {
+        Supplier<String> webServiceAddressTls = () -> pulsar.getWebServiceAddressTls();
+        Supplier<String> brokerServiceUrlTls = () -> pulsar.getBrokerServiceUrlTls();
+
+        return new Object[][]{
+                // Verify TLS transport encryption with TLS authentication
+                {webServiceAddressTls, tlsAuth},
+                {brokerServiceUrlTls, tlsAuth},
+                // Verify TLS transport encryption with token authentication
+                {webServiceAddressTls, tokenAuth},
+                {brokerServiceUrlTls, tokenAuth},
+        };
+    }
+
+    @Test(dataProvider = "tlsTransportWithAuth")
+    public void testTlsTransportWithAnyAuth(Supplier<String> url, Authentication auth) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/my-topic-1";
+
+        internalSetup(new AuthenticationToken(ADMIN_TOKEN));
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("my-property",
+                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(url.get())
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsKeyFilePath(TLS_CLIENT_KEY_FILE_PATH)
+                .tlsCertificateFilePath(TLS_CLIENT_CERT_FILE_PATH)
+                .authentication(auth)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .build();
+
+        @Cleanup
+        Producer<byte[]> ignored = client.newProducer().topic(topicName).create();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 78aafbf1756..06666d55ec4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -27,17 +27,16 @@ import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
-
+import lombok.Cleanup;
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import lombok.Cleanup;
-
 @Test(groups = "broker-api")
 public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerTest.class);
@@ -252,4 +251,44 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
     private ByteArrayInputStream getStream(AtomicInteger index, ByteArrayInputStream... streams) {
         return streams[index.intValue()];
     }
+
+    private final Authentication tlsAuth = new AuthenticationTls(TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH);
+
+    @DataProvider
+    public Object[] tlsTransport() {
+        Supplier<String> webServiceAddressTls = () -> pulsar.getWebServiceAddressTls();
+        Supplier<String> brokerServiceUrlTls = () -> pulsar.getBrokerServiceUrlTls();
+
+        return new Object[][]{
+                // Set TLS transport directly.
+                {webServiceAddressTls, null},
+                {brokerServiceUrlTls, null},
+                // Using TLS authentication data to set up TLS transport.
+                {webServiceAddressTls, tlsAuth},
+                {brokerServiceUrlTls, tlsAuth},
+        };
+    }
+
+    @Test(dataProvider = "tlsTransport")
+    public void testTlsTransport(Supplier<String> url, Authentication auth) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/my-topic-1";
+
+        internalSetUpForNamespace();
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(url.get())
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .authentication(auth);
+
+        if (auth == null) {
+            clientBuilder.tlsKeyFilePath(TLS_CLIENT_KEY_FILE_PATH).tlsCertificateFilePath(TLS_CLIENT_CERT_FILE_PATH);
+        }
+
+        @Cleanup
+        PulsarClient client = clientBuilder.build();
+
+        @Cleanup
+        Producer<byte[]> ignored = client.newProducer().topic(topicName).create();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
index 18041d1a928..3b658065150 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
@@ -19,18 +19,25 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.spy;
-
 import com.google.common.collect.Sets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -39,17 +46,23 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import javax.crypto.SecretKey;
+
 // TLS authentication and authorization based on KeyStore type config.
 @Slf4j
 @Test(groups = "broker-impl")
 public class KeyStoreTlsProducerConsumerTestWithAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private final String CLIENTUSER_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "clientuser", Optional.empty());
 
     private final String clusterName = "use";
 
@@ -92,6 +105,13 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest extends ProducerConsume
         conf.setAuthorizationEnabled(true);
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        providers.add(AuthenticationProviderToken.class.getName());
+
         conf.setAuthenticationProviders(providers);
         conf.setNumExecutorThreadPoolSize(5);
     }
@@ -255,4 +275,46 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest extends ProducerConsume
                 .subscribe();
     }
 
+    private final Authentication tlsAuth =
+            new AuthenticationKeyStoreTls(KEYSTORE_TYPE, CLIENT_KEYSTORE_FILE_PATH, CLIENT_KEYSTORE_PW);
+    private final Authentication tokenAuth = new AuthenticationToken(CLIENTUSER_TOKEN);
+
+    @DataProvider
+    public Object[][] keyStoreTlsTransportWithAuth() {
+        Supplier<String> webServiceAddressTls = () -> pulsar.getWebServiceAddressTls();
+        Supplier<String> brokerServiceUrlTls = () -> pulsar.getBrokerServiceUrlTls();
+
+        return new Object[][]{
+                // Verify JKS TLS transport encryption with TLS authentication
+                {webServiceAddressTls, tlsAuth},
+                {brokerServiceUrlTls, tlsAuth},
+                // Verify JKS TLS transport encryption with token authentication
+                {webServiceAddressTls, tokenAuth},
+                {brokerServiceUrlTls, tokenAuth},
+        };
+    }
+
+    @Test(dataProvider = "keyStoreTlsTransportWithAuth")
+    public void testKeyStoreTlsTransportWithAuth(Supplier<String> url, Authentication auth) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/my-topic-1";
+
+        internalSetUpForNamespace();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(url.get())
+                .useKeyStoreTls(true)
+                .tlsTrustStoreType(KEYSTORE_TYPE)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .tlsKeyStoreType(KEYSTORE_TYPE)
+                .tlsKeyStorePath(CLIENT_KEYSTORE_FILE_PATH)
+                .tlsKeyStorePassword(CLIENT_KEYSTORE_PW)
+                .authentication(auth)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .build();
+
+        @Cleanup
+        Producer<byte[]> ignored = client.newProducer().topic(topicName).create();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuthTest.java
index d0bdf54dc63..a4b6ef599b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuthTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.spy;
-
 import com.google.common.collect.Sets;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,8 +26,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -42,6 +44,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 // TLS test without authentication and authorization based on KeyStore type config.
@@ -241,4 +244,49 @@ public class KeyStoreTlsProducerConsumerTestWithoutAuthTest extends ProducerCons
         }
     }
 
+    private final Authentication tlsAuth =
+            new AuthenticationKeyStoreTls(KEYSTORE_TYPE, CLIENT_KEYSTORE_FILE_PATH, CLIENT_KEYSTORE_PW);
+
+    @DataProvider
+    public Object[][] keyStoreTlsTransport() {
+        Supplier<String> webServiceAddressTls = () -> pulsar.getWebServiceAddressTls();
+        Supplier<String> brokerServiceUrlTls = () -> pulsar.getBrokerServiceUrlTls();
+
+        return new Object[][]{
+                // Set TLS transport directly.
+                {webServiceAddressTls, null},
+                {brokerServiceUrlTls, null},
+                // Using TLS authentication data to set up TLS transport.
+                {webServiceAddressTls, tlsAuth},
+                {brokerServiceUrlTls, tlsAuth},
+        };
+    }
+
+    @Test(dataProvider = "keyStoreTlsTransport")
+    public void testKeyStoreTlsTransport(Supplier<String> url, Authentication auth) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/my-topic-1";
+
+        internalSetUpForNamespace();
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(url.get())
+                .useKeyStoreTls(true)
+                .tlsTrustStoreType(KEYSTORE_TYPE)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .authentication(auth);
+
+        if (auth == null) {
+            clientBuilder.tlsKeyStoreType(KEYSTORE_TYPE)
+                    .tlsKeyStorePath(CLIENT_KEYSTORE_FILE_PATH)
+                    .tlsKeyStorePassword(CLIENT_KEYSTORE_PW);
+        }
+
+        @Cleanup
+        PulsarClient client = clientBuilder.build();
+
+        @Cleanup
+        Producer<byte[]> ignored = client.newProducer().topic(topicName).create();
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 49d79ecdaf6..037b1966ba5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -294,6 +294,22 @@ public interface ClientBuilder extends Serializable, Cloneable {
     @Deprecated
     ClientBuilder enableTls(boolean enableTls);
 
+    /**
+     * Set the path to the TLS key file.
+     *
+     * @param tlsKeyFilePath
+     * @return the client builder instance
+     */
+    ClientBuilder tlsKeyFilePath(String tlsKeyFilePath);
+
+    /**
+     * Set the path to the TLS certificate file.
+     *
+     * @param tlsCertificateFilePath
+     * @return the client builder instance
+     */
+    ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath);
+
     /**
      * Set the path to the trusted TLS certificate file.
      *
@@ -340,6 +356,30 @@ public interface ClientBuilder extends Serializable, Cloneable {
      */
     ClientBuilder sslProvider(String sslProvider);
 
+    /**
+     * The file format of the key store file.
+     *
+     * @param tlsKeyStoreType
+     * @return the client builder instance
+     */
+    ClientBuilder tlsKeyStoreType(String tlsKeyStoreType);
+
+    /**
+     * The location of the key store file.
+     *
+     * @param tlsTrustStorePath
+     * @return the client builder instance
+     */
+    ClientBuilder tlsKeyStorePath(String tlsTrustStorePath);
+
+    /**
+     * The store password for the key store file.
+     *
+     * @param tlsKeyStorePassword
+     * @return the client builder instance
+     */
+    ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword);
+
     /**
      * The file format of the trust store file.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 46befaf6eb9..93fe3bf55ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -174,6 +174,18 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder tlsKeyFilePath(String tlsKeyFilePath) {
+        conf.setTlsKeyFilePath(tlsKeyFilePath);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath) {
+        conf.setTlsCertificateFilePath(tlsCertificateFilePath);
+        return this;
+    }
+
     @Override
     public ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification) {
         conf.setTlsHostnameVerificationEnable(enableTlsHostnameVerification);
@@ -204,6 +216,24 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder tlsKeyStoreType(String tlsKeyStoreType) {
+        conf.setTlsKeyStoreType(tlsKeyStoreType);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsKeyStorePath(String tlsTrustStorePath) {
+        conf.setTlsKeyStorePath(tlsTrustStorePath);
+        return this;
+    }
+
+    @Override
+    public ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword) {
+        conf.setTlsKeyStorePassword(tlsKeyStorePassword);
+        return this;
+    }
+
     @Override
     public ClientBuilder tlsTrustStoreType(String tlsTrustStoreType) {
         conf.setTlsTrustStoreType(tlsTrustStoreType);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 61d0c2bd64e..5d22a9bef1f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -95,13 +95,15 @@ public class HttpClient implements Closeable {
 
                 if (conf.isUseKeyStoreTls()) {
                     SSLContext sslCtx = null;
-                    KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
+                    KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() :
+                            new KeyStoreParams(conf.getTlsKeyStoreType(), conf.getTlsKeyStorePath(),
+                                    conf.getTlsKeyStorePassword());
 
                     sslCtx = KeyStoreSSLContext.createClientSslContext(
                             conf.getSslProvider(),
-                            params != null ? params.getKeyStoreType() : null,
-                            params != null ? params.getKeyStorePath() : null,
-                            params != null ? params.getKeyStorePassword() : null,
+                            params.getKeyStoreType(),
+                            params.getKeyStorePath(),
+                            params.getKeyStorePassword(),
                             conf.isTlsAllowInsecureConnection(),
                             conf.getTlsTrustStoreType(),
                             conf.getTlsTrustStorePath(),
@@ -131,7 +133,11 @@ public class HttpClient implements Closeable {
                         sslCtx = SecurityUtility.createNettySslContextForClient(
                                 sslProvider,
                                 conf.isTlsAllowInsecureConnection(),
-                                conf.getTlsTrustCertsFilePath(), conf.getTlsCiphers(), conf.getTlsProtocols());
+                                conf.getTlsTrustCertsFilePath(),
+                                conf.getTlsCertificateFilePath(),
+                                conf.getTlsKeyFilePath(),
+                                conf.getTlsCiphers(),
+                                conf.getTlsProtocols());
                     }
                     confBuilder.setSslContext(sslCtx);
                 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index ffe1daab2d5..b046d1030ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -88,6 +88,9 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
                             conf.getTlsTrustStoreType(),
                             conf.getTlsTrustStorePath(),
                             conf.getTlsTrustStorePassword(),
+                            conf.getTlsKeyStoreType(),
+                            conf.getTlsKeyStorePath(),
+                            conf.getTlsKeyStorePassword(),
                             conf.getTlsCiphers(),
                             conf.getTlsProtocols(),
                             TLS_CERTIFICATE_CACHE_MILLIS,
@@ -124,6 +127,8 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
                                 sslProvider,
                                 conf.isTlsAllowInsecureConnection(),
                                 conf.getTlsTrustCertsFilePath(),
+                                conf.getTlsCertificateFilePath(),
+                                conf.getTlsKeyFilePath(),
                                 conf.getTlsCiphers(),
                                 conf.getTlsProtocols());
                     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 5214722f77f..2d34f81fe78 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -136,11 +136,23 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
     private boolean useTls = false;
 
+    @ApiModelProperty(
+            name = "tlsKeyFilePath",
+            value = "Path to the TLS key file."
+    )
+    private String tlsKeyFilePath = null;
+
+    @ApiModelProperty(
+            name = "tlsCertificateFilePath",
+            value = "Path to the TLS certificate file."
+    )
+    private String tlsCertificateFilePath = null;
+
     @ApiModelProperty(
             name = "tlsTrustCertsFilePath",
             value = "Path to the trusted TLS certificate file."
     )
-    private String tlsTrustCertsFilePath = "";
+    private String tlsTrustCertsFilePath = null;
 
     @ApiModelProperty(
             name = "tlsAllowInsecureConnection",
@@ -236,6 +248,25 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
     private String sslProvider = null;
 
+    @ApiModelProperty(
+            name = "tlsKeyStoreType",
+            value = "TLS KeyStore type configuration."
+    )
+    private String tlsKeyStoreType = "JKS";
+
+    @ApiModelProperty(
+            name = "tlsKeyStorePath",
+            value = "Path of TLS KeyStore."
+    )
+    private String tlsKeyStorePath = null;
+
+    @ApiModelProperty(
+            name = "tlsKeyStorePassword",
+            value = "Password of TLS KeyStore."
+    )
+    @Secret
+    private String tlsKeyStorePassword = null;
+
     @ApiModelProperty(
             name = "tlsTrustStoreType",
             value = "TLS TrustStore type configuration. You need to set this configuration when client authentication"
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
index 363fe1e75c4..68271963af5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
@@ -89,6 +89,9 @@ public class NettySSLContextAutoRefreshBuilder extends SslContextAutoRefreshBuil
                                              String trustStoreTypeString,
                                              String trustStore,
                                              String trustStorePassword,
+                                             String keyStoreTypeString,
+                                             String keyStore,
+                                             String keyStorePassword,
                                              Set<String> ciphers,
                                              Set<String> protocols,
                                              long certRefreshInSec,
@@ -100,6 +103,10 @@ public class NettySSLContextAutoRefreshBuilder extends SslContextAutoRefreshBuil
 
         this.authData = authData;
 
+        this.tlsKeyStoreType = keyStoreTypeString;
+        this.tlsKeyStore = new FileModifiedTimeUpdater(keyStore);
+        this.tlsKeyStorePassword = keyStorePassword;
+
         this.tlsTrustStoreType = trustStoreTypeString;
         this.tlsTrustStore = new FileModifiedTimeUpdater(trustStore);
         this.tlsTrustStorePassword = trustStorePassword;
@@ -121,9 +128,9 @@ public class NettySSLContextAutoRefreshBuilder extends SslContextAutoRefreshBuil
         } else {
             KeyStoreParams authParams = authData.getTlsKeyStoreParams();
             this.keyStoreSSLContext = KeyStoreSSLContext.createClientKeyStoreSslContext(tlsProvider,
-                    authParams != null ? authParams.getKeyStoreType() : null,
-                    authParams != null ? authParams.getKeyStorePath() : null,
-                    authParams != null ? authParams.getKeyStorePassword() : null,
+                    authParams != null ? authParams.getKeyStoreType() : tlsKeyStoreType,
+                    authParams != null ? authParams.getKeyStorePath() : tlsKeyStore.getFileName(),
+                    authParams != null ? authParams.getKeyStorePassword() : tlsKeyStorePassword,
                     tlsAllowInsecureConnection,
                     tlsTrustStoreType, tlsTrustStore.getFileName(), tlsTrustStorePassword,
                     tlsCiphers, tlsProtocols);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/SslContextTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/SslContextTest.java
index fb035d66a76..e66bbbc17a2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/SslContextTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/SslContextTest.java
@@ -107,6 +107,7 @@ public class SslContextTest {
                 null,
                 false,
                 keyStoreType, brokerTrustStorePath, keyStorePassword,
+                null, null, null,
                 cipher, null, 0, new ClientAuthenticationData());
         contextAutoRefreshBuilder.update();
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 6494e47484c..2f067282115 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -128,6 +128,9 @@ public class DirectProxyHandler {
                         config.getBrokerClientTlsTrustStoreType(),
                         config.getBrokerClientTlsTrustStore(),
                         config.getBrokerClientTlsTrustStorePassword(),
+                        null,
+                        null,
+                        null,
                         config.getBrokerClientTlsCiphers(),
                         config.getBrokerClientTlsProtocols(),
                         config.getTlsCertRefreshCheckDurationSec(),