Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/09/30 17:51:40 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #5276: Fixed race condition while triggering message redelivery after an ack-timeout event

URL: https://github.com/apache/pulsar/pull/5276#discussion_r329706992
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -546,21 +546,29 @@ public void redeliverUnacknowledgedMessages() {
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
         }
-        // redeliver unacked-msgs
-        subscription.redeliverUnacknowledgedMessages(this);
-        flowConsumerBlockedPermits(this);
+
         if (pendingAcks != null) {
-            AtomicInteger totalRedeliveryMessages = new AtomicInteger(0);
-            pendingAcks.forEach(
-                    (ledgerId, entryId, batchSize, none) -> totalRedeliveryMessages.addAndGet((int) batchSize));
-            msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), totalRedeliveryMessages.get());
-            pendingAcks.clear();
+            List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size());
+            MutableInt totalRedeliveryMessages = new MutableInt(0);
+            pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
+                totalRedeliveryMessages.add((int) batchSize);
+                pendingPositions.add(new PositionImpl(ledgerId, entryId));
+            });
+
+            for (PositionImpl p : pendingPositions) {
+                pendingAcks.remove(p.getLedgerId(), p.getEntryId());
 
 Review comment:
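   It's to avoid losing any entry that a different thread might be inserting into pendingAcks at the same time (e.g., if the dispatcher thread is sending messages while the redelivery is being processed). Removing only the positions we collected leaves such concurrent insertions in place, whereas clear() would wipe them.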
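
   To illustrate the point, here is a minimal sketch, not the broker code. It assumes a plain ConcurrentHashMap as a stand-in for pendingAcks, with a hypothetical "ledgerId:entryId" string key, and it simulates the dispatcher thread deterministically with a Runnable that runs between the counting step and the removal step. The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the consumer's pending-ack bookkeeping.
class PendingAcksSketch {
    // Assumed shape: "ledgerId:entryId" -> batchSize (the real map is a different type).
    final Map<String, Integer> pendingAcks = new ConcurrentHashMap<>();

    // Old approach: count, then clear(). An entry inserted between the two
    // steps is wiped without ever being counted or redelivered.
    int redeliverWithClear(Runnable concurrentInsert) {
        int total = 0;
        for (int batchSize : pendingAcks.values()) {
            total += batchSize;
        }
        concurrentInsert.run(); // simulates the dispatcher adding an entry right now
        pendingAcks.clear();
        return total;
    }

    // New approach: snapshot the keys, then remove only those keys.
    // An entry inserted after the snapshot stays in the map.
    int redeliverWithSnapshot(Runnable concurrentInsert) {
        List<String> snapshot = new ArrayList<>(pendingAcks.size());
        int total = 0;
        for (Map.Entry<String, Integer> e : pendingAcks.entrySet()) {
            snapshot.add(e.getKey());
            total += e.getValue();
        }
        concurrentInsert.run(); // simulates the dispatcher adding an entry right now
        for (String key : snapshot) {
            pendingAcks.remove(key);
        }
        return total;
    }

    public static void main(String[] args) {
        PendingAcksSketch cleared = new PendingAcksSketch();
        cleared.pendingAcks.put("1:1", 2);
        cleared.redeliverWithClear(() -> cleared.pendingAcks.put("1:2", 3));
        System.out.println("after clear():  " + cleared.pendingAcks);

        PendingAcksSketch snapshotted = new PendingAcksSketch();
        snapshotted.pendingAcks.put("1:1", 2);
        snapshotted.redeliverWithSnapshot(() -> snapshotted.pendingAcks.put("1:2", 3));
        System.out.println("after snapshot: " + snapshotted.pendingAcks);
    }
}
```

   In both variants only the original entry is counted, but with clear() the late entry "1:2" disappears from the map, while with the snapshot it is still tracked for a later ack or redelivery.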
