You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/23 20:21:41 UTC

[pulsar] branch master updated: [pulsar-broker] Perform auto cert refresh for Pulsar-admin (#8831)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98bf97e  [pulsar-broker] Perform auto cert refresh for Pulsar-admin (#8831)
98bf97e is described below

commit 98bf97e0765db30022597dd468afcc5227b417e8
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Dec 23 12:21:16 2020 -0800

    [pulsar-broker] Perform auto cert refresh for Pulsar-admin (#8831)
    
    ### Motivation
    
    We are frequently getting a 500 from the `pulsar-admin topics list <ns>` CLI command. It happens because the `pulsar-admin topics` REST API internally uses `pulsar-admin` to get the list of non-persistent topics. `PulsarAdmin-HttpClient` creates a persistent connection but doesn't perform auto-cert refresh, so if the cert has expired and a reconnection happens, the broker always gets a 500 when it uses `pulsar-admin` internally, due to invalid certs.
    ```
    21:09:16.025 [AsyncHttpClient-48-9] ERROR org.apache.pulsar.broker.admin.v1.NonPersistentTopics - [role] Failed to get list of topics under namespace prop/cluster/ns
    java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException: java.net.ConnectException: error:10000416:SSL routines:OPENSSL_internal:SSLV3_ALERT_CERTIFICATE_UNKNOWN
            at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
            at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
            at org.apache.pulsar.broker.admin.v1.NonPersistentTopics.lambda$getList$0(NonPersistentTopics.java:211) ~[pulsar-broker.jar:]
            at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
            at org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl$4.failed(NonPersistentTopicsImpl.java:215) ~[pulsar-client-admin-original.jar:]
            at org.glassfish.jersey.client.JerseyInvocation$4.failed(JerseyInvocation.java:1030) ~[jersey-client-2.27.jar:?]
            at org.glassfish.jersey.client.ClientRuntime.processFailure(ClientRuntime.java:231) ~[jersey-client-2.27.jar:?]
            at org.glassfish.jersey.client.ClientRuntime.access$100(ClientRuntime.java:85) ~[jersey-client-2.27.jar:?]
            at org.glassfish.jersey.client.ClientRuntime$2.lambda$failure$1(ClientRuntime.java:183) ~[jersey-client-2.27.jar:?]
            at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272) [jersey-common-2.27.jar:?]
            at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268) [jersey-common-2.27.jar:?]
            at org.glassfish.jersey.internal.Errors.process(Errors.java:316) [jersey-common-2.27.jar:?]
            at org.glassfish.jersey.internal.Errors.process(Errors.java:298) [jersey-common-2.27.jar:?]
            at org.glassfish.jersey.internal.Errors.process(Errors.java:268) [jersey-common-2.27.jar:?]
    ```
    
    ### Modification
    Add the capability for HttpClient to perform auto-cert refresh, to avoid TLS handshake failures.
---
 .../pulsar/broker/admin/AdminApiTlsAuthTest.java   |  60 ++++++++
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  14 +-
 .../pulsar/client/admin/PulsarAdminBuilder.java    |   7 +
 .../admin/internal/PulsarAdminBuilderImpl.java     |  15 +-
 .../admin/internal/http/AsyncHttpConnector.java    |  12 +-
 .../internal/http/AsyncHttpConnectorProvider.java  |  12 +-
 .../client/api/AuthenticationDataProvider.java     |  15 ++
 .../client/impl/auth/AuthenticationDataTls.java    |  10 ++
 .../common/util/FileModifiedTimeUpdater.java       |   3 +
 .../apache/pulsar/common/util/KeyManagerProxy.java | 152 +++++++++++++++++++++
 .../apache/pulsar/common/util/SecurityUtility.java |  34 +++++
 .../pulsar/common/util/TrustManagerProxy.java      | 129 +++++++++++++++++
 12 files changed, 446 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
index b623725..1cc110f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
@@ -20,9 +20,15 @@ package org.apache.pulsar.broker.admin;
 
 import com.google.common.collect.ImmutableSet;
 
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.NotAuthorizedException;
@@ -34,6 +40,7 @@ import javax.ws.rs.core.MediaType;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -371,4 +378,57 @@ public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest {
             admin.namespaces().deleteNamespace("tenant1/ns1");
         }
     }
+
+    /**
+     * Validates that Pulsar-admin reloads its TLS key without being rebuilt: requests fail while the key file
+     * does not match the admin cert, and succeed once the matching key is copied in and the refresh has run.
+     */
+    @Test
+    public void testCertRefreshForPulsarAdmin() throws Exception {
+        String adminUser = "admin";
+        String user1 = "user1";
+        File keyFile = new File(getTLSFile("temp" + ".key-pk8"));
+        Path keyFilePath = Paths.get(keyFile.getAbsolutePath());
+        int autoCertRefreshTimeSec = 1;
+        try {
+            Files.copy(Paths.get(getTLSFile(user1 + ".key-pk8")), keyFilePath, StandardCopyOption.REPLACE_EXISTING);
+            try (PulsarAdmin admin = PulsarAdmin.builder()
+                    .allowTlsInsecureConnection(false)
+                    .enableTlsHostnameVerification(false)
+                    .serviceHttpUrl(brokerUrlTls.toString())
+                    .autoCertRefreshTime(autoCertRefreshTimeSec, TimeUnit.SECONDS)
+                    .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
+                                    String.format("tlsCertFile:%s,tlsKeyFile:%s",
+                                                  getTLSFile(adminUser + ".cert"), keyFile))
+                    .tlsTrustCertsFilePath(getTLSFile("ca.cert")).build()) {
+                // the first call must fail: the key on disk does not belong to the admin cert
+                try {
+                    admin.tenants().createTenant("tenantX",
+                            new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test")));
+                    Assert.fail("should have failed due to invalid key file");
+                } catch (Exception e) {
+                    // expected
+                }
+                // swap in the correct key file, leaving a gap longer than the refresh interval
+                Files.delete(keyFilePath);
+                Thread.sleep(2 * autoCertRefreshTimeSec * 1000);
+                Files.copy(Paths.get(getTLSFile(adminUser + ".key-pk8")), keyFilePath);
+                MutableBoolean success = new MutableBoolean(false);
+                retryStrategically((test) -> {
+                    try {
+                        admin.tenants().createTenant("tenantX",
+                                new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test")));
+                        success.setValue(true);
+                        return true;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }, 5, 1000);
+                Assert.assertTrue(success.booleanValue());
+                Assert.assertEquals(ImmutableSet.of("tenantX"), admin.tenants().getTenants());
+            }
+        } finally {
+            Files.deleteIfExists(keyFilePath); // may already be gone if the test failed mid-swap
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index f202397..7695e32 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -73,6 +73,7 @@ public class PulsarAdmin implements Closeable {
     public static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 60;
     public static final int DEFAULT_READ_TIMEOUT_SECONDS = 60;
     public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300;
+    public static final int DEFAULT_CERT_REFRESH_SECONDS = 300;
 
     private final Clusters clusters;
     private final Brokers brokers;
@@ -133,9 +134,8 @@ public class PulsarAdmin implements Closeable {
 
     public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData) throws PulsarClientException {
         this(serviceUrl, clientConfigData, DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS,
-                DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS,
-                DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, null);
-
+                DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS,
+                DEFAULT_CERT_REFRESH_SECONDS, TimeUnit.SECONDS, null);
     }
 
     public PulsarAdmin(String serviceUrl,
@@ -146,6 +146,8 @@ public class PulsarAdmin implements Closeable {
                        TimeUnit readTimeoutUnit,
                        int requestTimeout,
                        TimeUnit requestTimeoutUnit,
+                       int autoCertRefreshTime,
+                       TimeUnit autoCertRefreshTimeUnit,
                        ClassLoader clientBuilderClassLoader) throws PulsarClientException {
         this.connectTimeout = connectTimeout;
         this.connectTimeoutUnit = connectTimeoutUnit;
@@ -166,7 +168,8 @@ public class PulsarAdmin implements Closeable {
             clientConfigData.setServiceUrl(serviceUrl);
         }
 
-        AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData);
+        AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
+                (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));
 
         ClientConfig httpConfig = new ClientConfig();
         httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
@@ -200,7 +203,8 @@ public class PulsarAdmin implements Closeable {
         this.asyncHttpConnector = asyncConnectorProvider.getConnector(
                 Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)),
                 Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
-                Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)));
+                Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)),
+                (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));
 
         long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
         this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index fc647e0..fda3694 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -252,6 +252,13 @@ public interface PulsarAdminBuilder {
     PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit);
 
     /**
+     * This sets auto cert refresh time if Pulsar admin uses tls authentication.
+     *
+     * @param autoCertRefreshTime
+     * @param autoCertRefreshTimeUnit
+     */
+    PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit);
+    /**
      *
      * @return
      */
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index b942cdb..75985c5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -35,16 +35,18 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
     private int connectTimeout = PulsarAdmin.DEFAULT_CONNECT_TIMEOUT_SECONDS;
     private int readTimeout = PulsarAdmin.DEFAULT_READ_TIMEOUT_SECONDS;
     private int requestTimeout = PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS;
+    private int autoCertRefreshTime = PulsarAdmin.DEFAULT_CERT_REFRESH_SECONDS;
     private TimeUnit connectTimeoutUnit = TimeUnit.SECONDS;
     private TimeUnit readTimeoutUnit = TimeUnit.SECONDS;
     private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
+    private TimeUnit autoCertRefreshTimeUnit = TimeUnit.SECONDS;
     private ClassLoader clientBuilderClassLoader = null;
 
     @Override
     public PulsarAdmin build() throws PulsarClientException {
-        return new PulsarAdmin(conf.getServiceUrl(),
-                conf, connectTimeout, connectTimeoutUnit, readTimeout, readTimeoutUnit,
-                requestTimeout, requestTimeoutUnit, clientBuilderClassLoader);
+        return new PulsarAdmin(conf.getServiceUrl(), conf, connectTimeout, connectTimeoutUnit, readTimeout,
+                readTimeoutUnit, requestTimeout, requestTimeoutUnit, autoCertRefreshTime,
+                autoCertRefreshTimeUnit, clientBuilderClassLoader);
     }
 
     public PulsarAdminBuilderImpl() {
@@ -168,6 +170,13 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
     }
 
     @Override
+    public PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit) {
+        this.autoCertRefreshTime = autoCertRefreshTime;
+        this.autoCertRefreshTimeUnit = autoCertRefreshTimeUnit;
+        return this;
+    }
+
+    @Override
     public PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader) {
         this.clientBuilderClassLoader = clientBuilderClassLoader;
         return this;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 1ff6e19..bc15f24 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -80,16 +80,18 @@ public class AsyncHttpConnector implements Connector {
     private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
             new DefaultThreadFactory("delayer"));
 
-    public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
+    public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
         this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
                 (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
                 PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000,
+                autoCertRefreshTimeSeconds,
                 conf);
     }
 
     @SneakyThrows
     public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
-                              int requestTimeoutMs, ClientConfigurationData conf) {
+                              int requestTimeoutMs,
+                              int autoCertRefreshTimeSeconds, ClientConfigurationData conf) {
         DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
         confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
@@ -136,10 +138,10 @@ public class AsyncHttpConnector implements Connector {
                     SslContext sslCtx = null;
                     if (authData.hasDataForTls()) {
                         sslCtx = authData.getTlsTrustStoreStream() == null
-                                ? SecurityUtility.createNettySslContextForClient(
+                                ? SecurityUtility.createAutoRefreshSslContextForClient(
                                         conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
-                                        conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
-                                        authData.getTlsPrivateKey())
+                                        conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(),
+                                        authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer)
                                 : SecurityUtility.createNettySslContextForClient(
                                         conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
                                         authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
index 33f2439..c73bdfd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -31,21 +31,25 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
 
     private final ClientConfigurationData conf;
     private Connector connector;
+    private final int autoCertRefreshTimeSeconds;
 
-    public AsyncHttpConnectorProvider(ClientConfigurationData conf) {
+    public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
         this.conf = conf;
+        this.autoCertRefreshTimeSeconds = autoCertRefreshTimeSeconds;
     }
 
     @Override
     public Connector getConnector(Client client, Configuration runtimeConfig) {
         if (connector == null) {
-            connector = new AsyncHttpConnector(client, conf);
+            connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds);
         }
         return connector;
     }
 
 
-    public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs) {
-        return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, conf);
+    public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs,
+            int autoCertRefreshTimeSeconds) {
+        return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, autoCertRefreshTimeSeconds,
+                conf);
     }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index 87adba3..4624821 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -58,6 +58,13 @@ public interface AuthenticationDataProvider extends Serializable {
     }
 
     /**
+     * @return a client certificate file path
+     */
+    default String getTlsCerificateFilePath() {
+        return null;
+    }
+
+    /**
      *
      * @return a private key for the client certificate, or null if the data are not available
      */
@@ -67,6 +74,14 @@ public interface AuthenticationDataProvider extends Serializable {
 
     /**
      *
+     * @return a private key file path
+     */
+    default String getTlsPrivateKeyFilePath() {
+        return null;
+    }
+
+    /**
+     *
      * @return an input-stream of the trust store, or null if the trust-store provided at
      *         {@link ClientConfigurationData#getTlsTrustStorePath()}
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index 588f614..3e9e903 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -129,5 +129,15 @@ public class AuthenticationDataTls implements AuthenticationDataProvider {
         return trustStoreStreamProvider != null ? trustStoreStreamProvider.get() : null;
     }
 
+    @Override
+    public String getTlsCerificateFilePath() {
+        return certFile != null ? certFile.getFileName() : null;
+    }
+
+    @Override
+    public String getTlsPrivateKeyFilePath() {
+        return keyFile != null ? keyFile.getFileName() : null;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
index d269624..99bdb8e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
@@ -24,12 +24,15 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.FileTime;
 import lombok.Getter;
+import lombok.ToString;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Class working with file's modified time.
  */
+@ToString
 public class FileModifiedTimeUpdater {
     @Getter
     String fileName;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
new file mode 100644
index 0000000..b7b5173
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.X509ExtendedKeyManager;
+
+import io.netty.handler.ssl.SslContext;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * An {@link X509ExtendedKeyManager} that delegates to an inner key manager rebuilt from the cert/key files on a
+ * schedule, so refreshed certs are picked up without replacing the {@link SslContext} that holds this proxy.
+ */
+@Slf4j
+public class KeyManagerProxy extends X509ExtendedKeyManager {
+
+    private static final char[] KEYSTORE_PASSWORD = "secret".toCharArray();
+    private volatile X509ExtendedKeyManager keyManager;
+    private final FileModifiedTimeUpdater certFile, keyFile;
+
+    /** Loads the key manager eagerly, then schedules a reload check every {@code refreshDurationSec} seconds. */
+    public KeyManagerProxy(String certFilePath, String keyFilePath, int refreshDurationSec,
+            ScheduledExecutorService executor) {
+        this.certFile = new FileModifiedTimeUpdater(certFilePath);
+        this.keyFile = new FileModifiedTimeUpdater(keyFilePath);
+        try {
+            updateKeyManager();
+        } catch (CertificateException e) {
+            log.warn("Failed to load cert {}", certFile, e);
+            throw new IllegalArgumentException(e);
+        } catch (KeyStoreException e) {
+            log.warn("Failed to load key {}", keyFile, e);
+            throw new IllegalArgumentException(e);
+        } catch (NoSuchAlgorithmException | UnrecoverableKeyException e) {
+            log.warn("Failed to update key Manager", e);
+            throw new IllegalArgumentException(e);
+        }
+        executor.scheduleWithFixedDelay(this::updateKeyManagerSafely, refreshDurationSec, refreshDurationSec,
+                TimeUnit.SECONDS);
+    }
+
+    /** Reload entry point for the scheduler: logs failures instead of throwing, so later runs still happen. */
+    public void updateKeyManagerSafely() {
+        try {
+            updateKeyManager();
+        } catch (Exception e) {
+            log.warn("Failed to update key Manager for {}, {}", certFile.getFileName(), keyFile.getFileName(), e);
+        }
+    }
+
+    /** Rebuilds the delegate from the cert and key files; a no-op once loaded unless either file has changed. */
+    public void updateKeyManager()
+            throws CertificateException, KeyStoreException, NoSuchAlgorithmException, UnrecoverableKeyException {
+        // Non-short-circuit '|' so both files are polled each run (presumably each poll records the new mtime).
+        if (keyManager != null && !(certFile.checkAndRefresh() | keyFile.checkAndRefresh())) {
+            return;
+        }
+        log.info("refreshing key manager for {} {}", certFile.getFileName(), keyFile.getFileName());
+        KeyStore keyStore;
+        try (InputStream publicCertStream = new FileInputStream(certFile.getFileName())) {
+            final CertificateFactory cf = CertificateFactory.getInstance("X.509");
+            final X509Certificate certificate = (X509Certificate) cf.generateCertificate(publicCertStream);
+            keyStore = KeyStore.getInstance("JKS");
+            final String alias = certificate.getSubjectX500Principal().getName();
+            final PrivateKey privateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName());
+            keyStore.load(null);
+            keyStore.setKeyEntry(alias, privateKey, KEYSTORE_PASSWORD, new X509Certificate[] { certificate });
+        } catch (IOException | KeyManagementException e) {
+            throw new IllegalArgumentException(e);
+        }
+
+        final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(keyStore, KEYSTORE_PASSWORD);
+        this.keyManager = (X509ExtendedKeyManager) kmf.getKeyManagers()[0];
+    }
+
+    @Override
+    public String[] getClientAliases(String keyType, Principal[] issuers) {
+        return keyManager.getClientAliases(keyType, issuers);
+    }
+
+    @Override
+    public String chooseClientAlias(String[] keyTypes, Principal[] issuers, Socket socket) {
+        return keyManager.chooseClientAlias(keyTypes, issuers, socket);
+    }
+
+    @Override
+    public String[] getServerAliases(String keyType, Principal[] issuers) {
+        return keyManager.getServerAliases(keyType, issuers);
+    }
+
+    @Override
+    public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
+        return keyManager.chooseServerAlias(keyType, issuers, socket);
+    }
+
+    @Override
+    public X509Certificate[] getCertificateChain(String alias) {
+        return keyManager.getCertificateChain(alias);
+    }
+
+    @Override
+    public PrivateKey getPrivateKey(String alias) {
+        return keyManager.getPrivateKey(alias);
+    }
+
+    @Override
+    public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
+        return keyManager.chooseEngineClientAlias(keyType, issuers, engine);
+    }
+
+    @Override
+    public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
+        return keyManager.chooseEngineServerAlias(keyType, issuers, engine);
+    }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 5d0b896..8438f13 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -49,6 +49,8 @@ import java.security.spec.PKCS8EncodedKeySpec;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
@@ -72,6 +74,7 @@ public class SecurityUtility {
     // also used to get Factories. e.g. CertificateFactory.getInstance("X.509", "BCFIPS")
     public static final String BC_FIPS = "BCFIPS";
     public static final String BC = "BC";
+    private static final String SSLCONTEXT_ALGORITHM = "TLSv1.2";
 
     public static boolean isBCFIPS() {
         return BC_PROVIDER.getClass().getCanonicalName().equals(BC_FIPS_PROVIDER_CLASS);
@@ -148,6 +151,37 @@ public class SecurityUtility {
         return createSslContext(allowInsecureConnection, trustCertificates, certificates, privateKey);
     }
 
+    /**
+     * Builds a client {@link SslContext} whose key manager is a {@link KeyManagerProxy} over the given files.
+     * @param allowInsecureConnection when true, use Netty's insecure trust manager and ignore trustCertsFilePath
+     * @param trustCertsFilePath trust-certs file handed to a {@link TrustManagerProxy} in the secure case
+     * @param certFilePath client certificate file handed to the {@link KeyManagerProxy}
+     * @param keyFilePath client private-key file handed to the {@link KeyManagerProxy}
+     * @param sslContextAlgorithm not read by this method
+     * @param refreshDurationSec refresh interval, in seconds, handed to both proxies
+     * @param executor scheduler handed to both proxies
+     * @return the built context
+     * @throws GeneralSecurityException
+     * @throws SSLException
+     * @throws FileNotFoundException
+     * @throws IOException
+     */
+    public static SslContext createAutoRefreshSslContextForClient(boolean allowInsecureConnection,
+            String trustCertsFilePath, String certFilePath, String keyFilePath, String sslContextAlgorithm,
+            int refreshDurationSec, ScheduledExecutorService executor)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+        final KeyManagerProxy refreshingKeys =
+                new KeyManagerProxy(certFilePath, keyFilePath, refreshDurationSec, executor);
+        final SslContextBuilder builder = SslContextBuilder.forClient().keyManager(refreshingKeys);
+        if (!allowInsecureConnection) {
+            // the trust-manager proxy is only created when server certs are actually verified
+            return builder.trustManager(
+                    new TrustManagerProxy(trustCertsFilePath, refreshDurationSec, executor)).build();
+        }
+        // insecure mode: no trust-certs file is read
+        return builder.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+    }
+
     public static SslContext createNettySslContextForClient(boolean allowInsecureConnection, String trustCertsFilePath,
             String certFilePath, String keyFilePath)
             throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
new file mode 100644
index 0000000..7edbbb4
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedTrustManager;
+
+import io.netty.handler.ssl.SslContext;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * This class wraps {@link X509ExtendedTrustManager} and gives opportunity to refresh Trust-manager with refreshed certs
+ * without changing {@link SslContext}.
+ */
+@Slf4j
+public class TrustManagerProxy extends X509ExtendedTrustManager {
+
+    private volatile X509ExtendedTrustManager trustManager;
+    private FileModifiedTimeUpdater certFile;
+
+    public TrustManagerProxy(String caCertFile, int refreshDurationSec, ScheduledExecutorService executor) {
+        this.certFile = new FileModifiedTimeUpdater(caCertFile);
+        try {
+            updateTrustManager();
+        } catch (IOException | CertificateException e) {
+            log.warn("Failed to load cert {}, {}", certFile, e.getMessage());
+            throw new IllegalArgumentException(e);
+        } catch (NoSuchAlgorithmException | KeyStoreException e) {
+            log.warn("Failed to init trust-store", e);
+            throw new IllegalArgumentException(e);
+        }
+        executor.scheduleWithFixedDelay(() -> updateTrustManagerSafely(), refreshDurationSec, refreshDurationSec,
+                TimeUnit.SECONDS);
+    }
+
+    private void updateTrustManagerSafely() {
+        try {
+            updateTrustManager();
+        } catch (Exception e) {
+            log.warn("Failed to init trust-store {}", certFile.getFileName(), e);
+        }
+    }
+
+    private void updateTrustManager() throws CertificateException, KeyStoreException, NoSuchAlgorithmException,
+            FileNotFoundException, IOException {
+        CertificateFactory factory = CertificateFactory.getInstance("X.509");
+        try (InputStream inputStream = new FileInputStream(certFile.getFileName())) {
+            X509Certificate certificate = (X509Certificate) factory.generateCertificate(inputStream);
+            String alias = certificate.getSubjectX500Principal().getName();
+            KeyStore keyStore = KeyStore.getInstance("JKS");
+            keyStore.load(null);
+            keyStore.setCertificateEntry(alias, certificate);
+            final TrustManagerFactory trustManagerFactory = TrustManagerFactory
+                    .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            trustManagerFactory.init(keyStore);
+            trustManager = (X509ExtendedTrustManager) trustManagerFactory.getTrustManagers()[0];
+        }
+    }
+
+    @Override
+    public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        trustManager.checkClientTrusted(x509Certificates, s);
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        trustManager.checkServerTrusted(x509Certificates, s);
+    }
+
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+        return trustManager.getAcceptedIssuers();
+    }
+
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket)
+            throws CertificateException {
+        trustManager.checkClientTrusted(chain, authType, socket);
+    }
+
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine)
+            throws CertificateException {
+        trustManager.checkClientTrusted(chain, authType, engine);
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket)
+            throws CertificateException {
+        trustManager.checkServerTrusted(chain, authType, socket);
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine)
+            throws CertificateException {
+        trustManager.checkServerTrusted(chain, authType, engine);
+    }
+}