You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/06/10 17:07:15 UTC

[pulsar] branch master updated: [fix][clients]Check pendingIndividualBatchIndexAcks size in doIndividualBatchAckAsync (#15877)

This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new de9928a5bb4 [fix][clients]Check pendingIndividualBatchIndexAcks size in doIndividualBatchAckAsync (#15877)
de9928a5bb4 is described below

commit de9928a5bb4a35c3e6129fa43cc90c2a6b454666
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Sat Jun 11 01:07:06 2022 +0800

    [fix][clients]Check pendingIndividualBatchIndexAcks size in doIndividualBatchAckAsync (#15877)
    
    * Check pendingIndividualBatchIndexAcks size and flush in doIndividualBatchAckAsync
    
    * Put the logic into the doIndividualBatchAck method
---
 .../impl/PersistentAcknowledgmentsGroupingTracker.java       | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f0f0cfd7548..005f2e6b74b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -312,9 +312,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                 return this.currentIndividualAckFuture;
             } finally {
                 this.lock.readLock().unlock();
+                if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
             }
         } else {
             doIndividualBatchAckAsync(batchMessageId);
+            if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                flush();
+            }
             return CompletableFuture.completedFuture(null);
         }
     }
@@ -337,15 +343,9 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                     return this.currentCumulativeAckFuture;
                 } finally {
                     this.lock.readLock().unlock();
-                    if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                        flush();
-                    }
                 }
             } else {
                 doCumulativeAckAsync(messageId, bitSet);
-                if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                    flush();
-                }
                 return CompletableFuture.completedFuture(null);
             }
         }