You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/09/29 11:09:08 UTC

[pulsar] branch branch-2.9 updated: [fix][proxy] Fix refresh client auth (#17831)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 39ce3240b4e [fix][proxy] Fix refresh client auth (#17831)
39ce3240b4e is described below

commit 39ce3240b4eacc7e2da5d338ffda8c0b10f2fdb7
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Thu Sep 29 18:48:38 2022 +0800

    [fix][proxy] Fix refresh client auth (#17831)
    
    * [fix][proxy] Fix refresh client auth
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    
    * Fix style
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    
    (cherry picked from commit c952f3c9f891f85ff4b6cee6e28b6f68db3b5bcd)
    Signed-off-by: Zixuan Liu <no...@gmail.com>
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  24 ++-
 .../apache/pulsar/client/impl/ConnectionPool.java  |   8 +
 pulsar-proxy/pom.xml                               |   5 +
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  82 +++++++--
 .../pulsar/proxy/server/ProxyConnection.java       |  89 +++++++---
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  | 186 +++++++++++++++++++++
 6 files changed, 349 insertions(+), 45 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 03a72fe16b4..506a0d36ca7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -55,12 +55,9 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.ConnectException;
 import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
-import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
-import org.apache.pulsar.common.tls.TlsHostnameVerifier;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.client.util.TimedCompletableFuture;
 import org.apache.pulsar.common.api.AuthData;
@@ -89,10 +86,10 @@ import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.CommandSuccess;
+import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
-import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -104,7 +101,7 @@ import org.slf4j.LoggerFactory;
 public class ClientCnx extends PulsarHandler {
 
     protected final Authentication authentication;
-    private State state;
+    protected State state;
 
     @Getter
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
@@ -149,7 +146,7 @@ public class ClientCnx extends PulsarHandler {
 
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
-    private final int protocolVersion;
+    protected final int protocolVersion;
     private final long operationTimeoutMs;
 
     protected String proxyToTargetBrokerAddress = null;
@@ -165,7 +162,10 @@ public class ClientCnx extends PulsarHandler {
     protected AuthenticationDataProvider authenticationDataProvider;
     private TransactionBufferHandler transactionBufferHandler;
 
-    enum State {
+    @Getter
+    private long lastDisconnectedTimestamp;
+
+    protected enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
     }
 
@@ -268,6 +268,7 @@ public class ClientCnx extends PulsarHandler {
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
+        lastDisconnectedTimestamp = System.currentTimeMillis();
         log.info("{} Disconnected", ctx.channel());
         if (!connectionFuture.isDone()) {
             connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
@@ -1163,6 +1164,13 @@ public class ClientCnx extends PulsarHandler {
        }
     }
 
+    protected void closeWithException(Throwable e) {
+       if (ctx != null) {
+           connectionFuture.completeExceptionally(e);
+           ctx.close();
+       }
+    }
+
     private void checkRequestTimeout() {
         while (!requestTimeoutQueue.isEmpty()) {
             RequestTime request = requestTimeoutQueue.peek();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 15517e45ba3..433f20b3daf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -33,15 +33,18 @@ import io.netty.util.concurrent.Future;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -345,6 +348,11 @@ public class ConnectionPool implements AutoCloseable {
         }
     }
 
+    public Set<CompletableFuture<ClientCnx>> getConnections() {
+        return Collections.unmodifiableSet(
+                pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
+    }
+
     @VisibleForTesting
     int getPoolSize() {
         return pool.values().stream().mapToInt(Map::size).sum();
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 983bc627cbc..f0687580ee8 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -185,6 +185,11 @@
       <artifactId>ipaddress</artifactId>
       <version>${seancfoley.ipaddress.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 665b9f83fd6..283b835fff5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -18,47 +18,95 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-
+import java.util.Arrays;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class ProxyClientCnx extends ClientCnx {
-
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private final boolean forwardClientAuthData;
+    private final String clientAuthMethod;
+    private final String clientAuthRole;
+    private final AuthData clientAuthData;
+    private final ProxyConnection proxyConnection;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
-        super(conf, eventLoopGroup);
+                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion,
+                          boolean forwardClientAuthData, ProxyConnection proxyConnection) {
+        super(conf, eventLoopGroup, protocolVersion);
         this.clientAuthRole = clientAuthRole;
         this.clientAuthData = clientAuthData;
         this.clientAuthMethod = clientAuthMethod;
-        this.protocolVersion = protocolVersion;
+        this.forwardClientAuthData = forwardClientAuthData;
+        this.proxyConnection = proxyConnection;
     }
 
     @Override
     protected ByteBuf newConnectCommand() throws Exception {
         if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," +
-                    " clientAuthData = {}, clientAuthMethod = {}",
+            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
+                            + " clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
         }
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                clientAuthMethod);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!forwardClientAuthData || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Proxy {} request to refresh the original client authentication data for "
+                        + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
+            }
+
+            proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
+                            protocolVersion))
+                    .addListener(writeFuture -> {
+                        if (writeFuture.isSuccess()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+                                                + "with method {} for the proxy client {}",
+                                        proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
+                            }
+                        } else {
+                            log.error("Failed to send the auth challenge to original client by the proxy {} "
+                                            + "for the proxy client {}",
+                                    proxyConnection.ctx().channel(),
+                                    ctx.channel(),
+                                    writeFuture.cause());
+                            closeWithException(writeFuture.cause());
+                        }
+                    });
+
+            if (state == State.SentConnectFrame) {
+                state = State.Connecting;
+            }
+        } catch (Exception e) {
+            log.error("Failed to send the auth challenge to origin client by the proxy {} for the proxy client {}",
+                    proxyConnection.ctx().channel(), ctx.channel(), e);
+            closeWithException(e);
+        }
+    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 42b4d0f08f7..bb8f2678745 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.resolver.dns.DnsAddressResolverGroup;
@@ -37,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
-
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -51,26 +55,21 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnect;
 import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import lombok.Getter;
-
 /**
  * Handles incoming discovery request from client and sends appropriate response back to client
  *
@@ -268,12 +267,11 @@ public class ProxyConnection extends PulsarHandler {
                 this.clientAuthData = clientData;
                 this.clientAuthMethod = authMethod;
             }
-            clientCnxSupplier =
-                    () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                            clientAuthMethod, protocolVersionToAdvertise);
+            clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
+                    clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
+                    service.getConfiguration().isForwardAuthorizationCredentials(), this);
         } else {
-            clientCnxSupplier =
-                    () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+            clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
         }
 
         if (this.connectionPool == null) {
@@ -374,16 +372,22 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private void doAuthentication(AuthData clientData) throws Exception {
+    private void doAuthentication(AuthData clientData)
+            throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
         // authentication has completed, will send newConnected command.
         if (authState.isComplete()) {
             clientAuthRole = authState.getAuthRole();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
-                    remoteAddress, authMethod, clientAuthRole);
+                        remoteAddress, authMethod, clientAuthRole);
+            }
+
+            // Initial connect only: a re-authentication on an established connection skips completeConnect
+            if (this.connectionPool == null || state == State.Connecting) {
+                // authentication has completed, will send newConnected command.
+                completeConnect(clientData);
             }
-            completeConnect(clientData);
             return;
         }
 
@@ -392,7 +396,7 @@ public class ProxyConnection extends PulsarHandler {
                 .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         if (LOG.isDebugEnabled()) {
             LOG.debug("[{}] Authentication in progress client by method {}.",
-                remoteAddress, authMethod);
+                    remoteAddress, authMethod);
         }
         state = State.Connecting;
     }
@@ -474,18 +478,63 @@ public class ProxyConnection extends PulsarHandler {
 
     @Override
     protected void handleAuthResponse(CommandAuthResponse authResponse) {
-        checkArgument(state == State.Connecting);
         checkArgument(authResponse.hasResponse());
         checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received AuthResponse from {}, auth method: {}",
-                remoteAddress, authResponse.getResponse().getAuthMethodName());
+                    remoteAddress, authResponse.getResponse().getAuthMethodName());
         }
 
         try {
             AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
             doAuthentication(clientData);
+            if (service.getConfiguration().isForwardAuthorizationCredentials()
+                    && connectionPool != null && state == State.ProxyLookupRequests) {
+                connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
+                    String clientVersion;
+                    if (authResponse.hasClientVersion()) {
+                        clientVersion = authResponse.getClientVersion();
+                    } else {
+                        clientVersion = PulsarVersion.getVersion();
+                    }
+                    int protocolVersion;
+                    if (authResponse.hasProtocolVersion()) {
+                        protocolVersion = authResponse.getProtocolVersion();
+                    } else {
+                        protocolVersion = Commands.getCurrentProtocolVersion();
+                    }
+
+                    ByteBuf cmd =
+                            Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+                    toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd)
+                                    .addListener(writeFuture -> {
+                                        if (writeFuture.isSuccess()) {
+                                            if (LOG.isDebugEnabled()) {
+                                                LOG.debug("{} authentication is refreshed successfully by {}, "
+                                                                + "auth method: {} ",
+                                                        toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+                                            }
+                                        } else {
+                                            LOG.error("Failed to forward the auth response "
+                                                            + "from the proxy to the broker through the proxy client, "
+                                                            + "proxy: {}, proxy client: {}",
+                                                    ctx.channel(),
+                                                    toBrokerCnx.ctx().channel(),
+                                                    writeFuture.cause());
+                                            toBrokerCnx.ctx().channel().pipeline()
+                                                    .fireExceptionCaught(writeFuture.cause());
+                                        }
+                                    }))
+                            .whenComplete((__, ex) -> {
+                                if (ex != null) {
+                                    LOG.error("Failed to forward the auth response from the proxy to "
+                                                    + "the broker through the proxy client, proxy: {}",
+                                            ctx().channel(), ex);
+                                }
+                            });
+                });
+            }
         } catch (Exception e) {
             String msg = "Unable to handleAuthResponse";
             LOG.warn("[{}] {} ", remoteAddress, msg, e);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
new file mode 100644
index 00000000000..9ccb067adbf
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable authentication at the broker (authorization is explicitly disabled below)
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        conf.setAuthenticationRefreshCheckSeconds(1);
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(
+                        () -> AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.empty()))).build();
+        String namespaceName = "my-tenant/my-ns";
+        admin.clusters().createCluster("proxy-authorization",
+                ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true, false};
+    }
+
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 1);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .build();
+
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        @Cleanup
+        Producer<byte[]> ignored = spy(pulsarClient.newProducer()
+                .topic(topic).create());
+
+        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+        Set<CompletableFuture<ClientCnx>> connections = pulsarClientImpl.getCnxPool().getConnections();
+
+        Awaitility.await().during(4, SECONDS).untilAsserted(() -> {
+            pulsarClient.getPartitionsForTopic(topic).get();
+            assertTrue(connections.stream().allMatch(n -> {
+                try {
+                    ClientCnx clientCnx = n.get();
+                    long timestamp = clientCnx.getLastDisconnectedTimestamp();
+                    return timestamp == 0;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }));
+        });
+    }
+}