You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/12 17:15:24 UTC

[pulsar] 02/02: Fix race condition of OpSendMsgQueue when publishing messages (#14231)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1c3c3f40b0d58108a4d0f53489f272f8468bed65
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Feb 12 13:26:42 2022 +0800

    Fix race condition of OpSendMsgQueue when publishing messages (#14231)
    
    * Add synchronized to getPendingQueueSize()
    
    * Use iterator instead.
    
    * Use a counter to keep track of the message count
    
    * Changed the counter to int
    
    Co-authored-by: Matteo Merli <mm...@apache.org>
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 34 ++++++++++------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 877d88b..8486283 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
@@ -994,7 +993,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
             if (sequenceId > op.sequenceId) {
                 log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
-                        op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
+                        op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.messagesCount());
                 // Force connection closing so that messages can be re-transmitted in a new connection
                 cnx.channel().close();
                 return;
@@ -1016,7 +1015,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     releaseSemaphoreForSendOp(op);
                 } else {
                     log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
-                            op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
+                            op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.messagesCount());
                     // Force connection closing so that messages can be re-transmitted in a new connection
                     cnx.channel().close();
                     return;
@@ -1332,6 +1331,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
         private int forEachDepth = 0;
         private List<OpSendMsg> postponedOpSendMgs;
+        private final AtomicInteger messagesCount = new AtomicInteger(0);
 
         @Override
         public void forEach(Consumer<? super OpSendMsg> action) {
@@ -1352,6 +1352,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
         public boolean add(OpSendMsg o) {
             // postpone adding to the queue while forEach iteration is in progress
+            messagesCount.addAndGet(o.numMessagesInBatch);
             if (forEachDepth > 0) {
                 if (postponedOpSendMgs == null) {
                     postponedOpSendMgs = new ArrayList<>();
@@ -1364,18 +1365,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
         public void clear() {
             delegate.clear();
+            messagesCount.set(0);
         }
 
         public void remove() {
-            delegate.remove();
+            OpSendMsg op = delegate.remove();
+            if (op != null) {
+                messagesCount.addAndGet(-op.numMessagesInBatch);
+            }
         }
 
         public OpSendMsg peek() {
             return delegate.peek();
         }
 
-        public int size() {
-            return delegate.size();
+        public int messagesCount() {
+            return messagesCount.get();
         }
 
         @Override
@@ -1548,7 +1553,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
                             if (log.isDebugEnabled()) {
                                 log.debug("[{}] [{}] Pending messages: {}", topic, producerName,
-                                        pendingMessages.size());
+                                        pendingMessages.messagesCount());
                             }
 
                             PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(
@@ -1649,7 +1654,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     cnx.channel().close();
                     return;
                 }
-                int messagesToResend = pendingMessages.size();
+                int messagesToResend = pendingMessages.messagesCount();
                 if (messagesToResend == 0) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend);
@@ -1755,7 +1760,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     // The diff is less than or equal to zero, meaning that the message has been timed out.
                     // Set the callback to timeout on every message, then clear the pending queue.
                     log.info("[{}] [{}] Message send timed out. Failing {} messages", topic, producerName,
-                            pendingMessages.size());
+                            pendingMessages.messagesCount());
 
                     PulsarClientException te = new PulsarClientException.TimeoutException(
                         format("The producer %s can not send message to the topic %s within given timeout",
@@ -1925,7 +1930,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             // called again once the new connection registers the producer with the broker.
             log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
                     + " {} pending messages since they will deliver using another connection.", topic, producerName,
-                    pendingMessages.size());
+                    pendingMessages.messagesCount());
             return;
         }
         final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
@@ -1997,14 +2002,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     public int getPendingQueueSize() {
-        if (!isBatchMessagingEnabled()) {
-            return pendingMessages.size();
-        }
-        MutableInt size = new MutableInt(0);
-        pendingMessages.forEach(op -> {
-            size.add(Math.max(op.numMessagesInBatch, 1));
-        });
-        return size.getValue();
+        return pendingMessages.messagesCount();
     }
 
     @Override