You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/03 05:35:15 UTC

[pulsar] 01/04: Make ServerCnx, Producer and Consumer independent of Netty (#6720)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b9855083d2bd6128897bfecb068d142f7c1a6e24
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Wed Nov 18 07:09:35 2020 +0100

    Make ServerCnx, Producer and Consumer independent of Netty (#6720)
    
    This PR is a first step to implement [PIP 59: gRPC Protocol Handler](https://github.com/apache/pulsar/wiki/PIP-59%3A-gPRC-Protocol-Handler)
    It separates the `ServerCnx` used by `Consumer` and `Producer` into an interface and an implementation. Alternate protocol handlers that use Pulsar's topics, consumers and producers will provide their own implementation of the interface.
    
    (cherry picked from commit 2a183ab0dfedfda9d13a8536c02d95c109056b63)
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../org/apache/pulsar/broker/service/Consumer.java | 120 +++------------------
 .../org/apache/pulsar/broker/service/Producer.java |  52 ++++-----
 .../broker/service/PulsarChannelInitializer.java   |   2 +-
 .../pulsar/broker/service/PulsarCommandSender.java |  16 ++-
 .../broker/service/PulsarCommandSenderImpl.java    | 105 ++++++++++++++++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  39 +++++--
 .../org/apache/pulsar/broker/service/Topic.java    |   2 +-
 .../apache/pulsar/broker/service/TransportCnx.java |  80 ++++++++++++++
 .../service/nonpersistent/NonPersistentTopic.java  |   6 +-
 .../broker/service/persistent/PersistentTopic.java |  10 +-
 .../service/MessagePublishBufferThrottleTest.java  |  23 ++--
 .../PersistentDispatcherFailoverConsumerTest.java  |   4 +
 .../pulsar/broker/service/PersistentTopicTest.java |   2 +
 14 files changed, 307 insertions(+), 158 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2dee374..b27dba2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2260,8 +2260,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
     }
 
-    private void foreachCnx(Consumer<ServerCnx> consumer) {
-        Set<ServerCnx> cnxSet = new HashSet<>();
+    private void foreachCnx(Consumer<TransportCnx> consumer) {
+        Set<TransportCnx> cnxSet = new HashSet<>();
         topics.forEach((n, t) -> {
             Optional<Topic> topic = extractTopic(t);
             topic.ifPresent(value -> value.getProducers().values().forEach(producer -> cnxSet.add(producer.getCnx())));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c2dd900..968050c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -23,12 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
@@ -39,6 +33,8 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -53,10 +49,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosi
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
-import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
@@ -69,7 +63,7 @@ import org.slf4j.LoggerFactory;
 public class Consumer {
     private final Subscription subscription;
     private final SubType subType;
-    private final ServerCnx cnx;
+    private final TransportCnx cnx;
     private final String appId;
     private AuthenticationDataSource authenticationData;
     private final String topicName;
@@ -131,7 +125,7 @@ public class Consumer {
 
     public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
                     int priorityLevel, String consumerName,
-                    int maxUnackedMessages, ServerCnx cnx, String appId,
+                    int maxUnackedMessages, TransportCnx cnx, String appId,
                     Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
                     PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException {
 
@@ -191,18 +185,11 @@ public class Consumer {
     }
 
     void notifyActiveConsumerChange(Consumer activeConsumer) {
-        if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) {
-            // if the client is older than `v12`, we don't need to send consumer group changes.
-            return;
-        }
-
         if (log.isDebugEnabled()) {
             log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}",
                 consumerId, topicName, subscription.getName(), activeConsumer);
         }
-        cnx.ctx().writeAndFlush(
-            Commands.newActiveConsumerChange(consumerId, this == activeConsumer),
-            cnx.ctx().voidPromise());
+        cnx.getCommandSender().sendActiveConsumerChange(consumerId, this == activeConsumer);
     }
 
     public boolean readCompacted() {
@@ -215,23 +202,21 @@ public class Consumer {
      *
      * @return a SendMessageInfo object that contains the detail of what was sent to consumer
      */
-
-    public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+    public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
                int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
         this.lastConsumedTimestamp = System.currentTimeMillis();
-        final ChannelHandlerContext ctx = cnx.ctx();
-        final ChannelPromise writePromise = ctx.newPromise();
 
         if (entries.isEmpty() || totalMessages == 0) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}",
                         topicName, subscription, consumerId);
             }
-            writePromise.setSuccess();
             batchSizes.recyle();
             if (batchIndexesAcks != null) {
                 batchIndexesAcks.recycle();
             }
+            final Promise<Void> writePromise = cnx.newPromise();
+            writePromise.setSuccess(null);
             return writePromise;
         }
 
@@ -263,66 +248,8 @@ public class Consumer {
         bytesOutCounter.add(totalBytes);
         chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
 
-        ctx.channel().eventLoop().execute(() -> {
-            for (int i = 0; i < entries.size(); i++) {
-                Entry entry = entries.get(i);
-                if (entry == null) {
-                    // Entry was filtered out
-                    continue;
-                }
-
-                int batchSize = batchSizes.getBatchSize(i);
-
-                if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
-                    log.warn("[{}-{}] Consumer doesn't support batch messages -  consumerId {}, msg id {}-{}",
-                            topicName, subscription,
-                            consumerId, entry.getLedgerId(), entry.getEntryId());
-                    ctx.close();
-                    entry.release();
-                    continue;
-                }
-
-                MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
-                MessageIdData messageId = messageIdBuilder
-                    .setLedgerId(entry.getLedgerId())
-                    .setEntryId(entry.getEntryId())
-                    .setPartition(partitionIdx)
-                    .build();
-
-                ByteBuf metadataAndPayload = entry.getDataBuffer();
-                // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
-                metadataAndPayload.retain();
-                // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
-                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
-                    Commands.skipChecksumIfPresent(metadataAndPayload);
-                }
-
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription,
-                            consumerId, entry.getLedgerId(), entry.getEntryId());
-                }
-
-                int redeliveryCount = 0;
-                PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                if (redeliveryTracker.contains(position)) {
-                    redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
-                }
-                ctx.write(cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload,
-                    batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise());
-                messageId.recycle();
-                messageIdBuilder.recycle();
-                entry.release();
-            }
-
-            // Use an empty write here so that we can just tie the flush with the write promise for last entry
-            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
-            batchSizes.recyle();
-            if (batchIndexesAcks != null) {
-                batchIndexesAcks.recycle();
-            }
-        });
-
-        return writePromise;
+        return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
+                entries, batchSizes, batchIndexesAcks, redeliveryTracker);
     }
 
     private void incrementUnackedMessages(int ackedMessages) {
@@ -337,10 +264,6 @@ public class Consumer {
         return cnx.isWritable();
     }
 
-    public void sendError(ByteBuf error) {
-        cnx.ctx().writeAndFlush(error).addListener(ChannelFutureListener.CLOSE);
-    }
-
     /**
      * Close the consumer if: a. the connection is dropped b. connection is open (graceful close) and there are no
      * pending message acks
@@ -368,18 +291,15 @@ public class Consumer {
         }
     }
 
-    void doUnsubscribe(final long requestId) {
-        final ChannelHandlerContext ctx = cnx.ctx();
-
+    public void doUnsubscribe(final long requestId) {
         subscription.doUnsubscribe(this).thenAccept(v -> {
             log.info("Unsubscribed successfully from {}", subscription);
             cnx.removedConsumer(this);
-            ctx.writeAndFlush(Commands.newSuccess(requestId));
+            cnx.getCommandSender().sendSuccess(requestId);
         }).exceptionally(exception -> {
             log.warn("Unsubscribe failed for {}", subscription, exception);
-            ctx.writeAndFlush(
-                    Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception),
-                            exception.getCause().getMessage()));
+            cnx.getCommandSender().sendError(requestId, BrokerServiceException.getClientErrorCode(exception),
+                    exception.getCause().getMessage());
             return null;
         });
     }
@@ -439,7 +359,7 @@ public class Consumer {
         }
     }
 
-    void flowPermits(int additionalNumberOfMessages) {
+    public void flowPermits(int additionalNumberOfMessages) {
         checkArgument(additionalNumberOfMessages > 0);
 
         // block shared consumer when unacked-messages reaches limit
@@ -489,11 +409,7 @@ public class Consumer {
     }
 
     public void reachedEndOfTopic() {
-        // Only send notification if the client understand the command
-        if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9_VALUE) {
-            log.info("[{}] Notifying consumer that end of topic has been reached", this);
-            cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
-        }
+        cnx.getCommandSender().sendReachedEndOfTopic(consumerId);
     }
 
     /**
@@ -546,10 +462,6 @@ public class Consumer {
                 .add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
     }
 
-    public ChannelHandlerContext ctx() {
-        return cnx.ctx();
-    }
-
     public void checkPermissions() {
         TopicName topicName = TopicName.get(subscription.getTopicName());
         if (cnx.getBrokerService().getAuthorizationService() != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 2fd9c2e..3216d43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
+import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
 import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
 import static org.apache.pulsar.common.protocol.Commands.readChecksum;
 
@@ -52,14 +53,13 @@ import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
 
 /**
  * Represents a currently connected producer
  */
 public class Producer {
     private final Topic topic;
-    private final ServerCnx cnx;
+    private final TransportCnx cnx;
     private final String producerName;
     private final long epoch;
     private final boolean userProvidedProducerName;
@@ -88,7 +88,7 @@ public class Producer {
 
     private final SchemaVersion schemaVersion;
 
-    public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
+    public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
             boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
             boolean userProvidedProducerName) {
         this.topic = topic;
@@ -140,41 +140,42 @@ public class Producer {
 
     public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
             boolean isChunked) {
-        beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
-        publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
+        if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) {
+            publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
+        }
     }
 
     public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
-           ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+            ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
         if (lowestSequenceId > highestSequenceId) {
-            cnx.ctx().channel().eventLoop().execute(() -> {
+            cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
                         "Invalid lowest or highest sequence id");
                 cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
             });
             return;
         }
-        beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize);
-        publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
+        if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
+            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
+        }
     }
 
-    public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
+    public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
         if (isClosed) {
-            cnx.ctx().channel().eventLoop().execute(() -> {
+            cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError,
                         "Producer is closed");
                 cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
             });
-
-            return;
+            return false;
         }
 
         if (!verifyChecksum(headersAndPayload)) {
-            cnx.ctx().channel().eventLoop().execute(() -> {
+            cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker");
                 cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
             });
-            return;
+            return false;
         }
 
         if (topic.isEncryptionRequired()) {
@@ -187,16 +188,17 @@ public class Producer {
             // Check whether the message is encrypted or not
             if (encryptionKeysCount < 1) {
                 log.warn("[{}] Messages must be encrypted", getTopic().getName());
-                cnx.ctx().channel().eventLoop().execute(() -> {
+                cnx.execute(() -> {
                     cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.MetadataError,
                             "Messages must be encrypted");
                     cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
                 });
-                return;
+                return false;
             }
         }
 
         startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
+        return true;
     }
 
     private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
@@ -266,7 +268,7 @@ public class Producer {
 
     /**
      * Return the sequence id of
-     * @return
+     * @return the sequence id
      */
     public long getLastSequenceId() {
         if (isNonPersistentTopic) {
@@ -276,7 +278,7 @@ public class Producer {
         }
     }
 
-    public ServerCnx getCnx() {
+    public TransportCnx getCnx() {
         return this.cnx;
     }
 
@@ -354,7 +356,7 @@ public class Producer {
                 ServerError serverError = (exception instanceof TopicTerminatedException)
                         ? ServerError.TopicTerminatedError : ServerError.PersistenceError;
 
-                producer.cnx.ctx().channel().eventLoop().execute(() -> {
+                producer.cnx.execute(() -> {
                     if (!(exception instanceof TopicClosedException)) {
                         // For TopicClosed exception there's no need to send explicit error, since the client was
                         // already notified
@@ -374,7 +376,7 @@ public class Producer {
 
                 this.ledgerId = ledgerId;
                 this.entryId = entryId;
-                producer.cnx.ctx().channel().eventLoop().execute(this);
+                producer.cnx.execute(this);
             }
         }
 
@@ -417,7 +419,7 @@ public class Producer {
         }
 
         static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
-                 int msgSize, long batchSize, boolean chunked, long startTimeNs) {
+                int msgSize, long batchSize, boolean chunked, long startTimeNs) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = lowestSequenceId;
@@ -439,7 +441,7 @@ public class Producer {
         }
 
         private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
-            protected MessagePublishContext newObject(Recycler.Handle<MessagePublishContext> handle) {
+            protected MessagePublishContext newObject(Handle<MessagePublishContext> handle) {
                 return new MessagePublishContext(handle);
             }
         };
@@ -511,7 +513,7 @@ public class Producer {
         return closeFuture;
     }
 
-    void closeNow(boolean removeFromTopic) {
+    public void closeNow(boolean removeFromTopic) {
         if (removeFromTopic) {
             topic.removeProducer(this);
         }
@@ -532,7 +534,7 @@ public class Producer {
     public CompletableFuture<Void> disconnect() {
         if (!closeFuture.isDone()) {
             log.info("Disconnecting producer: {}", this);
-            cnx.ctx().executor().execute(() -> {
+            cnx.execute(() -> {
                 cnx.closeProducer(this);
                 closeNow(true);
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 2a2d3d5..7425ffb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -137,7 +137,7 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
             try {
                 cnx.refreshAuthenticationCredentials();
             } catch (Throwable t) {
-                log.warn("[{}] Failed to refresh auth credentials", cnx.getRemoteAddress());
+                log.warn("[{}] Failed to refresh auth credentials", cnx.clientAddress());
             }
         });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 1773106..141c7c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.broker.service;
 
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -26,7 +29,6 @@ import java.util.List;
 
 public interface PulsarCommandSender {
 
-
     void sendPartitionMetadataResponse(PulsarApi.ServerError error, String errorMsg, long requestId);
 
     void sendPartitionMetadataResponse(int partitions, long requestId);
@@ -61,4 +63,16 @@ public interface PulsarCommandSender {
                             PulsarApi.CommandLookupTopicResponse.LookupType response, long requestId, boolean proxyThroughServiceUrl);
 
     void sendLookupResponse(PulsarApi.ServerError error, String errorMsg, long requestId);
+
+    void sendActiveConsumerChange(long consumerId, boolean isActive);
+
+    void sendSuccess(long requestId);
+
+    void sendError(long requestId, PulsarApi.ServerError error, String message);
+
+    void sendReachedEndOfTopic(long consumerId);
+
+    Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
+            int partitionIdx, final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+            RedeliveryTracker redeliveryTracker);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index c9f1b61..c3d6a56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -19,9 +19,16 @@
 package org.apache.pulsar.broker.service;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -206,6 +213,104 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         cnx.ctx().writeAndFlush(outBuf);
     }
 
+    @Override
+    public void sendActiveConsumerChange(long consumerId, boolean isActive) {
+        if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) {
+            // if the client is older than `v12`, we don't need to send consumer group changes.
+            return;
+        }
+        cnx.ctx().writeAndFlush(
+                Commands.newActiveConsumerChange(consumerId, isActive),
+                cnx.ctx().voidPromise());
+    }
+
+    @Override
+    public void sendSuccess(long requestId) {
+        cnx.ctx().writeAndFlush(Commands.newSuccess(requestId));
+    }
+
+    @Override
+    public void sendError(long requestId, PulsarApi.ServerError error, String message) {
+        cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message));
+    }
+
+    @Override
+    public void sendReachedEndOfTopic(long consumerId) {
+        // Only notify clients whose protocol version (v9 or later) understands this command.
+        if (cnx.getRemoteEndpointProtocolVersion() >= PulsarApi.ProtocolVersion.v9_VALUE) {
+            log.info("[{}] Notifying consumer {} that end of topic has been reached", cnx.clientAddress(), consumerId);
+            cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
+        }
+    }
+
+    @Override
+    public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
+            int partitionIdx, final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+            RedeliveryTracker redeliveryTracker) {
+        final ChannelHandlerContext ctx = cnx.ctx();
+        final ChannelPromise writePromise = ctx.newPromise();
+        ctx.channel().eventLoop().execute(() -> {
+            for (int i = 0; i < entries.size(); i++) {
+                Entry entry = entries.get(i);
+                if (entry == null) {
+                    // Entry was filtered out
+                    continue;
+                }
+
+                int batchSize = batchSizes.getBatchSize(i);
+
+                if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
+                    log.warn("[{}-{}] Consumer doesn't support batch messages -  consumerId {}, msg id {}-{}",
+                            topicName, subscription,
+                            consumerId, entry.getLedgerId(), entry.getEntryId());
+                    ctx.close();
+                    entry.release();
+                    continue;
+                }
+
+                MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
+                MessageIdData messageId = messageIdBuilder
+                        .setLedgerId(entry.getLedgerId())
+                        .setEntryId(entry.getEntryId())
+                        .setPartition(partitionIdx)
+                        .build();
+
+                ByteBuf metadataAndPayload = entry.getDataBuffer();
+                // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
+                metadataAndPayload.retain();
+                // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
+                    Commands.skipChecksumIfPresent(metadataAndPayload);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription,
+                            consumerId, entry.getLedgerId(), entry.getEntryId());
+                }
+
+                int redeliveryCount = 0;
+                PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+                if (redeliveryTracker.contains(position)) {
+                    redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
+                }
+                ctx.write(cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload,
+                        batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise());
+                messageId.recycle();
+                messageIdBuilder.recycle();
+                entry.release();
+            }
+
+            // Use an empty write here so that we can just tie the flush with the write promise for last entry
+            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
+            batchSizes.recyle();
+            if (batchIndexesAcks != null) {
+                batchIndexesAcks.recycle();
+            }
+        });
+
+        return writePromise;
+    }
+
     private void safeIntercept(PulsarApi.BaseCommand command, ServerCnx cnx) {
         try {
             this.interceptor.onPulsarCommand(command, cnx);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3b0426e..6497e12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -34,6 +34,7 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Promise;
 
 import java.net.SocketAddress;
 
@@ -134,7 +135,7 @@ import org.apache.pulsar.transaction.impl.common.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ServerCnx extends PulsarHandler {
+public class ServerCnx extends PulsarHandler implements TransportCnx {
     private final BrokerService service;
     private final SchemaRegistryService schemaService;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
@@ -1771,6 +1772,7 @@ public class ServerCnx extends PulsarHandler {
 
     }
 
+    @Override
     public void closeConsumer(Consumer consumer) {
         // removes consumer-connection from map and send close command to consumer
         if (log.isDebugEnabled()) {
@@ -1793,10 +1795,12 @@ public class ServerCnx extends PulsarHandler {
         ctx.close();
     }
 
+    @Override
     public SocketAddress clientAddress() {
         return remoteAddress;
     }
 
+    @Override
     public void removedConsumer(Consumer consumer) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
@@ -1805,6 +1809,7 @@ public class ServerCnx extends PulsarHandler {
         consumers.remove(consumer.consumerId());
     }
 
+    @Override
     public void removedProducer(Producer producer) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Removed producer: {}", remoteAddress, producer);
@@ -1812,10 +1817,12 @@ public class ServerCnx extends PulsarHandler {
         producers.remove(producer.getProducerId());
     }
 
+    @Override
     public boolean isActive() {
         return isActive;
     }
 
+    @Override
     public boolean isWritable() {
         return ctx.channel().isWritable();
     }
@@ -1848,6 +1855,7 @@ public class ServerCnx extends PulsarHandler {
         }
     }
 
+    @Override
     public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
         MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
         if (--pendingSendRequest == resumeReadsThreshold) {
@@ -1861,6 +1869,7 @@ public class ServerCnx extends PulsarHandler {
         }
     }
 
+    @Override
     public void enableCnxAutoRead() {
         // we can add check (&& pendingSendRequest < MaxPendingSendRequests) here but then it requires
         // pendingSendRequest to be volatile and it can be expensive while writing. also this will be called on if
@@ -1874,21 +1883,22 @@ public class ServerCnx extends PulsarHandler {
         }
     }
 
+    @Override
     public void disableCnxAutoRead() {
         if (ctx != null && ctx.channel().config().isAutoRead() ) {
             ctx.channel().config().setAutoRead(false);
         }
     }
 
-    @VisibleForTesting
-    void cancelPublishRateLimiting() {
+    @Override
+    public void cancelPublishRateLimiting() {
         if (autoReadDisabledRateLimiting) {
             autoReadDisabledRateLimiting = false;
         }
     }
 
-    @VisibleForTesting
-    void cancelPublishBufferLimiting() {
+    @Override
+    public void cancelPublishBufferLimiting() {
         if (autoReadDisabledPublishBufferLimiting) {
             autoReadDisabledPublishBufferLimiting = false;
         }
@@ -1963,7 +1973,7 @@ public class ServerCnx extends PulsarHandler {
     /**
      * Helper method for testability
      *
-     * @return
+     * @return the connection state
      */
     public State getState() {
         return state;
@@ -1981,10 +1991,16 @@ public class ServerCnx extends PulsarHandler {
         return authRole;
     }
 
+    @Override
+    public Promise<Void> newPromise() {
+        return ctx.newPromise();
+    }
+
     boolean hasConsumer(long consumerId) {
         return consumers.containsKey(consumerId);
     }
 
+    @Override
     public boolean isBatchMessageCompatibleVersion() {
         return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber();
     }
@@ -1993,10 +2009,12 @@ public class ServerCnx extends PulsarHandler {
         return features != null && features.getSupportsAuthRefresh();
     }
 
+    @Override
     public String getClientVersion() {
         return clientVersion;
     }
 
+    @Override
     public long getMessagePublishBufferSize() {
         return this.messagePublishBufferSize;
     }
@@ -2011,6 +2029,7 @@ public class ServerCnx extends PulsarHandler {
         this.autoReadDisabledRateLimiting = isLimiting;
     }
 
+    @Override
     public boolean isPreciseDispatcherFlowControl() {
         return preciseDispatcherFlowControl;
     }
@@ -2019,6 +2038,7 @@ public class ServerCnx extends PulsarHandler {
         return authState;
     }
 
+    @Override
     public AuthenticationDataSource getAuthenticationData() {
         return originalAuthData != null ? originalAuthData : authenticationData;
     }
@@ -2031,6 +2051,7 @@ public class ServerCnx extends PulsarHandler {
         return authenticationProvider;
     }
 
+    @Override
     public String getAuthRole() {
         return authRole;
     }
@@ -2047,7 +2068,13 @@ public class ServerCnx extends PulsarHandler {
         return producers;
     }
 
+    @Override
     public PulsarCommandSender getCommandSender() {
         return commandSender;
     }
+
+    @Override
+    public void execute(Runnable runnable) {
+        ctx.channel().eventLoop().execute(runnable);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 4d14326..ff5e962 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -102,7 +102,7 @@ public interface Topic {
      */
     void recordAddLatency(long latency, TimeUnit unit);
 
-    CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
+    CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType,
             int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
             Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
             long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
new file mode 100644
index 0000000..2b43eaa
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.netty.util.concurrent.Promise;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+/** Transport-independent view of a client connection used by Producer, Consumer and Topic (see ServerCnx). */
+public interface TransportCnx {
+    /** Returns the client version associated with this connection. */
+    String getClientVersion();
+    /** Returns the remote address of the client; callers null-check it, so it may be {@code null}. */
+    SocketAddress clientAddress();
+    /** Returns the {@link BrokerService} associated with this connection. */
+    BrokerService getBrokerService();
+    /** Returns the {@link PulsarCommandSender} associated with this connection. */
+    PulsarCommandSender getCommandSender();
+    /** Tells whether the peer's protocol version is batch-message compatible ({@code >= v4} in ServerCnx). */
+    boolean isBatchMessageCompatibleVersion();
+
+    /**
+     * Returns the security role for this connection.
+     * @return the role
+     */
+    String getAuthRole();
+    /** Returns the authentication data; ServerCnx prefers the original auth data when it is set. */
+    AuthenticationDataSource getAuthenticationData();
+    /** Tells whether the connection is active; NonPersistentTopic closes a new consumer when it is not. */
+    boolean isActive();
+    /** Tells whether the connection can be written to (the Netty channel's writability in ServerCnx). */
+    boolean isWritable();
+    /** Reports a completed send of {@code msgSize}; ServerCnx subtracts it from its publish buffer size. */
+    void completedSendOperation(boolean isNonPersistentTopic, int msgSize);
+    /** Notifies the connection that {@code producer} was removed; ServerCnx drops it from its producer map. */
+    void removedProducer(Producer producer);
+    /** Closes the given producer; presumably the producer-side twin of closeConsumer - TODO confirm. */
+    void closeProducer(Producer producer);
+    /** Returns the current message publish buffer size tracked for this connection. */
+    long getMessagePublishBufferSize();
+    /** Clears the flag marking auto-read as disabled because of publish rate limiting. */
+    void cancelPublishRateLimiting();
+    /** Clears the flag marking auto-read as disabled because of publish buffer limiting. */
+    void cancelPublishBufferLimiting();
+    /** Stops automatic reads from the connection (ServerCnx sets the channel's auto-read to false). */
+    void disableCnxAutoRead();
+    /** Counterpart of {@link #disableCnxAutoRead()}; exact resume conditions are implementation-defined. */
+    void enableCnxAutoRead();
+    /** Runs {@code runnable} on the connection's executor (the channel's event loop in ServerCnx). */
+    void execute(Runnable runnable);
+    /** Notifies the connection that {@code consumer} was removed; ServerCnx drops it from its consumer map. */
+    void removedConsumer(Consumer consumer);
+    /** Closes {@code consumer}; ServerCnx removes it from its map and sends a close command to the client. */
+    void closeConsumer(Consumer consumer);
+    /** Tells whether precise dispatcher flow control is enabled for this connection. */
+    boolean isPreciseDispatcherFlowControl();
+    /** Creates a new promise (from the channel handler context in ServerCnx). */
+    Promise<Void> newPromise();
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b770f87..dd729bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -56,7 +56,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersio
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
@@ -237,7 +237,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     }
 
     @Override
-    public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
+    public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
             Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
             long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {
@@ -291,7 +291,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
         try {
             Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
-                    cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
+                    cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
             addConsumerToSubscription(subscription, consumer);
             if (!cnx.isActive()) {
                 consumer.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 198195d..9241ce9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -84,7 +84,7 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
@@ -514,7 +514,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     @Override
-    public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
+    public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
             Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
             long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {
@@ -571,9 +571,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             return future;
         }
 
-        if (cnx.getRemoteAddress() != null && cnx.getRemoteAddress().toString().contains(":")) {
+        if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
             SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
-                    cnx.getRemoteAddress().toString().split(":")[0], consumerName, consumerId);
+                    cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
             if (subscribeRateLimiter.isPresent() && !subscribeRateLimiter.get().subscribeAvailable(consumer) || !subscribeRateLimiter.get().tryAcquire(consumer)) {
                 log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
                         topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
@@ -610,7 +610,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         subscriptionFuture.thenAccept(subscription -> {
             try {
                 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
-                                                 maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
+                                                 maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
                 addConsumerToSubscription(subscription, consumer);
 
                 checkBackloggedCursors();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 8aa9c9a..b230f8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -56,7 +56,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
                 .create();
         Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+        TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
+        ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
         Thread.sleep(20);
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
@@ -86,14 +87,15 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
                 .create();
         Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+        TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
+        ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         // The first message can publish success, but the second message should be blocked
         producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
         getPulsar().getBrokerService().checkMessagePublishBuffer();
         Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
 
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
+        ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
         getPulsar().getBrokerService().checkMessagePublishBuffer();
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
@@ -122,7 +124,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
                 .create();
         Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+        TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
+        ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
 
@@ -131,15 +134,15 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
         Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
 
         // Block by publish rate.
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
+        ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
         getPulsar().getBrokerService().checkMessagePublishBuffer();
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(true);
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().disableCnxAutoRead();
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
+        ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(true);
+        cnx.disableCnxAutoRead();
+        cnx.enableCnxAutoRead();
 
         // Resume message publish.
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(false);
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
+        ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(false);
+        cnx.enableCnxAutoRead();
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
         // Make sure the producer can publish succeed.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 29829be..98cce6e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -173,6 +173,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
         when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
         when(serverCnx.ctx()).thenReturn(channelCtx);
+        doReturn(new PulsarCommandSenderImpl(null, serverCnx))
+                .when(serverCnx).getCommandSender();
 
         serverCnxWithOldVersion = spy(new ServerCnx(pulsar));
         doReturn(true).when(serverCnxWithOldVersion).isActive();
@@ -182,6 +184,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion())
             .thenReturn(ProtocolVersion.v11.getNumber());
         when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx);
+        doReturn(new PulsarCommandSenderImpl(null, serverCnxWithOldVersion))
+                .when(serverCnxWithOldVersion).getCommandSender();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         doReturn(nsSvc).when(pulsar).getNamespaceService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index bd56f35..ed49143 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -199,6 +199,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
+        doReturn(new PulsarCommandSenderImpl(null, serverCnx))
+                .when(serverCnx).getCommandSender();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         doReturn(nsSvc).when(pulsar).getNamespaceService();