You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 12:03:23 UTC

[pulsar] branch branch-2.7 updated (307c8fe -> 0e6ca07)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


 discard 307c8fe  Add refresh authentication command in broker (#9064)
     new 0e6ca07  Add refresh authentication command in broker (#9064)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (307c8fe)
            \
             N -- N -- N   refs/heads/branch-2.7 (0e6ca07)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[pulsar] 01/01: Add refresh authentication command in broker (#9064)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0e6ca076777697fff5e044ea15252727f594107a
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Jan 26 18:11:04 2021 +0800

    Add refresh authentication command in broker (#9064)
    
    * Add refresh authentication command in the broker
    ---
    
    **Motivation**
    
    Some authentication provider is using cached authentication data. Until
    redoing the 'getAuthData' then it will provide the new auth data. So
    I add a refresh command and do the refresh authentication data provider
    when received the command.
    
    **Verify this change**
    
    Existent tests can pass.
    
    (cherry picked from commit 373948ee1df6294ef43dafe457a86e53cd86ab32)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 -
 .../authentication/SaslAuthenticateTest.java       |  2 +-
 .../broker/authentication/AuthenticationState.java |  4 +-
 .../AuthenticationProviderTokenTest.java           |  2 +-
 .../client/api/MutualAuthenticationTest.java       |  2 +-
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 76 ++++++++++++++++++++++
 .../org/apache/pulsar/common/api/AuthData.java     |  5 +-
 .../client/impl/auth/AuthenticationSasl.java       |  2 +-
 .../impl/auth/SaslAuthenticationDataProvider.java  |  2 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 14 +++-
 .../worker/rest/api/FunctionsImplTest.java         |  1 +
 .../pulsar/proxy/server/DirectProxyHandler.java    | 13 +++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  2 +-
 .../token/PulsarTokenAuthenticationBaseSuite.java  | 11 +++-
 14 files changed, 122 insertions(+), 17 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 10c54bb..0434fab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3119,9 +3119,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
                     .setEntryId(lastConfirmedEntry.getEntryId()));
         }
-        if (managedLedgerInterceptor != null) {
-            managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap);
-        }
         for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
             mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
                     .setKey(property.getKey()).setValue(property.getValue()));
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index e94d2ae..bb20077 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -260,7 +260,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
 
         // auth between server and client.
         // first time auth
-        AuthData initData1 = dataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData initData1 = dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         AuthData serverData1 = authState.authenticate(initData1);
         boolean complete = authState.isComplete();
         assertFalse(complete);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
index ac881ac..f51e818 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
@@ -68,7 +68,7 @@ public interface AuthenticationState {
 
     /**
      * If the authentication state supports refreshing and the credentials are expired,
-     * the auth provider will call this method ot initiate the refresh process.
+     * the auth provider will call this method to initiate the refresh process.
      * <p>
      * The auth state here will return the broker side data that will be used to send
      * a challenge to the client.
@@ -77,6 +77,6 @@ public interface AuthenticationState {
      * @throws AuthenticationException
      */
     default AuthData refreshAuthentication() throws AuthenticationException {
-        return null;
+        return AuthData.REFRESH_AUTH_DATA;
     }
 }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index 60fa5f6..c2610aa 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -634,7 +634,7 @@ public class AuthenticationProviderTokenTest {
         assertTrue(authState.isComplete());
 
         AuthData brokerData = authState.refreshAuthentication();
-        assertNull(brokerData);
+        assertEquals(brokerData, AuthData.REFRESH_AUTH_DATA);
     }
 
     // tests for Token Audience
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index ef06323..458c4ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -70,7 +70,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
             String dataString = new String(data.getBytes(), UTF_8);
             AuthData toSend;
 
-            if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA)) {
+            if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
                 toSend = AuthData.of(clientAuthStrings[0].getBytes(UTF_8));
             } else if (Arrays.equals(dataString.getBytes(), serverAuthStrings[0].getBytes(UTF_8))) {
                 toSend = AuthData.of(clientAuthStrings[1].getBytes(UTF_8));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index cfb80dc..b4e7bb0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -25,15 +25,19 @@ import java.net.URI;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -62,6 +66,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
     protected void setup() throws Exception {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationRefreshCheckSeconds(5);
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add(ADMIN_ROLE);
@@ -157,4 +162,75 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testOAuth2TokenRefreshedWithoutReconnect() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        clientSetup();
+
+        // test rest by admin
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("my-property",
+            new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+            .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+
+        // get the first connection stats
+        ProducerImpl producerImpl = (ProducerImpl) producer;
+        String accessTokenOld = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+        long lastDisconnectTime = producer.getLastDisconnectedTimestamp();
+
+        // the token expire duration is 10 seconds, so we need to wait for the authenticationData refreshed
+        Awaitility.await()
+            .atLeast(10, TimeUnit.SECONDS)
+            .atMost(20, TimeUnit.SECONDS)
+            .with()
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                String accessTokenNew = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+                Assert.assertNotEquals(accessTokenOld, accessTokenNew);
+            });
+
+        // get the lastDisconnectTime, it should be same with the before, because the connection shouldn't disconnect
+        long lastDisconnectTimeAfterTokenExpired = producer.getLastDisconnectedTimestamp();
+        Assert.assertEquals(lastDisconnectTime, lastDisconnectTimeAfterTokenExpired);
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        msg = null;
+        messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
index f12c436..188cb27 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
@@ -28,7 +28,10 @@ import lombok.Data;
 @Data(staticConstructor = "of")
 public final class AuthData {
     // CHECKSTYLE.OFF: StaticVariableName
-    public static byte[] INIT_AUTH_DATA = "PulsarAuthInit".getBytes(UTF_8);
+    public static byte[] INIT_AUTH_DATA_BYTES = "PulsarAuthInit".getBytes(UTF_8);
+    public static byte[] REFRESH_AUTH_DATA_BYTES = "PulsarAuthRefresh".getBytes(UTF_8);
+    public static AuthData INIT_AUTH_DATA = AuthData.of(INIT_AUTH_DATA_BYTES);
+    public static AuthData REFRESH_AUTH_DATA = AuthData.of(REFRESH_AUTH_DATA_BYTES);
     // CHECKSTYLE.ON: StaticVariableName
 
     private final byte[] bytes;
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index ee850e1..54d1802 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -261,7 +261,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
             }
             // first time init
             headers.put(SASL_HEADER_STATE, SASL_STATE_CLIENT_INIT);
-            AuthData initData = authData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+            AuthData initData = authData.authenticate(AuthData.INIT_AUTH_DATA);
             headers.put(SASL_AUTH_TOKEN,
                 Base64.getEncoder().encodeToString(initData.getBytes()));
         } else {
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
index 261e06c..daae735 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
@@ -51,7 +51,7 @@ public class SaslAuthenticationDataProvider implements AuthenticationDataProvide
     @Override
     public AuthData authenticate(AuthData commandData) throws AuthenticationException {
         // init
-        if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA)) {
+        if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
             if (pulsarSaslClient.hasInitialResponse()) {
                 return pulsarSaslClient.evaluateChallenge(AuthData.of(new byte[0]));
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bbcba68..c551588 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -37,6 +37,7 @@ import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
@@ -138,6 +139,7 @@ public class ClientCnx extends PulsarHandler {
     private ScheduledFuture<?> timeoutTask;
 
     // Added for mutual authentication.
+    @Getter
     protected AuthenticationDataProvider authenticationDataProvider;
     private TransactionBufferHandler transactionBufferHandler;
 
@@ -226,7 +228,7 @@ public class ClientCnx extends PulsarHandler {
         // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
         // and return authData to server.
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
                 PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null);
     }
@@ -318,6 +320,16 @@ public class ClientCnx extends PulsarHandler {
         checkArgument(authChallenge.hasChallenge());
         checkArgument(authChallenge.getChallenge().hasAuthData());
 
+        if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData().toByteArray())) {
+            try {
+                authenticationDataProvider = authentication.getAuthData(remoteHostName);
+            } catch (PulsarClientException e) {
+                log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+                connectionFuture.completeExceptionally(e);
+                return;
+            }
+        }
+
         // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
         try {
             AuthData authData = authenticationDataProvider
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index d1cf95b..e8618c2 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 44d7ad5..3e3d668 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -46,6 +46,7 @@ import io.netty.util.concurrent.FutureListener;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
@@ -57,6 +58,7 @@ import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.tls.TlsHostnameVerifier;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
@@ -222,7 +224,7 @@ public class DirectProxyHandler {
             this.ctx = ctx;
             // Send the Connect command to broker
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
-            AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+            AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command = null;
             command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
@@ -262,6 +264,15 @@ public class DirectProxyHandler {
             checkArgument(authChallenge.hasChallenge());
             checkArgument(authChallenge.getChallenge().hasAuthData() && authChallenge.getChallenge().hasAuthData());
 
+            if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData().toByteArray())) {
+                try {
+                    authenticationDataProvider = authentication.getAuthData(remoteHostName);
+                } catch (PulsarClientException e) {
+                    log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+                    return;
+                }
+            }
+
             // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
             try {
                 AuthData authData = authenticationDataProvider
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 0878370..665b9f8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -54,7 +54,7 @@ public class ProxyClientCnx extends ClientCnx {
         }
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
             PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
             clientAuthMethod);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index 7a923ee..eb96555 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -288,6 +290,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
         admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class));
 
         String initialToken = this.createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+        String refreshedToken = this.createClientTokenWithExpiry(30, TimeUnit.SECONDS);
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()
@@ -295,7 +298,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
                 .authentication(AuthenticationFactory.token(() -> {
                     if (shouldRefreshToken) {
                         try {
-                            return createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+                            return refreshedToken;
                         } catch (Exception e) {
                             return null;
                         }
@@ -308,17 +311,19 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
         @Cleanup
         Producer<String> producer = client.newProducer(Schema.STRING)
                 .topic(topic)
-                .sendTimeout(1, TimeUnit.SECONDS)
+                .sendTimeout(3, TimeUnit.SECONDS)
                 .create();
-
         // Initially the token is valid and producer will be able to publish
         producer.send("hello-1");
+        long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp();
 
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
 
         if (shouldRefreshToken) {
             // The token will have been refreshed, so the app won't see any error
             producer.send("hello-2");
+            long timestamp = producer.getLastDisconnectedTimestamp();
+            assertEquals(timestamp, lastDisconnectedTimestamp);
         } else {
             // The token has expired, so this next message will be rejected
             try {