Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/16 01:28:52 UTC

[pulsar] branch master updated: Use correct number of messages in batch for publish rate stats during replication (#3834)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce13251  Use correct number of messages in batch for publish rate stats during replication (#3834)
ce13251 is described below

commit ce13251caf8649f74e3f5196784eec24c062046c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Mar 15 18:28:47 2019 -0700

    Use correct number of messages in batch for publish rate stats during replication (#3834)
---
 .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6dcaca0..0793a50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -372,12 +372,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload);
 
                     MessageMetadata msgMetadata = msgMetadataBuilder.build();
-                    ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata, encryptedPayload);
+
+                    // When publishing during replication, we need to set the correct number of message in batch
+                    // This is only used in tracking the publish rate stats
+                    int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch()
+                            ? msg.getMessageBuilder().getNumMessagesInBatch()
+                            : 1;
+                    ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
                     msgMetadataBuilder.recycle();
                     msgMetadata.recycle();
 
                     final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);
-                    op.setNumMessagesInBatch(1);
+                    op.setNumMessagesInBatch(numMessages);
                     op.setBatchSizeByte(encryptedPayload.readableBytes());
                     pendingMessages.put(op);
                     lastSendFuture = callback.getFuture();
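
The fallback introduced in the hunk above can be illustrated in isolation. The following is a minimal sketch, not Pulsar's code: `FakeMetadata` and `resolveNumMessages` are hypothetical names standing in for the message builder and the inline ternary. Only the method names `hasNumMessagesInBatch()` and `getNumMessagesInBatch()` are taken from the diff. It shows that a message whose metadata already carries a batch count (as a replicated batch would) keeps that count, while any other message is counted as 1.

```java
public class BatchCountSketch {

    /** Hypothetical stand-in for the message metadata builder; not Pulsar's real class. */
    static final class FakeMetadata {
        // null models "field not set"; this representation is an assumption of the sketch.
        private final Integer numMessagesInBatch;

        FakeMetadata(Integer numMessagesInBatch) {
            this.numMessagesInBatch = numMessagesInBatch;
        }

        boolean hasNumMessagesInBatch() {
            return numMessagesInBatch != null;
        }

        int getNumMessagesInBatch() {
            // Returns 0 when unset in this sketch; callers are expected to check has...() first.
            return numMessagesInBatch != null ? numMessagesInBatch : 0;
        }
    }

    /** Same shape as the ternary in the diff: use the recorded batch count if present, else 1. */
    static int resolveNumMessages(FakeMetadata metadata) {
        return metadata.hasNumMessagesInBatch()
                ? metadata.getNumMessagesInBatch()
                : 1;
    }

    public static void main(String[] args) {
        // A plain, non-batched message: no count recorded, so it is counted once.
        System.out.println(resolveNumMessages(new FakeMetadata(null))); // prints 1

        // A message that already wraps a batch of 25 entries keeps that count.
        System.out.println(resolveNumMessages(new FakeMetadata(25)));   // prints 25
    }
}
```

In the commit, the resolved value is passed both to `sendMessage(...)` and to `op.setNumMessagesInBatch(...)`, so the two places that previously hard-coded 1 stay consistent with each other.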