You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/03 05:35:15 UTC
[pulsar] 01/04: Make ServerCnx, Producer and Consumer independent of Netty (#6720)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b9855083d2bd6128897bfecb068d142f7c1a6e24
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Wed Nov 18 07:09:35 2020 +0100
Make ServerCnx, Producer and Consumer independent of Netty (#6720)
This PR is a first step to implement [PIP 59: gRPC Protocol Handler](https://github.com/apache/pulsar/wiki/PIP-59%3A-gPRC-Protocol-Handler)
It separates the `ServerCnx` used by `Consumer` and `Producer` into an interface and an implementation. Alternative protocol handlers that use Pulsar's topics, consumers and producers will implement the interface with their own implementation.
(cherry picked from commit 2a183ab0dfedfda9d13a8536c02d95c109056b63)
---
.../pulsar/broker/service/BrokerService.java | 4 +-
.../org/apache/pulsar/broker/service/Consumer.java | 120 +++------------------
.../org/apache/pulsar/broker/service/Producer.java | 52 ++++-----
.../broker/service/PulsarChannelInitializer.java | 2 +-
.../pulsar/broker/service/PulsarCommandSender.java | 16 ++-
.../broker/service/PulsarCommandSenderImpl.java | 105 ++++++++++++++++++
.../apache/pulsar/broker/service/ServerCnx.java | 39 +++++--
.../org/apache/pulsar/broker/service/Topic.java | 2 +-
.../apache/pulsar/broker/service/TransportCnx.java | 80 ++++++++++++++
.../service/nonpersistent/NonPersistentTopic.java | 6 +-
.../broker/service/persistent/PersistentTopic.java | 10 +-
.../service/MessagePublishBufferThrottleTest.java | 23 ++--
.../PersistentDispatcherFailoverConsumerTest.java | 4 +
.../pulsar/broker/service/PersistentTopicTest.java | 2 +
14 files changed, 307 insertions(+), 158 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2dee374..b27dba2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2260,8 +2260,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
- private void foreachCnx(Consumer<ServerCnx> consumer) {
- Set<ServerCnx> cnxSet = new HashSet<>();
+ private void foreachCnx(Consumer<TransportCnx> consumer) {
+ Set<TransportCnx> cnxSet = new HashSet<>();
topics.forEach((n, t) -> {
Optional<Topic> topic = extractTopic(t);
topic.ifPresent(value -> value.getProducers().values().forEach(producer -> cnxSet.add(producer.getCnx())));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c2dd900..968050c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -23,12 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
@@ -39,6 +33,8 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -53,10 +49,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosi
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
-import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.SafeCollectionUtils;
@@ -69,7 +63,7 @@ import org.slf4j.LoggerFactory;
public class Consumer {
private final Subscription subscription;
private final SubType subType;
- private final ServerCnx cnx;
+ private final TransportCnx cnx;
private final String appId;
private AuthenticationDataSource authenticationData;
private final String topicName;
@@ -131,7 +125,7 @@ public class Consumer {
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
- int maxUnackedMessages, ServerCnx cnx, String appId,
+ int maxUnackedMessages, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException {
@@ -191,18 +185,11 @@ public class Consumer {
}
void notifyActiveConsumerChange(Consumer activeConsumer) {
- if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) {
- // if the client is older than `v12`, we don't need to send consumer group changes.
- return;
- }
-
if (log.isDebugEnabled()) {
log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}",
consumerId, topicName, subscription.getName(), activeConsumer);
}
- cnx.ctx().writeAndFlush(
- Commands.newActiveConsumerChange(consumerId, this == activeConsumer),
- cnx.ctx().voidPromise());
+ cnx.getCommandSender().sendActiveConsumerChange(consumerId, this == activeConsumer);
}
public boolean readCompacted() {
@@ -215,23 +202,21 @@ public class Consumer {
*
* @return a SendMessageInfo object that contains the detail of what was sent to consumer
*/
-
- public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
this.lastConsumedTimestamp = System.currentTimeMillis();
- final ChannelHandlerContext ctx = cnx.ctx();
- final ChannelPromise writePromise = ctx.newPromise();
if (entries.isEmpty() || totalMessages == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}",
topicName, subscription, consumerId);
}
- writePromise.setSuccess();
batchSizes.recyle();
if (batchIndexesAcks != null) {
batchIndexesAcks.recycle();
}
+ final Promise<Void> writePromise = cnx.newPromise();
+ writePromise.setSuccess(null);
return writePromise;
}
@@ -263,66 +248,8 @@ public class Consumer {
bytesOutCounter.add(totalBytes);
chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
- ctx.channel().eventLoop().execute(() -> {
- for (int i = 0; i < entries.size(); i++) {
- Entry entry = entries.get(i);
- if (entry == null) {
- // Entry was filtered out
- continue;
- }
-
- int batchSize = batchSizes.getBatchSize(i);
-
- if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
- log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}",
- topicName, subscription,
- consumerId, entry.getLedgerId(), entry.getEntryId());
- ctx.close();
- entry.release();
- continue;
- }
-
- MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
- MessageIdData messageId = messageIdBuilder
- .setLedgerId(entry.getLedgerId())
- .setEntryId(entry.getEntryId())
- .setPartition(partitionIdx)
- .build();
-
- ByteBuf metadataAndPayload = entry.getDataBuffer();
- // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
- metadataAndPayload.retain();
- // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
- if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
- Commands.skipChecksumIfPresent(metadataAndPayload);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription,
- consumerId, entry.getLedgerId(), entry.getEntryId());
- }
-
- int redeliveryCount = 0;
- PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
- if (redeliveryTracker.contains(position)) {
- redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
- }
- ctx.write(cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload,
- batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise());
- messageId.recycle();
- messageIdBuilder.recycle();
- entry.release();
- }
-
- // Use an empty write here so that we can just tie the flush with the write promise for last entry
- ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
- batchSizes.recyle();
- if (batchIndexesAcks != null) {
- batchIndexesAcks.recycle();
- }
- });
-
- return writePromise;
+ return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
+ entries, batchSizes, batchIndexesAcks, redeliveryTracker);
}
private void incrementUnackedMessages(int ackedMessages) {
@@ -337,10 +264,6 @@ public class Consumer {
return cnx.isWritable();
}
- public void sendError(ByteBuf error) {
- cnx.ctx().writeAndFlush(error).addListener(ChannelFutureListener.CLOSE);
- }
-
/**
* Close the consumer if: a. the connection is dropped b. connection is open (graceful close) and there are no
* pending message acks
@@ -368,18 +291,15 @@ public class Consumer {
}
}
- void doUnsubscribe(final long requestId) {
- final ChannelHandlerContext ctx = cnx.ctx();
-
+ public void doUnsubscribe(final long requestId) {
subscription.doUnsubscribe(this).thenAccept(v -> {
log.info("Unsubscribed successfully from {}", subscription);
cnx.removedConsumer(this);
- ctx.writeAndFlush(Commands.newSuccess(requestId));
+ cnx.getCommandSender().sendSuccess(requestId);
}).exceptionally(exception -> {
log.warn("Unsubscribe failed for {}", subscription, exception);
- ctx.writeAndFlush(
- Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception),
- exception.getCause().getMessage()));
+ cnx.getCommandSender().sendError(requestId, BrokerServiceException.getClientErrorCode(exception),
+ exception.getCause().getMessage());
return null;
});
}
@@ -439,7 +359,7 @@ public class Consumer {
}
}
- void flowPermits(int additionalNumberOfMessages) {
+ public void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);
// block shared consumer when unacked-messages reaches limit
@@ -489,11 +409,7 @@ public class Consumer {
}
public void reachedEndOfTopic() {
- // Only send notification if the client understand the command
- if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9_VALUE) {
- log.info("[{}] Notifying consumer that end of topic has been reached", this);
- cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
- }
+ cnx.getCommandSender().sendReachedEndOfTopic(consumerId);
}
/**
@@ -546,10 +462,6 @@ public class Consumer {
.add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
}
- public ChannelHandlerContext ctx() {
- return cnx.ctx();
- }
-
public void checkPermissions() {
TopicName topicName = TopicName.get(subscription.getTopicName());
if (cnx.getBrokerService().getAuthorizationService() != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 2fd9c2e..3216d43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
+import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
@@ -52,14 +53,13 @@ import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
/**
* Represents a currently connected producer
*/
public class Producer {
private final Topic topic;
- private final ServerCnx cnx;
+ private final TransportCnx cnx;
private final String producerName;
private final long epoch;
private final boolean userProvidedProducerName;
@@ -88,7 +88,7 @@ public class Producer {
private final SchemaVersion schemaVersion;
- public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
+ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName) {
this.topic = topic;
@@ -140,41 +140,42 @@ public class Producer {
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
boolean isChunked) {
- beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
- publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
+ if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) {
+ publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
+ }
}
public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
- ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+ ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
if (lowestSequenceId > highestSequenceId) {
- cnx.ctx().channel().eventLoop().execute(() -> {
+ cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
"Invalid lowest or highest sequence id");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
- beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize);
- publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
+ if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
+ publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
+ }
}
- public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
+ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
if (isClosed) {
- cnx.ctx().channel().eventLoop().execute(() -> {
+ cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError,
"Producer is closed");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
-
- return;
+ return false;
}
if (!verifyChecksum(headersAndPayload)) {
- cnx.ctx().channel().eventLoop().execute(() -> {
+ cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
- return;
+ return false;
}
if (topic.isEncryptionRequired()) {
@@ -187,16 +188,17 @@ public class Producer {
// Check whether the message is encrypted or not
if (encryptionKeysCount < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
- cnx.ctx().channel().eventLoop().execute(() -> {
+ cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
- return;
+ return false;
}
}
startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
+ return true;
}
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
@@ -266,7 +268,7 @@ public class Producer {
/**
* Return the sequence id of
- * @return
+ * @return the sequence id
*/
public long getLastSequenceId() {
if (isNonPersistentTopic) {
@@ -276,7 +278,7 @@ public class Producer {
}
}
- public ServerCnx getCnx() {
+ public TransportCnx getCnx() {
return this.cnx;
}
@@ -354,7 +356,7 @@ public class Producer {
ServerError serverError = (exception instanceof TopicTerminatedException)
? ServerError.TopicTerminatedError : ServerError.PersistenceError;
- producer.cnx.ctx().channel().eventLoop().execute(() -> {
+ producer.cnx.execute(() -> {
if (!(exception instanceof TopicClosedException)) {
// For TopicClosed exception there's no need to send explicit error, since the client was
// already notified
@@ -374,7 +376,7 @@ public class Producer {
this.ledgerId = ledgerId;
this.entryId = entryId;
- producer.cnx.ctx().channel().eventLoop().execute(this);
+ producer.cnx.execute(this);
}
}
@@ -417,7 +419,7 @@ public class Producer {
}
static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
- int msgSize, long batchSize, boolean chunked, long startTimeNs) {
+ int msgSize, long batchSize, boolean chunked, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
@@ -439,7 +441,7 @@ public class Producer {
}
private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
- protected MessagePublishContext newObject(Recycler.Handle<MessagePublishContext> handle) {
+ protected MessagePublishContext newObject(Handle<MessagePublishContext> handle) {
return new MessagePublishContext(handle);
}
};
@@ -511,7 +513,7 @@ public class Producer {
return closeFuture;
}
- void closeNow(boolean removeFromTopic) {
+ public void closeNow(boolean removeFromTopic) {
if (removeFromTopic) {
topic.removeProducer(this);
}
@@ -532,7 +534,7 @@ public class Producer {
public CompletableFuture<Void> disconnect() {
if (!closeFuture.isDone()) {
log.info("Disconnecting producer: {}", this);
- cnx.ctx().executor().execute(() -> {
+ cnx.execute(() -> {
cnx.closeProducer(this);
closeNow(true);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 2a2d3d5..7425ffb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -137,7 +137,7 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
try {
cnx.refreshAuthenticationCredentials();
} catch (Throwable t) {
- log.warn("[{}] Failed to refresh auth credentials", cnx.getRemoteAddress());
+ log.warn("[{}] Failed to refresh auth credentials", cnx.clientAddress());
}
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 1773106..141c7c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.service;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -26,7 +29,6 @@ import java.util.List;
public interface PulsarCommandSender {
-
void sendPartitionMetadataResponse(PulsarApi.ServerError error, String errorMsg, long requestId);
void sendPartitionMetadataResponse(int partitions, long requestId);
@@ -61,4 +63,16 @@ public interface PulsarCommandSender {
PulsarApi.CommandLookupTopicResponse.LookupType response, long requestId, boolean proxyThroughServiceUrl);
void sendLookupResponse(PulsarApi.ServerError error, String errorMsg, long requestId);
+
+ void sendActiveConsumerChange(long consumerId, boolean isActive);
+
+ void sendSuccess(long requestId);
+
+ void sendError(long requestId, PulsarApi.ServerError error, String message);
+
+ void sendReachedEndOfTopic(long consumerId);
+
+ Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
+ int partitionIdx, final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+ RedeliveryTracker redeliveryTracker);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index c9f1b61..c3d6a56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -19,9 +19,16 @@
package org.apache.pulsar.broker.service;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -206,6 +213,104 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
cnx.ctx().writeAndFlush(outBuf);
}
+ @Override
+ public void sendActiveConsumerChange(long consumerId, boolean isActive) {
+ if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) {
+ // if the client is older than `v12`, we don't need to send consumer group changes.
+ return;
+ }
+ cnx.ctx().writeAndFlush(
+ Commands.newActiveConsumerChange(consumerId, isActive),
+ cnx.ctx().voidPromise());
+ }
+
+ @Override
+ public void sendSuccess(long requestId) {
+ cnx.ctx().writeAndFlush(Commands.newSuccess(requestId));
+ }
+
+ @Override
+ public void sendError(long requestId, PulsarApi.ServerError error, String message) {
+ cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message));
+ }
+
+ @Override
+ public void sendReachedEndOfTopic(long consumerId) {
+ // Only send notification if the client understand the command
+ if (cnx.getRemoteEndpointProtocolVersion() >= PulsarApi.ProtocolVersion.v9_VALUE) {
+ log.info("[{}] Notifying consumer that end of topic has been reached", this);
+ cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
+ }
+ }
+
+ @Override
+ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
+ int partitionIdx, final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+ RedeliveryTracker redeliveryTracker) {
+ final ChannelHandlerContext ctx = cnx.ctx();
+ final ChannelPromise writePromise = ctx.newPromise();
+ ctx.channel().eventLoop().execute(() -> {
+ for (int i = 0; i < entries.size(); i++) {
+ Entry entry = entries.get(i);
+ if (entry == null) {
+ // Entry was filtered out
+ continue;
+ }
+
+ int batchSize = batchSizes.getBatchSize(i);
+
+ if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
+ log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}",
+ topicName, subscription,
+ consumerId, entry.getLedgerId(), entry.getEntryId());
+ ctx.close();
+ entry.release();
+ continue;
+ }
+
+ MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
+ MessageIdData messageId = messageIdBuilder
+ .setLedgerId(entry.getLedgerId())
+ .setEntryId(entry.getEntryId())
+ .setPartition(partitionIdx)
+ .build();
+
+ ByteBuf metadataAndPayload = entry.getDataBuffer();
+ // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
+ metadataAndPayload.retain();
+ // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
+ if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
+ Commands.skipChecksumIfPresent(metadataAndPayload);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription,
+ consumerId, entry.getLedgerId(), entry.getEntryId());
+ }
+
+ int redeliveryCount = 0;
+ PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+ if (redeliveryTracker.contains(position)) {
+ redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
+ }
+ ctx.write(cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload,
+ batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise());
+ messageId.recycle();
+ messageIdBuilder.recycle();
+ entry.release();
+ }
+
+ // Use an empty write here so that we can just tie the flush with the write promise for last entry
+ ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
+ batchSizes.recyle();
+ if (batchIndexesAcks != null) {
+ batchIndexesAcks.recycle();
+ }
+ });
+
+ return writePromise;
+ }
+
private void safeIntercept(PulsarApi.BaseCommand command, ServerCnx cnx) {
try {
this.interceptor.onPulsarCommand(command, cnx);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3b0426e..6497e12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -34,6 +34,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
@@ -134,7 +135,7 @@ import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerCnx extends PulsarHandler {
+public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
@@ -1771,6 +1772,7 @@ public class ServerCnx extends PulsarHandler {
}
+ @Override
public void closeConsumer(Consumer consumer) {
// removes consumer-connection from map and send close command to consumer
if (log.isDebugEnabled()) {
@@ -1793,10 +1795,12 @@ public class ServerCnx extends PulsarHandler {
ctx.close();
}
+ @Override
public SocketAddress clientAddress() {
return remoteAddress;
}
+ @Override
public void removedConsumer(Consumer consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
@@ -1805,6 +1809,7 @@ public class ServerCnx extends PulsarHandler {
consumers.remove(consumer.consumerId());
}
+ @Override
public void removedProducer(Producer producer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Removed producer: {}", remoteAddress, producer);
@@ -1812,10 +1817,12 @@ public class ServerCnx extends PulsarHandler {
producers.remove(producer.getProducerId());
}
+ @Override
public boolean isActive() {
return isActive;
}
+ @Override
public boolean isWritable() {
return ctx.channel().isWritable();
}
@@ -1848,6 +1855,7 @@ public class ServerCnx extends PulsarHandler {
}
}
+ @Override
public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
if (--pendingSendRequest == resumeReadsThreshold) {
@@ -1861,6 +1869,7 @@ public class ServerCnx extends PulsarHandler {
}
}
+ @Override
public void enableCnxAutoRead() {
// we can add check (&& pendingSendRequest < MaxPendingSendRequests) here but then it requires
// pendingSendRequest to be volatile and it can be expensive while writing. also this will be called on if
@@ -1874,21 +1883,22 @@ public class ServerCnx extends PulsarHandler {
}
}
+ @Override
public void disableCnxAutoRead() {
if (ctx != null && ctx.channel().config().isAutoRead() ) {
ctx.channel().config().setAutoRead(false);
}
}
- @VisibleForTesting
- void cancelPublishRateLimiting() {
+ @Override
+ public void cancelPublishRateLimiting() {
if (autoReadDisabledRateLimiting) {
autoReadDisabledRateLimiting = false;
}
}
- @VisibleForTesting
- void cancelPublishBufferLimiting() {
+ @Override
+ public void cancelPublishBufferLimiting() {
if (autoReadDisabledPublishBufferLimiting) {
autoReadDisabledPublishBufferLimiting = false;
}
@@ -1963,7 +1973,7 @@ public class ServerCnx extends PulsarHandler {
/**
* Helper method for testability
*
- * @return
+ * @return the connection state
*/
public State getState() {
return state;
@@ -1981,10 +1991,16 @@ public class ServerCnx extends PulsarHandler {
return authRole;
}
+ @Override
+ public Promise<Void> newPromise() {
+ return ctx.newPromise();
+ }
+
boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}
+ @Override
public boolean isBatchMessageCompatibleVersion() {
return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber();
}
@@ -1993,10 +2009,12 @@ public class ServerCnx extends PulsarHandler {
return features != null && features.getSupportsAuthRefresh();
}
+ @Override
public String getClientVersion() {
return clientVersion;
}
+ @Override
public long getMessagePublishBufferSize() {
return this.messagePublishBufferSize;
}
@@ -2011,6 +2029,7 @@ public class ServerCnx extends PulsarHandler {
this.autoReadDisabledRateLimiting = isLimiting;
}
+ @Override
public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}
@@ -2019,6 +2038,7 @@ public class ServerCnx extends PulsarHandler {
return authState;
}
+ @Override
public AuthenticationDataSource getAuthenticationData() {
return originalAuthData != null ? originalAuthData : authenticationData;
}
@@ -2031,6 +2051,7 @@ public class ServerCnx extends PulsarHandler {
return authenticationProvider;
}
+ @Override
public String getAuthRole() {
return authRole;
}
@@ -2047,7 +2068,13 @@ public class ServerCnx extends PulsarHandler {
return producers;
}
+ @Override
public PulsarCommandSender getCommandSender() {
return commandSender;
}
+
+ @Override
+ public void execute(Runnable runnable) {
+ ctx.channel().eventLoop().execute(runnable);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 4d14326..ff5e962 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -102,7 +102,7 @@ public interface Topic {
*/
void recordAddLatency(long latency, TimeUnit unit);
- CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
+ CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
new file mode 100644
index 0000000..2b43eaa
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.netty.util.concurrent.Promise;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public interface TransportCnx {
+
+ String getClientVersion();
+
+ SocketAddress clientAddress();
+
+ BrokerService getBrokerService();
+
+ PulsarCommandSender getCommandSender();
+
+ boolean isBatchMessageCompatibleVersion();
+
+ /**
+ * The security role for this connection.
+ * @return the role
+ */
+ String getAuthRole();
+
+ AuthenticationDataSource getAuthenticationData();
+
+ boolean isActive();
+
+ boolean isWritable();
+
+ void completedSendOperation(boolean isNonPersistentTopic, int msgSize);
+
+ void removedProducer(Producer producer);
+
+ void closeProducer(Producer producer);
+
+ long getMessagePublishBufferSize();
+
+ void cancelPublishRateLimiting();
+
+ void cancelPublishBufferLimiting();
+
+ void disableCnxAutoRead();
+
+ void enableCnxAutoRead();
+
+ void execute(Runnable runnable);
+
+ void removedConsumer(Consumer consumer);
+
+ void closeConsumer(Consumer consumer);
+
+ boolean isPreciseDispatcherFlowControl();
+
+ Promise<Void> newPromise();
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b770f87..dd729bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -56,7 +56,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersio
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -237,7 +237,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
@Override
- public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
+ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {
@@ -291,7 +291,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
- cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
+ cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
addConsumerToSubscription(subscription, consumer);
if (!cnx.isActive()) {
consumer.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 198195d..9241ce9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -84,7 +84,7 @@ import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -514,7 +514,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
@Override
- public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
+ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {
@@ -571,9 +571,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return future;
}
- if (cnx.getRemoteAddress() != null && cnx.getRemoteAddress().toString().contains(":")) {
+ if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
- cnx.getRemoteAddress().toString().split(":")[0], consumerName, consumerId);
+ cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
if (subscribeRateLimiter.isPresent() && !subscribeRateLimiter.get().subscribeAvailable(consumer) || !subscribeRateLimiter.get().tryAcquire(consumer)) {
log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
@@ -610,7 +610,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscriptionFuture.thenAccept(subscription -> {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
- maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
+ maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
addConsumerToSubscription(subscription, consumer);
checkBackloggedCursors();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 8aa9c9a..b230f8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -56,7 +56,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
.create();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+ TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
+ ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
Thread.sleep(20);
Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
@@ -86,14 +87,15 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
.create();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+ TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
+ ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
// The first message can publish success, but the second message should be blocked
producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
getPulsar().getBrokerService().checkMessagePublishBuffer();
Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
+ ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
getPulsar().getBrokerService().checkMessagePublishBuffer();
Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
@@ -122,7 +124,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
.create();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+ TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
+ ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
@@ -131,15 +134,15 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
// Block by publish rate.
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
+ ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
getPulsar().getBrokerService().checkMessagePublishBuffer();
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(true);
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().disableCnxAutoRead();
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
+ ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(true);
+ cnx.disableCnxAutoRead();
+ cnx.enableCnxAutoRead();
// Resume message publish.
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(false);
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
+ ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(false);
+ cnx.enableCnxAutoRead();
Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
// Make sure the producer can publish succeed.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 29829be..98cce6e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -173,6 +173,8 @@ public class PersistentDispatcherFailoverConsumerTest {
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
when(serverCnx.ctx()).thenReturn(channelCtx);
+ doReturn(new PulsarCommandSenderImpl(null, serverCnx))
+ .when(serverCnx).getCommandSender();
serverCnxWithOldVersion = spy(new ServerCnx(pulsar));
doReturn(true).when(serverCnxWithOldVersion).isActive();
@@ -182,6 +184,8 @@ public class PersistentDispatcherFailoverConsumerTest {
when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion())
.thenReturn(ProtocolVersion.v11.getNumber());
when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx);
+ doReturn(new PulsarCommandSenderImpl(null, serverCnxWithOldVersion))
+ .when(serverCnxWithOldVersion).getCommandSender();
NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index bd56f35..ed49143 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -199,6 +199,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
+ doReturn(new PulsarCommandSenderImpl(null, serverCnx))
+ .when(serverCnx).getCommandSender();
NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();