Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/15 08:51:49 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request, #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

BewareMyPower opened a new pull request, #16072:
URL: https://github.com/apache/pulsar/pull/16072

   ### Motivation
   
   Several bugs were caused by the thread safety issue of
   `LastCumulativeAck`, see:
   - https://github.com/apache/pulsar/pull/10586
   - https://github.com/apache/pulsar/pull/12343
   
   The root cause is that `LastCumulativeAck` can be accessed by
   different threads, especially in the `flushAsync` method, but its
   fields are accessed directly, so thread safety cannot be guaranteed.
   
   In addition, the current `LastCumulativeAck` class was added in
   https://github.com/apache/pulsar/pull/8996 to hold two object
   references, but this modification is wrong.
   
   Before #8996, there were two CAS operations in the `doCumulativeAck`
   method in case it was called concurrently, though the composite CAS
   operation was not atomic.
   
   However, after #8996, only one CAS operation is performed, and it
   compares against a `LastCumulativeAck` object, not the two fields
   (`messageId` and `bitSetRecyclable`).
   
   ### Modifications
   
   To solve the thread safety issue, this PR moves `LastCumulativeAck`
   out of `PersistentAcknowledgmentsGroupingTracker` to prevent direct
   access to its internal fields. Then, two synchronized methods are
   added to guarantee thread safety:
   - `update`: Guarantees safe write operations. It also recycles the
     `BitSetRecyclable` object before assigning new values.
   - `moveOwnershipTo`: Moves the ownership to another
     `LastCumulativeAck` object, which is then responsible for recycling
     the `BitSetRecyclable` field.
   
   With the methods above, each time `flushAsync` is called, the
   ownership of the `lastCumulativeAck` field is moved to a thread-local
   field, which is used to send the ACK command and then recycle the
   `BitSetRecyclable` field.
   
   - `lastCumulativeAck` holds the latest message ID and bit set. It can
     be updated by multiple threads and always saves the latest value.
   - `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache
     of the latest value in `flushAsync`.
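   
   As a rough illustration of the two methods above, here is a minimal
   sketch. It is not the code in this PR: `AckHolderSketch` and
   `DEFAULT_ID` are hypothetical names, a `long` stands in for
   `MessageIdImpl`, and `java.util.BitSet` stands in for
   `BitSetRecyclable`, so nothing is actually recycled.
   
   ```java
   import java.util.BitSet;

   // Hypothetical stand-in for LastCumulativeAck (assumption, not the PR code).
   class AckHolderSketch {
       static final long DEFAULT_ID = -1L;

       private long messageId = DEFAULT_ID;
       private BitSet bitSet = null;

       // Both fields change under one lock, so readers never see a mixed pair.
       synchronized void update(long newId, BitSet newBitSet) {
           // The real class would recycle the old BitSetRecyclable here.
           this.messageId = newId;
           this.bitSet = newBitSet;
       }

       // Hands the current pair to `target` and resets this holder to defaults.
       synchronized void moveOwnershipTo(AckHolderSketch target) {
           target.update(messageId, bitSet);
           this.messageId = DEFAULT_ID;
           this.bitSet = null;
       }

       synchronized long getMessageId() {
           return messageId;
       }

       synchronized BitSet getBitSet() {
           return bitSet;
       }
   }
   ```
   
   Because both fields only change while the lock is held, a flush that
   moves the pair into its own holder should never see a message ID
   paired with a bit set from a different update.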
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902144130


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -370,29 +371,9 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
 
     private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
-        LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
-        while (true) {
-            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
-            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
-                    if (lastCumulativeAck.bitSetRecyclable != null) {
-                        try {
-                            lastCumulativeAck.bitSetRecyclable.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
-                        lastCumulativeAck.bitSetRecyclable = null;
-                    }
-                    lastCumulativeAck.recycle();
-                    // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
-                }
-            } else {
-                currentCumulativeAck.recycle();
-                // message id acknowledging an before the current last cumulative ack
-                return;
-            }
+        synchronized (lastCumulativeAck) {
+            lastCumulativeAck.update(msgId, bitSet);

Review Comment:
   Maybe we should check whether the msgId is greater than the last cumulative ack



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * When reaching the max group size, an ack command is sent out immediately.
      */
     private static final int MAX_ACK_GROUP_SIZE = 1000;
+    private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =

Review Comment:
   If `FastThreadLocal` is used, multiple `PersistentAcknowledgmentsGroupingTracker` instances will affect each other's `LastCumulativeAck`





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r901774237


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -116,8 +118,8 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
      * resent after a disconnection and for which the user has already sent an acknowledgement.
      */
     @Override
-    public boolean isDuplicate(@NonNull MessageId messageId) {
-        final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+    public boolean isDuplicate(MessageId messageId) {
+        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();

Review Comment:
   I see.





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902398053


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
     private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
-                    lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+            // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+            // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+            // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+            // is updated.
+            final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+            lastCumulativeAck.moveOwnershipTo(cumulativeAck);
+
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, cumulativeAck.getMessageId().getLedgerId(),
+                    cumulativeAck.getMessageId().getEntryId(), cumulativeAck.getBitSetRecyclable(),
                     AckType.Cumulative, null, Collections.emptyMap(), false,
                     this.currentCumulativeAckFuture, null);
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(cumulativeAck.getMessageId());
             shouldFlush = true;
+
+            if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {

Review Comment:
   I was thinking that if
   https://github.com/apache/pulsar/blob/141c44022a27be2fc07eab9827cfdb168e448953/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L561
   
   is invoked and `restoreOwnershipIfEmpty` runs afterwards, messages to receive and ack may be lost





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902269696


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * When reaching the max group size, an ack command is sent out immediately.
      */
     private static final int MAX_ACK_GROUP_SIZE = 1000;
+    private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =

Review Comment:
   `LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH` is only used to store the value of `lastCumulativeAck` before sending the ACK so that we can avoid synchronizing the whole code block. After the ACK is sent, `lastCumulativeAck` is restored from the temporary value.





[GitHub] [pulsar] BewareMyPower commented on pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#issuecomment-1161675466

   I've updated this PR with a refactored change and the PR description has also been updated, PTAL again @Demogorgon314 @congbobo184 




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r901656825


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -116,8 +118,8 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
      * resent after a disconnection and for which the user has already sent an acknowledgement.
      */
     @Override
-    public boolean isDuplicate(@NonNull MessageId messageId) {
-        final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+    public boolean isDuplicate(MessageId messageId) {
+        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();

Review Comment:
   I added it earlier. The null check is redundant here because the only caller passes a `MessageId` object returned by `new`, which cannot be `null`. See the following code:
   
   https://github.com/apache/pulsar/blob/d5d8923265b3a4b9731892879b2f2b9dc48c0f98/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1267-L1268
   
   In addition, `@NonNull` will still throw an exception, see https://projectlombok.org/features/NonNull.
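   
   To make the last point concrete, here is a hand-written sketch of
   roughly what a Lombok `@NonNull` parameter amounts to: a null check on
   entry that throws `NullPointerException`. The class name
   `NonNullSketch`, the `Object` parameter type, the message text, and the
   placeholder return value are assumptions for illustration only.
   
   ```java
   import java.util.Objects;

   class NonNullSketch {
       // Stand-in for a method whose parameter is annotated with @NonNull:
       // the argument is checked first and null is rejected with an NPE.
       static boolean isDuplicate(Object messageId) {
           Objects.requireNonNull(messageId, "messageId is marked non-null but is null");
           return false; // placeholder; the real comparison is out of scope here
       }
   }
   ```
   
   So removing the annotation does not turn a working call into a failing
   one; a `null` argument would fail either way.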





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903219234


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -560,7 +529,6 @@ private void flushAsync(ClientCnx cnx) {
     @Override
     public void flushAndClean() {
         flush();
-        lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);

Review Comment:
   The original code uses `cumulativeAckFlushRequired` to determine whether to send the ACK in `flushAsync`. Even if `lastCumulativeAck` is reset to earliest here, it won't be used in `flushAsync` until `doCumulativeAckAsync` is called, which is the only place that sets `cumulativeAckFlushRequired` to true. And after `doCumulativeAckAsync`, the internal message ID of `lastCumulativeAck` will have changed, so I think this line is useless. Could you show a test to verify your point?
   
   In this PR, after `lastCumulativeAck.flush()` is called, the internal `flushRequired` field becomes false, so `lastCumulativeAck` is not used in `flushAsync`.





[GitHub] [pulsar] BewareMyPower commented on pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#issuecomment-1157809992

   Now all tests passed, PTAL, @lhotari @gaozhangmin @congbobo184 @mattisonchao @Technoboy- @codelipenghui 




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903226527


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -664,36 +632,61 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) {
         return ackReceiptEnabled && cnx != null
                 && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
     }
+}
 
-    private static class LastCumulativeAck {
-        private MessageIdImpl messageId;
-        private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
 
-        static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
-            LastCumulativeAck op = RECYCLER.get();
-            op.messageId = messageId;
-            op.bitSetRecyclable = bitSetRecyclable;
-            return op;
-        }
+    // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+    public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+            new FastThreadLocal<LastCumulativeAck>() {
+
+                @Override
+                protected LastCumulativeAck initialValue() {
+                    return new LastCumulativeAck();
+                }
+            };
+    public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
+
+    private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+    private BitSetRecyclable bitSetRecyclable = null;
+    private boolean flushRequired = false;
 
-        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
+    public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
+            this.bitSetRecyclable.recycle();
         }
+        set(messageId, bitSetRecyclable);
+        flushRequired = true;
+    }
 
-        void recycle() {
+    public synchronized LastCumulativeAck flush() {
+        if (flushRequired) {
+            final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
             if (bitSetRecyclable != null) {
-                this.bitSetRecyclable.recycle();
+                localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));

Review Comment:
   I'm not sure if it would be proper to add another constructor that accepts a long array to `bitSetRecyclable` to allocate memory from Netty recycler.
   
   ```java
       public static BitSetRecyclable create(BitSetRecyclable other) {
           final BitSetRecyclable bitSetRecyclable = create();
           bitSetRecyclable.words = Arrays.copyOf(other.words, other.length());
           bitSetRecyclable.wordsInUse = other.wordsInUse;
           bitSetRecyclable.sizeIsSticky = other.sizeIsSticky;
           return bitSetRecyclable;
       }
   ```
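   
   For comparison, the copy that `flush()` currently makes through the
   `long[]` representation can be sketched with the JDK's
   `java.util.BitSet`, which has `toLongArray()` and `valueOf(long[])`
   methods of the same names. `BitSetCopySketch` and `copyOf` are
   hypothetical names, and this sketch allocates from the heap rather than
   from the Netty recycler.
   
   ```java
   import java.util.BitSet;

   class BitSetCopySketch {
       // Copies through the long[] representation; the result shares no state
       // with the source, so the source can be cleared or reused afterwards.
       static BitSet copyOf(BitSet source) {
           return BitSet.valueOf(source.toLongArray());
       }
   }
   ```
   
   The copy stays valid after the source is cleared, which is the property
   the thread-local value needs once the original bit set is recycled.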





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902469578


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
     private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
-                    lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+            // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+            // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+            // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+            // is updated.
+            final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+            lastCumulativeAck.moveOwnershipTo(cumulativeAck);
+
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, cumulativeAck.getMessageId().getLedgerId(),
+                    cumulativeAck.getMessageId().getEntryId(), cumulativeAck.getBitSetRecyclable(),
                     AckType.Cumulative, null, Collections.emptyMap(), false,
                     this.currentCumulativeAckFuture, null);
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(cumulativeAck.getMessageId());
             shouldFlush = true;
+
+            if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {

Review Comment:
   `restoreOwnershipIfEmpty` is called in `flushAsync`, and `flushAndClean` also calls `flushAsync`. Do you mean that `flushAsync` is invoked concurrently?
   
   ----
   
   But I just found another problem. In this case, the 2nd invocation of `flushAsync` might use a default message id because `lastCumulativeAck` has already been reset by `moveOwnershipTo`.
   
   I think we need to use a CAS operation on `cumulativeAckFlushRequired`. I'll check the code again.
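   
   A CAS on the flag could look roughly like the sketch below. This only
   illustrates the idea and is not necessarily what the PR ends up doing:
   `FlushFlagSketch`, `markDirty` and `tryClaimFlush` are hypothetical
   names, and an `AtomicBoolean` is assumed in place of the existing
   `cumulativeAckFlushRequired` field.
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   class FlushFlagSketch {
       private final AtomicBoolean flushRequired = new AtomicBoolean(false);

       // Called after a cumulative ack has been recorded.
       void markDirty() {
           flushRequired.set(true);
       }

       // At most one caller wins the CAS until the flag is set again;
       // concurrent callers get false and should skip sending the ACK.
       boolean tryClaimFlush() {
           return flushRequired.compareAndSet(true, false);
       }
   }
   ```
   
   With this shape, a second concurrent `flushAsync` would see `false`
   and would not read a holder that has already been reset.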







[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902266409


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * When reaching the max group size, an ack command is sent out immediately.
      */
     private static final int MAX_ACK_GROUP_SIZE = 1000;
+    private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =

Review Comment:
   I think not. Because this field is just a temporary cache, it's only used in the following code block.
   
   ```java
               final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
               lastCumulativeAck.moveOwnershipTo(cumulativeAck);
               /* ... */
               if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {
                   // `lastCumulativeAck` has been updated so that restoreOwnershipIfEmpty failed
                   cumulativeAck.recycle();
               }
               cumulativeAckFlushRequired = false;
   ```
   
   The code above has no side effect on the `lastCumulativeAck` field if it's executed atomically.
   
   Assuming there are two trackers, let's say `T1`, `T2`. The actual `lastCumulativeAck` objects are `T1.lastCumulativeAck` and `T2.lastCumulativeAck`.
   
   If `T1.flushAsync()` and `T2.flushAsync()` are called in different threads, `LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get()` will return two different `LastCumulativeAck` objects and they won't affect each other.
   
   If `T1.flushAsync()` and `T2.flushAsync()` are called in the same thread, these two calls cannot be executed concurrently. Each time `flushAsync()` is done, the `lastCumulativeAck` field of each tracker won't change unless it's updated in other methods.
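   
   The two cases can be checked with a small sketch. It uses the JDK's
   `ThreadLocal` as a stand-in for Netty's `FastThreadLocal` and a
   `StringBuilder` as a stand-in for `LastCumulativeAck`; `TrackerSketch`,
   `scratch` and `scratchOnNewThread` are hypothetical names.
   
   ```java
   class TrackerSketch {
       // One static thread-local shared by every tracker instance, as in the diff.
       static final ThreadLocal<StringBuilder> LOCAL = ThreadLocal.withInitial(StringBuilder::new);

       // Returns the calling thread's temporary object.
       StringBuilder scratch() {
           return LOCAL.get();
       }

       // Returns the temporary object that `tracker` sees on a freshly started thread.
       static StringBuilder scratchOnNewThread(TrackerSketch tracker) {
           StringBuilder[] out = new StringBuilder[1];
           Thread thread = new Thread(() -> out[0] = tracker.scratch());
           thread.start();
           try {
               thread.join(); // join() makes the write to out[0] visible here
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return out[0];
       }
   }
   ```
   
   Two trackers on the same thread do get the same temporary object, but
   they can only use it one after the other; a tracker on another thread
   gets a different object.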





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902471736


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
     private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
-                    lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+            // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+            // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+            // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+            // is updated.
+            final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+            lastCumulativeAck.moveOwnershipTo(cumulativeAck);
+
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, cumulativeAck.getMessageId().getLedgerId(),
+                    cumulativeAck.getMessageId().getEntryId(), cumulativeAck.getBitSetRecyclable(),
                     AckType.Cumulative, null, Collections.emptyMap(), false,
                     this.currentCumulativeAckFuture, null);
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(cumulativeAck.getMessageId());
             shouldFlush = true;
+
+            if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {

Review Comment:
   I will refactor the existing code again. PTAL after I commit the latest code.





[GitHub] [pulsar] BewareMyPower merged pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #16072:
URL: https://github.com/apache/pulsar/pull/16072




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902259296


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -370,29 +371,9 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
 
     private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
-        LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
-        while (true) {
-            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
-            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
-                    if (lastCumulativeAck.bitSetRecyclable != null) {
-                        try {
-                            lastCumulativeAck.bitSetRecyclable.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
-                        lastCumulativeAck.bitSetRecyclable = null;
-                    }
-                    lastCumulativeAck.recycle();
-                    // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
-                }
-            } else {
-                currentCumulativeAck.recycle();
-                // message id acknowledging an before the current last cumulative ack
-                return;
-            }
+        synchronized (lastCumulativeAck) {
+            lastCumulativeAck.update(msgId, bitSet);

Review Comment:
   Did you mean the case where `msgId` is the same as `lastCumulativeAck.messageId`? I think we should check `bitSetRecyclable` to avoid invoking the `recycle()` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903245160


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -560,7 +529,6 @@ private void flushAsync(ClientCnx cnx) {
     @Override
     public void flushAndClean() {
         flush();
-        lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);

Review Comment:
   I added the `messageId` comparison logic back to `LastCumulativeAck#update`, so we now need to reset `lastCumulativeAck` here to allow it to be updated with any value.
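
To illustrate why the reset matters once `update` compares positions, here is a small sketch under stated assumptions: `CumulativeAckPositionSketch`, its `reset()` method, and the use of a `long` in place of `MessageIdImpl` are all hypothetical and are not taken from the PR.

```java
// Hypothetical holder that only accepts strictly newer positions.
final class CumulativeAckPositionSketch {
    static final long EARLIEST = -1L; // plays the role of MessageIdImpl.earliest

    private long position = EARLIEST;

    // Returns true if the update was accepted.
    synchronized boolean update(long newPosition) {
        if (newPosition > position) {
            position = newPosition;
            return true;
        }
        return false;
    }

    // Called after a flush-and-clean (for example around a seek) so that
    // any position, including an earlier one, can be accepted again.
    synchronized void reset() {
        position = EARLIEST;
    }

    synchronized long position() {
        return position;
    }

    public static void main(String[] args) {
        CumulativeAckPositionSketch ack = new CumulativeAckPositionSketch();
        ack.update(100L);
        System.out.println("accepted before reset: " + ack.update(10L));
        ack.reset();
        System.out.println("accepted after reset: " + ack.update(10L));
    }
}
```

Without the reset, an acknowledgement for an earlier position issued after a seek would be dropped by the comparison and the holder would keep the stale, later position.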





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902131202


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
     private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
-                    lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+            // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+            // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+            // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+            // is updated.
+            final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+            lastCumulativeAck.moveOwnershipTo(LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get());

Review Comment:
   ```suggestion
               lastCumulativeAck.moveOwnershipTo(cumulativeAck);
   ```
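
For context, a rough sketch of the ownership-transfer idea that the suggestion touches on. Everything here is an assumption made for illustration: `OwnedAckSketch`, `takeForFlush`, the `long[]` standing in for `BitSetRecyclable`, and the plain `ThreadLocal` used instead of Netty's `FastThreadLocal`. It shows the thread-local value being fetched once and reused, and the shared holder giving up its bit set under its own lock.

```java
// Hypothetical holder; a long replaces MessageIdImpl and a long[] replaces BitSetRecyclable.
final class OwnedAckSketch {
    // Plain ThreadLocal used here in place of Netty's FastThreadLocal.
    static final ThreadLocal<OwnedAckSketch> LOCAL = ThreadLocal.withInitial(OwnedAckSketch::new);

    private long position = -1L;
    private long[] bits = null;

    synchronized void update(long newPosition, long[] newBits) {
        position = newPosition;
        bits = newBits;
    }

    // Moves the current state into target. Afterwards this instance no longer
    // references the bit set, so only target's owner is responsible for it.
    // The target is assumed to be confined to the calling thread.
    synchronized void moveOwnershipTo(OwnedAckSketch target) {
        target.position = position;
        target.bits = bits;
        bits = null;
    }

    synchronized long position() {
        return position;
    }

    synchronized long[] bits() {
        return bits;
    }

    // Fetch the thread-local instance once and reuse the local variable.
    static OwnedAckSketch takeForFlush(OwnedAckSketch shared) {
        OwnedAckSketch local = LOCAL.get();
        shared.moveOwnershipTo(local);
        return local;
    }

    public static void main(String[] args) {
        OwnedAckSketch shared = new OwnedAckSketch();
        shared.update(42L, new long[] {5L});
        OwnedAckSketch local = takeForFlush(shared);
        System.out.println("shared still holds bits: " + (shared.bits() != null));
        System.out.println("local position: " + local.position());
    }
}
```

Because the flushing thread works on its own copy of the references, the rest of the flush does not need to hold the shared holder's lock.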





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903224237


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -664,36 +632,61 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) {
         return ackReceiptEnabled && cnx != null
                 && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
     }
+}
 
-    private static class LastCumulativeAck {
-        private MessageIdImpl messageId;
-        private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
 
-        static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
-            LastCumulativeAck op = RECYCLER.get();
-            op.messageId = messageId;
-            op.bitSetRecyclable = bitSetRecyclable;
-            return op;
-        }
+    // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+    public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+            new FastThreadLocal<LastCumulativeAck>() {
+
+                @Override
+                protected LastCumulativeAck initialValue() {
+                    return new LastCumulativeAck();
+                }
+            };
+    public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
+
+    private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+    private BitSetRecyclable bitSetRecyclable = null;
+    private boolean flushRequired = false;
 
-        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
+    public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
+            this.bitSetRecyclable.recycle();
         }
+        set(messageId, bitSetRecyclable);
+        flushRequired = true;
+    }
 
-        void recycle() {
+    public synchronized LastCumulativeAck flush() {
+        if (flushRequired) {
+            final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
             if (bitSetRecyclable != null) {
-                this.bitSetRecyclable.recycle();
+                localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));

Review Comment:
   Actually, no. `BitSetRecyclable#valueOf` doesn't use the Netty recycler to create a `BitSetRecyclable`, so the result doesn't need to be recycled. As for the `bitSetRecyclable` in the current instance, it will be recycled the next time `update` is called.
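
A minimal sketch of the copy-on-flush idea described here, with assumptions stated up front: `java.util.BitSet` stands in for `BitSetRecyclable`, and `BitSetSnapshotSketch` with its `snapshot()` method is hypothetical. It does not verify how `BitSetRecyclable#valueOf` allocates; it only shows that a copy built from `toLongArray()` is independent of the original, so the holder can keep managing the original's lifecycle.

```java
import java.util.BitSet;

// Hypothetical holder; java.util.BitSet stands in for BitSetRecyclable.
final class BitSetSnapshotSketch {
    private BitSet bits;

    synchronized void update(BitSet newBits) {
        bits = newBits;
    }

    // Returns an independent copy, so the caller never shares the
    // instance that the holder may later clear or recycle.
    synchronized BitSet snapshot() {
        return bits == null ? null : BitSet.valueOf(bits.toLongArray());
    }

    public static void main(String[] args) {
        BitSetSnapshotSketch holder = new BitSetSnapshotSketch();
        BitSet original = new BitSet();
        original.set(0);
        original.set(3);
        holder.update(original);

        BitSet copy = holder.snapshot();
        original.clear(); // simulates the original being reused later
        System.out.println("copy still has bit 3: " + copy.get(3));
    }
}
```

The trade-off is an extra allocation per flush in exchange for not having to coordinate the original bit set's lifetime with the flushing thread.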





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903211468


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -560,7 +529,6 @@ private void flushAsync(ClientCnx cnx) {
     @Override
     public void flushAndClean() {
         flush();
-        lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);

Review Comment:
   If this line is deleted, then when the consumer seeks to an earlier position, it will acknowledge an incorrect position.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -664,36 +632,61 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) {
         return ackReceiptEnabled && cnx != null
                 && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
     }
+}
 
-    private static class LastCumulativeAck {
-        private MessageIdImpl messageId;
-        private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
 
-        static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
-            LastCumulativeAck op = RECYCLER.get();
-            op.messageId = messageId;
-            op.bitSetRecyclable = bitSetRecyclable;
-            return op;
-        }
+    // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+    public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+            new FastThreadLocal<LastCumulativeAck>() {
+
+                @Override
+                protected LastCumulativeAck initialValue() {
+                    return new LastCumulativeAck();
+                }
+            };
+    public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
+
+    private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+    private BitSetRecyclable bitSetRecyclable = null;
+    private boolean flushRequired = false;
 
-        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
+    public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
+            this.bitSetRecyclable.recycle();
         }
+        set(messageId, bitSetRecyclable);
+        flushRequired = true;
+    }
 
-        void recycle() {
+    public synchronized LastCumulativeAck flush() {
+        if (flushRequired) {
+            final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
             if (bitSetRecyclable != null) {
-                this.bitSetRecyclable.recycle();
+                localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));

Review Comment:
   This bit set needs to be recycled.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r901633127


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -68,18 +74,14 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
     private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
 
-    private volatile LastCumulativeAck lastCumulativeAck =
-            LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
+    private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
 
     private volatile boolean cumulativeAckFlushRequired = false;
 
+

Review Comment:
   Unnecessary new line.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -116,8 +118,8 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
      * resent after a disconnection and for which the user has already sent an acknowledgement.
      */
     @Override
-    public boolean isDuplicate(@NonNull MessageId messageId) {
-        final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+    public boolean isDuplicate(MessageId messageId) {
+        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();

Review Comment:
   Why did we remove the `@NonNull` annotation?





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902248174


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * When reaching the max group size, an ack command is sent out immediately.
      */
     private static final int MAX_ACK_GROUP_SIZE = 1000;
+    private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =

Review Comment:
   Good catch!


