You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/09/29 10:52:32 UTC

[pulsar] branch branch-2.11 updated: [fix][proxy] Fix refresh client auth (#17831)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 298e3789b3e [fix][proxy] Fix refresh client auth (#17831)
298e3789b3e is described below

commit 298e3789b3ee6761b3273d172c22d5a2c61f8134
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Thu Sep 29 18:48:38 2022 +0800

    [fix][proxy] Fix refresh client auth (#17831)
    
    * [fix][proxy] Fix refresh client auth
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    
    * Fix style
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    (cherry picked from commit c952f3c9f891f85ff4b6cee6e28b6f68db3b5bcd)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  17 +-
 .../apache/pulsar/client/impl/ConnectionPool.java  |   9 +-
 pulsar-proxy/pom.xml                               |   5 +
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  77 +++++++--
 .../pulsar/proxy/server/ProxyConnection.java       |  73 ++++++--
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  | 186 +++++++++++++++++++++
 6 files changed, 338 insertions(+), 29 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index eb39fe53f1a..a6b9005611c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -104,7 +104,7 @@ import org.slf4j.LoggerFactory;
 public class ClientCnx extends PulsarHandler {
 
     protected final Authentication authentication;
-    private State state;
+    protected State state;
 
     @Getter
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
@@ -155,7 +155,7 @@ public class ClientCnx extends PulsarHandler {
 
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
-    private final int protocolVersion;
+    protected final int protocolVersion;
     private final long operationTimeoutMs;
 
     protected String proxyToTargetBrokerAddress = null;
@@ -176,7 +176,10 @@ public class ClientCnx extends PulsarHandler {
     @Getter
     private final ClientCnxIdleState idleState;
 
-    enum State {
+    @Getter
+    private long lastDisconnectedTimestamp;
+
+    protected enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
     }
 
@@ -281,6 +284,7 @@ public class ClientCnx extends PulsarHandler {
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
+        lastDisconnectedTimestamp = System.currentTimeMillis();
         log.info("{} Disconnected", ctx.channel());
         if (!connectionFuture.isDone()) {
             connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
@@ -1243,6 +1247,13 @@ public class ClientCnx extends PulsarHandler {
        }
     }
 
+    protected void closeWithException(Throwable e) {
+       if (ctx != null) {
+           connectionFuture.completeExceptionally(e);
+           ctx.close();
+       }
+    }
+
     private void checkRequestTimeout() {
         while (!requestTimeoutQueue.isEmpty()) {
             RequestTime request = requestTimeoutQueue.peek();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index d85d853c4aa..36f28fb5756 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -35,16 +35,19 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -453,5 +456,9 @@ public class ConnectionPool implements AutoCloseable {
         // Do release idle connections.
         releaseIdleConnectionTaskList.forEach(Runnable::run);
     }
-}
 
+    public Set<CompletableFuture<ClientCnx>> getConnections() {
+        return Collections.unmodifiableSet(
+                pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
+    }
+}
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index fc33470eff0..ee493de91a3 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -174,6 +174,11 @@
       <artifactId>ipaddress</artifactId>
       <version>${seancfoley.ipaddress.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 50a77d33683..283b835fff5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -18,30 +18,35 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
+import java.util.Arrays;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final AuthData clientAuthData;
+    private final ProxyConnection proxyConnection;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
-        super(conf, eventLoopGroup);
+                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion,
+                          boolean forwardClientAuthData, ProxyConnection proxyConnection) {
+        super(conf, eventLoopGroup, protocolVersion);
         this.clientAuthRole = clientAuthRole;
         this.clientAuthData = clientAuthData;
         this.clientAuthMethod = clientAuthMethod;
-        this.protocolVersion = protocolVersion;
+        this.forwardClientAuthData = forwardClientAuthData;
+        this.proxyConnection = proxyConnection;
     }
 
     @Override
@@ -54,10 +59,54 @@ public class ProxyClientCnx extends ClientCnx {
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Proxy {} request to refresh the original client authentication data for "
+                        + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
+            }
+
+            proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
+                            protocolVersion))
+                    .addListener(writeFuture -> {
+                        if (writeFuture.isSuccess()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+                                                + "with method {} for the proxy client {}",
+                                        proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
+                            }
+                        } else {
+                            log.error("Failed to send the auth challenge to original client by the proxy {} "
+                                            + "for the proxy client {}",
+                                    proxyConnection.ctx().channel(),
+                                    ctx.channel(),
+                                    writeFuture.cause());
+                            closeWithException(writeFuture.cause());
+                        }
+                    });
+
+            if (state == State.SentConnectFrame) {
+                state = State.Connecting;
+            }
+        } catch (Exception e) {
+            log.error("Failed to send the auth challenge to origin client by the proxy {} for the proxy client {}",
+                    proxyConnection.ctx().channel(), ctx.channel(), e);
+            closeWithException(e);
+        }
+    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index fe62b606314..30d36446ccd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -45,6 +45,7 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -307,12 +308,11 @@ public class ProxyConnection extends PulsarHandler {
                 this.clientAuthData = clientData;
                 this.clientAuthMethod = authMethod;
             }
-            clientCnxSupplier =
-                    () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                            clientAuthMethod, protocolVersionToAdvertise);
+            clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
+                    clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
+                    service.getConfiguration().isForwardAuthorizationCredentials(), this);
         } else {
-            clientCnxSupplier =
-                    () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+            clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
         }
 
         if (this.connectionPool == null) {
@@ -414,16 +414,22 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private void doAuthentication(AuthData clientData) throws Exception {
+    private void doAuthentication(AuthData clientData)
+            throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
         // authentication has completed, will send newConnected command.
         if (authState.isComplete()) {
             clientAuthRole = authState.getAuthRole();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
-                    remoteAddress, authMethod, clientAuthRole);
+                        remoteAddress, authMethod, clientAuthRole);
+            }
+
+            // First connection
+            if (this.connectionPool == null || state == State.Connecting) {
+                // authentication has completed, will send newConnected command.
+                completeConnect(clientData);
             }
-            completeConnect(clientData);
             return;
         }
 
@@ -432,7 +438,7 @@ public class ProxyConnection extends PulsarHandler {
                 .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         if (LOG.isDebugEnabled()) {
             LOG.debug("[{}] Authentication in progress client by method {}.",
-                remoteAddress, authMethod);
+                    remoteAddress, authMethod);
         }
         state = State.Connecting;
     }
@@ -514,18 +520,63 @@ public class ProxyConnection extends PulsarHandler {
 
     @Override
     protected void handleAuthResponse(CommandAuthResponse authResponse) {
-        checkArgument(state == State.Connecting);
         checkArgument(authResponse.hasResponse());
         checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received AuthResponse from {}, auth method: {}",
-                remoteAddress, authResponse.getResponse().getAuthMethodName());
+                    remoteAddress, authResponse.getResponse().getAuthMethodName());
         }
 
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (service.getConfiguration().isForwardAuthorizationCredentials()
+                    && connectionPool != null && state == State.ProxyLookupRequests) {
+                connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
+                    String clientVersion;
+                    if (authResponse.hasClientVersion()) {
+                        clientVersion = authResponse.getClientVersion();
+                    } else {
+                        clientVersion = PulsarVersion.getVersion();
+                    }
+                    int protocolVersion;
+                    if (authResponse.hasProtocolVersion()) {
+                        protocolVersion = authResponse.getProtocolVersion();
+                    } else {
+                        protocolVersion = Commands.getCurrentProtocolVersion();
+                    }
+
+                    ByteBuf cmd =
+                            Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                    toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd)
+                                    .addListener(writeFuture -> {
+                                        if (writeFuture.isSuccess()) {
+                                            if (LOG.isDebugEnabled()) {
+                                                LOG.debug("{} authentication is refreshed successfully by {}, "
+                                                                + "auth method: {} ",
+                                                        toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+                                            }
+                                        } else {
+                                            LOG.error("Failed to forward the auth response "
+                                                            + "from the proxy to the broker through the proxy client, "
+                                                            + "proxy: {}, proxy client: {}",
+                                                    ctx.channel(),
+                                                    toBrokerCnx.ctx().channel(),
+                                                    writeFuture.cause());
+                                            toBrokerCnx.ctx().channel().pipeline()
+                                                    .fireExceptionCaught(writeFuture.cause());
+                                        }
+                                    }))
+                            .whenComplete((__, ex) -> {
+                                if (ex != null) {
+                                    LOG.error("Failed to forward the auth response from the proxy to "
+                                                    + "the broker through the proxy client, proxy: {}",
+                                            ctx().channel(), ex);
+                                }
+                            });
+                });
+            }
         } catch (Exception e) {
             String msg = "Unable to handleAuthResponse";
             LOG.warn("[{}] {} ", remoteAddress, msg, e);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
new file mode 100644
index 00000000000..9ccb067adbf
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable tls and auth&auth at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(1);
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(
+                        () -> AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.empty()))).build();
+        String namespaceName = "my-tenant/my-ns";
+        admin.clusters().createCluster("proxy-authorization",
+                ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true, false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 1);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .build();
+
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        @Cleanup
+        Producer<byte[]> ignored = spy(pulsarClient.newProducer()
+                .topic(topic).create());
+
+        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+        Set<CompletableFuture<ClientCnx>> connections = pulsarClientImpl.getCnxPool().getConnections();
+
+        Awaitility.await().during(4, SECONDS).untilAsserted(() -> {
+            pulsarClient.getPartitionsForTopic(topic).get();
+            assertTrue(connections.stream().allMatch(n -> {
+                try {
+                    ClientCnx clientCnx = n.get();
+                    long timestamp = clientCnx.getLastDisconnectedTimestamp();
+                    return timestamp == 0;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }));
+        });
+    }
+}