You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/09/29 10:52:32 UTC
[pulsar] branch branch-2.11 updated: [fix][proxy] Fix refresh client auth (#17831)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 298e3789b3e [fix][proxy] Fix refresh client auth (#17831)
298e3789b3e is described below
commit 298e3789b3ee6761b3273d172c22d5a2c61f8134
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Thu Sep 29 18:48:38 2022 +0800
[fix][proxy] Fix refresh client auth (#17831)
* [fix][proxy] Fix refresh client auth
Signed-off-by: Zixuan Liu <no...@gmail.com>
* Fix style
Signed-off-by: Zixuan Liu <no...@gmail.com>
(cherry picked from commit c952f3c9f891f85ff4b6cee6e28b6f68db3b5bcd)
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 17 +-
.../apache/pulsar/client/impl/ConnectionPool.java | 9 +-
pulsar-proxy/pom.xml | 5 +
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 77 +++++++--
.../pulsar/proxy/server/ProxyConnection.java | 73 ++++++--
.../pulsar/proxy/server/ProxyRefreshAuthTest.java | 186 +++++++++++++++++++++
6 files changed, 338 insertions(+), 29 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index eb39fe53f1a..a6b9005611c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -104,7 +104,7 @@ import org.slf4j.LoggerFactory;
public class ClientCnx extends PulsarHandler {
protected final Authentication authentication;
- private State state;
+ protected State state;
@Getter
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
@@ -155,7 +155,7 @@ public class ClientCnx extends PulsarHandler {
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
- private final int protocolVersion;
+ protected final int protocolVersion;
private final long operationTimeoutMs;
protected String proxyToTargetBrokerAddress = null;
@@ -176,7 +176,10 @@ public class ClientCnx extends PulsarHandler {
@Getter
private final ClientCnxIdleState idleState;
- enum State {
+ @Getter
+ private long lastDisconnectedTimestamp;
+
+ protected enum State {
None, SentConnectFrame, Ready, Failed, Connecting
}
@@ -281,6 +284,7 @@ public class ClientCnx extends PulsarHandler {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
+ lastDisconnectedTimestamp = System.currentTimeMillis();
log.info("{} Disconnected", ctx.channel());
if (!connectionFuture.isDone()) {
connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
@@ -1243,6 +1247,13 @@ public class ClientCnx extends PulsarHandler {
}
}
+    /**
+     * Fails the connection future with the given cause and closes the channel.
+     *
+     * <p>Does nothing when no channel context is available ({@code ctx} is {@code null}).
+     *
+     * @param e the cause used to complete {@code connectionFuture} exceptionally
+     */
+    protected void closeWithException(Throwable e) {
+        if (ctx == null) {
+            return;
+        }
+        connectionFuture.completeExceptionally(e);
+        ctx.close();
+    }
+
private void checkRequestTimeout() {
while (!requestTimeoutQueue.isEmpty()) {
RequestTime request = requestTimeoutQueue.peek();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index d85d853c4aa..36f28fb5756 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -35,16 +35,19 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -453,5 +456,9 @@ public class ConnectionPool implements AutoCloseable {
// Do release idle connections.
releaseIdleConnectionTaskList.forEach(Runnable::run);
}
-}
+    /**
+     * Returns a snapshot of every connection future currently held by the pool.
+     *
+     * <p>The futures found in all inner maps of {@code pool} are gathered into a new set, which is
+     * then wrapped as unmodifiable; later changes to the pool are not reflected in the result.
+     *
+     * @return an unmodifiable set of the pooled connection futures
+     */
+    public Set<CompletableFuture<ClientCnx>> getConnections() {
+        Set<CompletableFuture<ClientCnx>> snapshot = pool.values().stream()
+                .flatMap(inner -> inner.values().stream())
+                .collect(Collectors.toSet());
+        return Collections.unmodifiableSet(snapshot);
+    }
+}
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index fc33470eff0..ee493de91a3 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -174,6 +174,11 @@
<artifactId>ipaddress</artifactId>
<version>${seancfoley.ipaddress.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 50a77d33683..283b835fff5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -18,30 +18,35 @@
*/
package org.apache.pulsar.proxy.server;
+import static com.google.common.base.Preconditions.checkArgument;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
+import java.util.Arrays;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.protocol.Commands;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class ProxyClientCnx extends ClientCnx {
-
- String clientAuthRole;
- AuthData clientAuthData;
- String clientAuthMethod;
- int protocolVersion;
+ private final boolean forwardClientAuthData;
+ private final String clientAuthMethod;
+ private final String clientAuthRole;
+ private final AuthData clientAuthData;
+ private final ProxyConnection proxyConnection;
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
- AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
- super(conf, eventLoopGroup);
+ AuthData clientAuthData, String clientAuthMethod, int protocolVersion,
+ boolean forwardClientAuthData, ProxyConnection proxyConnection) {
+ super(conf, eventLoopGroup, protocolVersion);
this.clientAuthRole = clientAuthRole;
this.clientAuthData = clientAuthData;
this.clientAuthMethod = clientAuthMethod;
- this.protocolVersion = protocolVersion;
+ this.forwardClientAuthData = forwardClientAuthData;
+ this.proxyConnection = proxyConnection;
}
@Override
@@ -54,10 +59,54 @@ public class ProxyClientCnx extends ClientCnx {
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
- return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
- PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
- clientAuthMethod);
+ return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+ PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+ clientAuthMethod);
}
- private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+    /**
+     * Handles an auth challenge received on this proxy-to-broker connection.
+     *
+     * <p>If forwarding of the client auth data is enabled and the challenge payload equals
+     * {@link AuthData#REFRESH_AUTH_DATA_BYTES}, a refresh challenge is written to the original client
+     * through {@code proxyConnection} instead of being answered here. Any other challenge is delegated
+     * to the parent implementation.
+     *
+     * <p>A failed write, or any exception raised while issuing it, closes this connection through
+     * {@code closeWithException}.
+     *
+     * @param authChallenge the received command; it must carry a challenge that has auth data
+     */
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        checkArgument(authChallenge.hasChallenge());
+        checkArgument(authChallenge.getChallenge().hasAuthData());
+
+        final boolean refreshRequested =
+                Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
+        if (!(forwardClientAuthData && refreshRequested)) {
+            // Not a refresh of forwarded credentials: the parent implementation answers it.
+            super.handleAuthChallenge(authChallenge);
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Proxy {} request to refresh the original client authentication data for "
+                        + "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
+            }
+
+            // Relay the refresh request to the original client over the client-facing connection.
+            proxyConnection.ctx()
+                    .writeAndFlush(
+                            Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA, protocolVersion))
+                    .addListener(writeFuture -> {
+                        if (!writeFuture.isSuccess()) {
+                            log.error("Failed to send the auth challenge to original client by the proxy {} "
+                                            + "for the proxy client {}",
+                                    proxyConnection.ctx().channel(),
+                                    ctx.channel(),
+                                    writeFuture.cause());
+                            closeWithException(writeFuture.cause());
+                            return;
+                        }
+                        if (log.isDebugEnabled()) {
+                            log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+                                            + "with method {} for the proxy client {}",
+                                    proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
+                        }
+                    });
+
+            // NOTE(review): presumably mirrors the transition the parent performs when it answers a
+            // challenge during the initial handshake - confirm against ClientCnx.
+            if (state == State.SentConnectFrame) {
+                state = State.Connecting;
+            }
+        } catch (Exception e) {
+            log.error("Failed to send the auth challenge to origin client by the proxy {} for the proxy client {}",
+                    proxyConnection.ctx().channel(), ctx.channel(), e);
+            closeWithException(e);
+        }
+    }
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index fe62b606314..30d36446ccd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -45,6 +45,7 @@ import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -307,12 +308,11 @@ public class ProxyConnection extends PulsarHandler {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
- clientCnxSupplier =
- () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
- clientAuthMethod, protocolVersionToAdvertise);
+ clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
+ clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
+ service.getConfiguration().isForwardAuthorizationCredentials(), this);
} else {
- clientCnxSupplier =
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+ clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
}
if (this.connectionPool == null) {
@@ -414,16 +414,22 @@ public class ProxyConnection extends PulsarHandler {
}
// According to auth result, send newConnected or newAuthChallenge command.
- private void doAuthentication(AuthData clientData) throws Exception {
+ private void doAuthentication(AuthData clientData)
+ throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
- remoteAddress, authMethod, clientAuthRole);
+ remoteAddress, authMethod, clientAuthRole);
+ }
+
+ // First connection
+ if (this.connectionPool == null || state == State.Connecting) {
+ // authentication has completed, will send newConnected command.
+ completeConnect(clientData);
}
- completeConnect(clientData);
return;
}
@@ -432,7 +438,7 @@ public class ProxyConnection extends PulsarHandler {
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
+ remoteAddress, authMethod);
}
state = State.Connecting;
}
@@ -514,18 +520,63 @@ public class ProxyConnection extends PulsarHandler {
@Override
protected void handleAuthResponse(CommandAuthResponse authResponse) {
- checkArgument(state == State.Connecting);
checkArgument(authResponse.hasResponse());
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
if (LOG.isDebugEnabled()) {
LOG.debug("Received AuthResponse from {}, auth method: {}",
- remoteAddress, authResponse.getResponse().getAuthMethodName());
+ remoteAddress, authResponse.getResponse().getAuthMethodName());
}
try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
+ if (service.getConfiguration().isForwardAuthorizationCredentials()
+ && connectionPool != null && state == State.ProxyLookupRequests) {
+ connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
+ String clientVersion;
+ if (authResponse.hasClientVersion()) {
+ clientVersion = authResponse.getClientVersion();
+ } else {
+ clientVersion = PulsarVersion.getVersion();
+ }
+ int protocolVersion;
+ if (authResponse.hasProtocolVersion()) {
+ protocolVersion = authResponse.getProtocolVersion();
+ } else {
+ protocolVersion = Commands.getCurrentProtocolVersion();
+ }
+
+ ByteBuf cmd =
+ Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
+ toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd)
+ .addListener(writeFuture -> {
+ if (writeFuture.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} authentication is refreshed successfully by {}, "
+ + "auth method: {} ",
+ toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
+ }
+ } else {
+ LOG.error("Failed to forward the auth response "
+ + "from the proxy to the broker through the proxy client, "
+ + "proxy: {}, proxy client: {}",
+ ctx.channel(),
+ toBrokerCnx.ctx().channel(),
+ writeFuture.cause());
+ toBrokerCnx.ctx().channel().pipeline()
+ .fireExceptionCaught(writeFuture.cause());
+ }
+ }))
+ .whenComplete((__, ex) -> {
+ if (ex != null) {
+ LOG.error("Failed to forward the auth response from the proxy to "
+ + "the broker through the proxy client, proxy: {}",
+ ctx().channel(), ex);
+ }
+ });
+ });
+ }
} catch (Exception e) {
String msg = "Unable to handleAuthResponse";
LOG.warn("[{}] {} ", remoteAddress, msg, e);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
new file mode 100644
index 00000000000..9ccb067adbf
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that client connections made through the proxy are not disconnected while the broker
+ * re-checks authentication every second and the client only ever holds tokens that expire after
+ * one second. The scenario runs with and without forwarding of the original client's auth data.
+ */
+@Slf4j
+public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    /**
+     * Configures the broker for token authentication with a one-second refresh check.
+     */
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        // enable token authentication at the broker (authorization stays disabled)
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setProxyRoles(Collections.singleton("Proxy"));
+        conf.setAdvertisedAddress(null);
+        conf.setAuthenticateOriginalAuthData(true);
+        conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePort(Optional.of(0));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        // the broker re-checks the credentials of its connections every second
+        conf.setAuthenticationRefreshCheckSeconds(1);
+    }
+
+    /**
+     * Starts the broker, creates the cluster/tenant/namespace used by the test and builds (but does
+     * not start) the proxy service.
+     */
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.init();
+
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(
+                        () -> AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.empty()))).build();
+        String namespaceName = "my-tenant/my-ns";
+        admin.clusters().createCluster("proxy-authorization",
+                ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        // configure the proxy service; it is started per test invocation in startProxy()
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setAdvertisedAddress(null);
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
+        proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(properties);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    /**
+     * Tears down the broker-side fixtures and the proxy.
+     *
+     * <p>The proxy is closed even when the base cleanup throws, and the close is skipped when
+     * {@link #setup()} failed before the proxy was created (this method runs with
+     * {@code alwaysRun = true}).
+     */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            if (proxyService != null) {
+                proxyService.close();
+            }
+        }
+    }
+
+    /**
+     * Applies the forwarding mode to both broker and proxy configuration, then starts the proxy.
+     *
+     * @param forwardAuthData whether the proxy forwards the original client's auth data
+     */
+    private void startProxy(boolean forwardAuthData) throws Exception {
+        pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
+        proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
+        proxyService.start();
+    }
+
+    @DataProvider
+    Object[] forwardAuthDataProvider() {
+        return new Object[]{true, false};
+    }
+
+    /**
+     * Produces through the proxy with short-lived tokens and asserts, for four seconds, that none of
+     * the client's pooled connections has recorded a disconnect.
+     *
+     * @param forwardAuthData whether the proxy forwards the original client's auth data
+     */
+    @Test(dataProvider = "forwardAuthDataProvider")
+    public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy(forwardAuthData);
+
+        // every token handed out by this supplier expires one second after it is created
+        AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
+            Calendar calendar = Calendar.getInstance();
+            calendar.add(Calendar.SECOND, 1);
+            return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
+        });
+
+        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken)
+                .build();
+
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        @Cleanup
+        Producer<byte[]> ignored = spy(pulsarClient.newProducer()
+                .topic(topic).create());
+
+        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+        Set<CompletableFuture<ClientCnx>> connections = pulsarClientImpl.getCnxPool().getConnections();
+
+        Awaitility.await().during(4, SECONDS).untilAsserted(() -> {
+            pulsarClient.getPartitionsForTopic(topic).get();
+            assertTrue(connections.stream().allMatch(n -> {
+                try {
+                    ClientCnx clientCnx = n.get();
+                    // a timestamp of 0 means channelInactive never fired on this connection
+                    long timestamp = clientCnx.getLastDisconnectedTimestamp();
+                    return timestamp == 0;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }));
+        });
+    }
+}