You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/21 04:35:28 UTC

[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

congbobo184 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902144130


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -370,29 +371,9 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
 
     private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
-        LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
-        while (true) {
-            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
-            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
-                    if (lastCumulativeAck.bitSetRecyclable != null) {
-                        try {
-                            lastCumulativeAck.bitSetRecyclable.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
-                        lastCumulativeAck.bitSetRecyclable = null;
-                    }
-                    lastCumulativeAck.recycle();
-                    // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
-                }
-            } else {
-                currentCumulativeAck.recycle();
-                // message id acknowledging an before the current last cumulative ack
-                return;
-            }
+        synchronized (lastCumulativeAck) {
+            lastCumulativeAck.update(msgId, bitSet);

Review Comment:
   may we should check the msgId is rather than the last cumuative ack



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * When reaching the max group size, an ack command is sent out immediately.
      */
     private static final int MAX_ACK_GROUP_SIZE = 1000;
+    private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =

Review Comment:
   If use FastThreadLocal, multiple `PersistentAcknowledgmentsGroupingTrackers` will affect each other `LastCumulativeAck `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org