You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/06/10 17:07:15 UTC
[pulsar] branch master updated: [fix][clients]Check pendingIndividualBatchIndexAcks size in doIndividualBatchAckAsync (#15877)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new de9928a5bb4 [fix][clients]Check pendingIndividualBatchIndexAcks size in doIndividualBatchAckAsync (#15877)
de9928a5bb4 is described below
commit de9928a5bb4a35c3e6129fa43cc90c2a6b454666
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Sat Jun 11 01:07:06 2022 +0800
[fix][clients]Check pendingIndividualBatchIndexAcks size in doIndividualBatchAckAsync (#15877)
* Check pendingIndividualBatchIndexAcks size and flush in doIndividualBatchAckAsync
* Put the logic into the doIndividualBatchAck method
---
.../impl/PersistentAcknowledgmentsGroupingTracker.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f0f0cfd7548..005f2e6b74b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -312,9 +312,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
+ if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
}
} else {
doIndividualBatchAckAsync(batchMessageId);
+ if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
return CompletableFuture.completedFuture(null);
}
}
@@ -337,15 +343,9 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
return this.currentCumulativeAckFuture;
} finally {
this.lock.readLock().unlock();
- if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
- }
}
} else {
doCumulativeAckAsync(messageId, bitSet);
- if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
- flush();
- }
return CompletableFuture.completedFuture(null);
}
}