You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/19 09:51:22 UTC

[pulsar] 20/26: [improve][transaction] Optimize topic lookup when TC end tx. (#14991)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1df54b93be662228dc45f0180a938b1f22476e7a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Apr 12 18:53:03 2022 +0800

    [improve][transaction] Optimize topic lookup when TC end tx. (#14991)
    
    When the TC ends a transaction, it has to look up the topics involved. The original implementation used the Pulsar client for this, which performs a lookup for every topic.
    Using the bundle cache to find the broker that owns a topic avoids looking up every topic and therefore reduces the lookup time.
    
    - Use the bundle cache to find the broker that owns the topic. If an error occurs, fall back to looking up the topic.
    - Remove the topic cache, because with the bundle cache the original cache is no longer useful.
    
    (cherry picked from commit 46baae6a5e89a5feed171d231b59042a2ce9e1f8)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../buffer/impl/TransactionBufferClientImpl.java   |   9 +-
 .../buffer/impl/TransactionBufferHandlerImpl.java  | 155 ++++++++++-----------
 .../broker/transaction/TransactionProduceTest.java |  29 ++++
 .../broker/transaction/TransactionTestBase.java    |   2 +-
 .../buffer/TransactionBufferClientTest.java        |  37 +++--
 .../buffer/TransactionBufferHandlerImplTest.java   |  41 ++++--
 .../pulsar/client/impl/PulsarClientImpl.java       |  10 +-
 8 files changed, 175 insertions(+), 110 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6451e518b88..60cd75501d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -732,7 +732,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
                 this.transactionTimer =
                         new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
-                transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer,
+                transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
                         config.getTransactionBufferClientMaxConcurrentRequests(),
                         config.getTransactionBufferClientOperationTimeoutInMills());
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 060476e573c..c531f9f1871 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
 import io.netty.util.HashedWheelTimer;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
@@ -39,9 +40,9 @@ public class TransactionBufferClientImpl implements TransactionBufferClient {
         this.tbHandler = tbHandler;
     }
 
-    public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer,
-             int maxConcurrentRequests, long operationTimeoutInMills) {
-        TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer,
+    public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer,
+        int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
+        TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarService, timer,
                 maxConcurrentRequests, operationTimeoutInMills);
         return new TransactionBufferClientImpl(handler);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index b80a273bc6f..3f9a083787b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -18,21 +18,23 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -42,7 +44,10 @@ import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
 @Slf4j
@@ -53,31 +58,17 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
     private final AtomicLong requestIdGenerator = new AtomicLong();
     private final long operationTimeoutInMills;
     private final HashedWheelTimer timer;
-    private final PulsarClient pulsarClient;
+    private final PulsarService pulsarService;
+    private final PulsarClientImpl pulsarClient;
 
     private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits");
     private volatile int requestCredits;
 
-    private final LoadingCache<String, CompletableFuture<ClientCnx>> lookupCache = CacheBuilder.newBuilder()
-            .maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES)
-            .build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
-                @Override
-                public CompletableFuture<ClientCnx> load(String topic) {
-                    CompletableFuture<ClientCnx> siFuture = getClientCnx(topic);
-                    siFuture.whenComplete((si, cause) -> {
-                        if (null != cause) {
-                            lookupCache.invalidate(topic);
-                        }
-                    });
-                    return siFuture;
-                }
-            });
-
-    public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer,
-                                        int maxConcurrentRequests, long operationTimeoutInMills) {
-        this.pulsarClient = pulsarClient;
+    public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTimer timer,
+        int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
+        this.pulsarService = pulsarService;
+        this.pulsarClient = (PulsarClientImpl) pulsarService.getClient();
         this.outstandingRequests = new ConcurrentSkipListMap<>();
         this.pendingRequests = new GrowableArrayBlockingQueue<>();
         this.operationTimeoutInMills = operationTimeoutInMills;
@@ -97,15 +88,9 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
         ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, action, lowWaterMark);
 
-        try {
-            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, lookupCache.get(topic));
-            if (checkRequestCredits(op)) {
-                endTxn(op);
-            }
-        } catch (ExecutionException e) {
-            log.error("[{}] failed to get client cnx from lookup cache", topic, e);
-            lookupCache.invalidate(topic);
-            cb.completeExceptionally(new PulsarClientException.LookupException(e.getCause().getMessage()));
+        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic));
+        if (checkRequestCredits(op)) {
+            endTxn(op);
         }
         return cb;
     }
@@ -122,15 +107,9 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
-        try {
-            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, lookupCache.get(topic));
-            if (checkRequestCredits(op)) {
-                endTxn(op);
-            }
-        } catch (ExecutionException e) {
-            log.error("[{}] failed to get client cnx from lookup cache", topic, e);
-            lookupCache.invalidate(topic);
-            cb.completeExceptionally(new PulsarClientException.LookupException(e.getCause().getMessage()));
+        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic));
+        if (checkRequestCredits(op)) {
+            endTxn(op);
         }
         return cb;
     }
@@ -150,8 +129,8 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
     }
 
     public void endTxn(OpRequestSend op) {
-        op.cnx.whenComplete((clientCnx, throwable) -> {
-            if (throwable == null) {
+        op.cnx.whenComplete((clientCnx, ex) -> {
+            if (ex == null) {
                 if (clientCnx.ctx().channel().isActive()) {
                     clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
                     outstandingRequests.put(op.requestId, op);
@@ -166,16 +145,19 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
                     op.cmd.retain();
                     clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
                 } else {
-                    invalidateLookupCache(op);
                     op.cb.completeExceptionally(
                             new PulsarClientException.LookupException(op.topic + " endTxn channel is not active"));
                     onResponse(op);
                 }
             } else {
-                log.error("endTxn error topic: [{}]", op.topic, throwable);
-                invalidateLookupCache(op);
-                op.cb.completeExceptionally(
-                        new PulsarClientException.LookupException(throwable.getMessage()));
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.error("endTxn error topic: [{}]", op.topic, cause);
+                if (cause instanceof PulsarClientException.BrokerMetadataException) {
+                    op.cb.complete(null);
+                } else {
+                    op.cb.completeExceptionally(
+                            new PulsarClientException.LookupException(cause.getMessage()));
+                }
                 onResponse(op);
             }
         });
@@ -202,12 +184,9 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
                 log.error("[{}] Got end txn on topic response for request {} error {}", op.topic,
                         response.getRequestId(),
                         response.getError());
-                invalidateLookupCache(op);
                 op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
                         response.getMessage()));
             }
-        } catch (Exception e) {
-            log.error("[{}] Got exception when complete EndTxnOnTopic op for request {}", op.topic, e);
         } finally {
             onResponse(op);
         }
@@ -235,12 +214,9 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
             } else {
                 log.error("[{}] Got end txn on subscription response for request {} error {}",
                         op.topic, response.getRequestId(), response.getError());
-                invalidateLookupCache(op);
                 op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
                         response.getMessage()));
             }
-        } catch (Exception e) {
-            log.error("[{}] Got exception when complete EndTxnOnSub op for request {}", op.topic, e);
         } finally {
             onResponse(op);
         }
@@ -262,21 +238,14 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
                 if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
                     OpRequestSend polled = pendingRequests.poll();
                     if (polled != null) {
-                        try {
-                            if (polled.cnx != lookupCache.get(polled.topic)) {
-                                OpRequestSend invalid = polled;
-                                polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
-                                        lookupCache.get(invalid.topic));
-                                invalid.recycle();
-                            }
-                            endTxn(polled);
-                        } catch (ExecutionException e) {
-                            log.error("[{}] failed to get client cnx from lookup cache", polled.topic, e);
-                            lookupCache.invalidate(polled.topic);
-                            polled.cb.completeExceptionally(new PulsarClientException.LookupException(
-                                    e.getCause().getMessage()));
-                            REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                        CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic);
+                        if (polled.cnx != clientCnx) {
+                            OpRequestSend invalid = polled;
+                            polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
+                                    clientCnx);
+                            invalid.recycle();
                         }
+                        endTxn(polled);
                     } else {
                         REQUEST_CREDITS_UPDATER.incrementAndGet(this);
                     }
@@ -287,16 +256,6 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
         }
     }
 
-    private void invalidateLookupCache(OpRequestSend op) {
-        try {
-            if (lookupCache.get(op.topic) == op.cnx) {
-                lookupCache.invalidate(op.topic);
-            }
-        } catch (ExecutionException e) {
-            lookupCache.invalidate(op.topic);
-        }
-    }
-
     public static final class OpRequestSend {
 
         long requestId;
@@ -336,8 +295,42 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
         };
     }
 
+    public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
+        return pulsarClient.getConnection(topic);
+    }
+
     public CompletableFuture<ClientCnx> getClientCnx(String topic) {
-        return ((PulsarClientImpl) pulsarClient).getConnection(topic);
+        NamespaceService namespaceService = pulsarService.getNamespaceService();
+        CompletableFuture<NamespaceBundle> nsBundle = namespaceService.getBundleAsync(TopicName.get(topic));
+        return nsBundle
+                .thenCompose(bundle -> namespaceService.getOwnerAsync(bundle))
+                .thenCompose(data -> {
+                    if (data.isPresent()) {
+                        NamespaceEphemeralData ephemeralData = data.get();
+                        try {
+                            if (!ephemeralData.isDisabled()) {
+                                URI uri;
+                                if (pulsarClient.getConfiguration().isUseTls()) {
+                                    uri = new URI(ephemeralData.getNativeUrlTls());
+                                } else {
+                                    uri = new URI(ephemeralData.getNativeUrl());
+                                }
+                                InetSocketAddress brokerAddress =
+                                        InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+                                return pulsarClient.getConnection(brokerAddress, brokerAddress);
+                            } else {
+                                // Bundle is unloading, lookup topic
+                                return getClientCnxWithLookup(topic);
+                            }
+                        } catch (URISyntaxException e) {
+                            // Should never go here
+                            return getClientCnxWithLookup(topic);
+                        }
+                    } else {
+                        // Bundle is not loaded yet, lookup topic
+                        return getClientCnxWithLookup(topic);
+                    }
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index cbae03b1a8b..350dfa8c3d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,6 +89,35 @@ public class TransactionProduceTest extends TransactionTestBase {
         produceTest(true);
     }
 
+    @Test
+    public void testDeleteNamespaceBeforeCommit() throws Exception {
+        final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
+        PulsarClient pulsarClient = this.pulsarClient;
+        Transaction tnx = pulsarClient.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS)
+                .build().get();
+        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
+        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
+        Assert.assertTrue(txnIdMostBits > -1);
+        Assert.assertTrue(txnIdLeastBits > -1);
+
+        @Cleanup
+        Producer<byte[]> outProducer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        String content = "Hello Txn";
+        outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
+
+        try {
+            admin.namespaces().deleteNamespace(NAMESPACE1, true);
+        } catch (Exception ignore) {}
+        tnx.commit().get();
+    }
+
     @Test
     public void produceAndAbortTest() throws Exception {
         produceTest(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index d7a0d3dd7ad..7cae6ca3ec3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -156,7 +156,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             conf.setConfigurationStoreServers("localhost:3181");
             conf.setAllowAutoTopicCreationType("non-partitioned");
             conf.setBookkeeperClientExposeStatsToPrometheus(true);
-
+            conf.setForceDeleteNamespaceAllowed(true);
             conf.setBrokerShutdownTimeoutMs(0L);
             conf.setBrokerServicePort(Optional.of(0));
             conf.setBrokerServicePortTls(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index d85f6b42f23..66460778dc2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -30,10 +30,13 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
@@ -45,6 +48,8 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -80,7 +85,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(namespace, 10);
         admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions);
-        tbClient = TransactionBufferClientImpl.create(pulsarClient,
+        tbClient = TransactionBufferClientImpl.create(pulsarServiceList.get(0),
                 new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000, 3000);
     }
 
@@ -145,22 +150,30 @@ public class TransactionBufferClientTest extends TransactionTestBase {
 
     @Test
     public void testTransactionBufferClientTimeout() throws Exception {
-        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        PulsarService pulsarService = pulsarServiceList.get(0);
+        PulsarClient mockClient = mock(PulsarClientImpl.class);
         CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
-        when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
         when(cnx.channel()).thenReturn(channel);
+        when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>(){
+
+            @Override
+            public PulsarClient answer(InvocationOnMock invocation) throws Throwable {
+                return mockClient;
+            }
+        });
 
         when(channel.isActive()).thenReturn(true);
 
         @Cleanup("stop")
         HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
         TransactionBufferHandlerImpl transactionBufferHandler =
-                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
+                new TransactionBufferHandlerImpl(pulsarService, hashedWheelTimer, 1000, 3000);
         CompletableFuture<TxnID> endFuture =
                 transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);
 
@@ -187,23 +200,31 @@ public class TransactionBufferClientTest extends TransactionTestBase {
     }
 
     @Test
-    public void testTransactionBufferChannelUnActive() {
-        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+    public void testTransactionBufferChannelUnActive() throws PulsarServerException {
+        PulsarService pulsarService = pulsarServiceList.get(0);
+        PulsarClient mockClient = mock(PulsarClientImpl.class);
         CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
-        when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
         when(cnx.channel()).thenReturn(channel);
 
         when(channel.isActive()).thenReturn(false);
+        when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>(){
+
+            @Override
+            public PulsarClient answer(InvocationOnMock invocation) throws Throwable {
+                return mockClient;
+            }
+        });
 
         @Cleanup("stop")
         HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
         TransactionBufferHandlerImpl transactionBufferHandler =
-                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
+                new TransactionBufferHandlerImpl(pulsarServiceList.get(0), hashedWheelTimer, 1000, 3000);
         try {
             transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1).get();
             fail();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index 5241342635b..af4e442f617 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -18,42 +18,55 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.testng.annotations.Test;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 @Test(groups = "broker")
 public class TransactionBufferHandlerImplTest {
 
     @Test
-    public void testRequestCredits() {
-        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
-        when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
-        TransactionBufferHandlerImpl handler = spy(
-                new TransactionBufferHandlerImpl(pulsarClient, null, 1000, 3000));
+    public void testRequestCredits() throws PulsarServerException {
+        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        PulsarService pulsarService = mock(PulsarService.class);
+        NamespaceService namespaceService = mock(NamespaceService.class);
+        when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
+        when(pulsarService.getClient()).thenReturn(pulsarClient);
+        when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
+        Optional<NamespaceEphemeralData> opData = Optional.empty();
+        when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
+        when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+        TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
         doNothing().when(handler).endTxn(any());
+        doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
         for (int i = 0; i < 500; i++) {
-            handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+            handler.endTxnOnTopic("public/default/t", 1L, 1L, TxnAction.COMMIT, 1L);
         }
         assertEquals(handler.getAvailableRequestCredits(), 500);
         for (int i = 0; i < 500; i++) {
-            handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+            handler.endTxnOnTopic("public/default/t", 1L, 1L, TxnAction.COMMIT, 1L);
         }
         assertEquals(handler.getAvailableRequestCredits(), 0);
-        handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+        handler.endTxnOnTopic("public/default/t", 1L, 1L, TxnAction.COMMIT, 1L);
         assertEquals(handler.getPendingRequestsCount(), 1);
         handler.onResponse(null);
         assertEquals(handler.getAvailableRequestCredits(), 0);
@@ -61,9 +74,11 @@ public class TransactionBufferHandlerImplTest {
     }
 
     @Test
-    public void testMinRequestCredits() {
-        TransactionBufferHandlerImpl handler = spy(
-                new TransactionBufferHandlerImpl(null, null, 50, 3000));
+    public void testMinRequestCredits() throws PulsarServerException {
+        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        PulsarService pulsarService = mock(PulsarService.class);
+        when(pulsarService.getClient()).thenReturn(pulsarClient);
+        TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
         assertEquals(handler.getAvailableRequestCredits(), 100);
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 0ab7a137608..7c318f25a8e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -30,7 +30,8 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -828,7 +829,12 @@ public class PulsarClientImpl implements PulsarClient {
     public CompletableFuture<ClientCnx> getConnection(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return lookup.getBroker(topicName)
-                .thenCompose(pair -> cnxPool.getConnection(pair.getLeft(), pair.getRight()));
+                .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight()));
+    }
+
+    public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress logicalAddress,
+                                                      final InetSocketAddress physicalAddress) {
+        return cnxPool.getConnection(logicalAddress, physicalAddress);
     }
 
     /** visible for pulsar-functions **/