You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/16 01:28:52 UTC
[pulsar] branch master updated: Use correct number of messages in batch for publish rate stats during replication (#3834)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce13251 Use correct number of messages in batch for publish rate stats during replication (#3834)
ce13251 is described below
commit ce13251caf8649f74e3f5196784eec24c062046c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Mar 15 18:28:47 2019 -0700
Use correct number of messages in batch for publish rate stats during replication (#3834)
---
.../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6dcaca0..0793a50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -372,12 +372,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload);
MessageMetadata msgMetadata = msgMetadataBuilder.build();
- ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata, encryptedPayload);
+
+ // When publishing during replication, we need to set the correct number of messages in the batch.
+ // This value is only used for tracking the publish rate stats.
+ int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch()
+ ? msg.getMessageBuilder().getNumMessagesInBatch()
+ : 1;
+ ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
msgMetadataBuilder.recycle();
msgMetadata.recycle();
final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);
- op.setNumMessagesInBatch(1);
+ op.setNumMessagesInBatch(numMessages);
op.setBatchSizeByte(encryptedPayload.readableBytes());
pendingMessages.put(op);
lastSendFuture = callback.getFuture();