You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:23 UTC

[pulsar] 18/29: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5eefdf10e563c32552772a3d50127c55ff18d557
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 22 23:34:49 2022 +0800

    [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)
    
    ### Motivation
    
    There were several issues caused by the thread safe issue of
    `LastCumulativeAck`, see:
    - https://github.com/apache/pulsar/pull/10586
    - https://github.com/apache/pulsar/pull/12343
    
    The root cause is that `LastCumulativeAck` could be accessed by
    different threads, especially in `flushAsync` method. But the fields are
    accessed directly and no thread safety can be guaranteed.
    
    In addition, the current `LastCumulativeAck` class  was added in
    https://github.com/apache/pulsar/pull/8996 to hold two object
    references, but this modification is wrong.
    
    Before #8996, there are two CAS operations in `doCumulativeAck` method
    in case it's called concurretly. Though the composite CAS operation is
    not atomic.
    
    However, after #8996, only CAS operation was performed but it's compared
    with a `LastCumulativeAck` object, not the two fields (`messageId` and
    `bitSetRecyclable`).
    
    There is another issue that it uses a flag `cumulativeAckFlushRequired`
    to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
    was called concurrently, both would send ACK commands to broker.
    
    ### Modifications
    
    To solve the thread safety issue, this PR move the `LastCumulativeAck`
    out of the `PersistentAcknowledgmentsGroupingTracker` to disable
    directly access to the internal fields. Then, the following synchronized
    methods were added to guarantee the thread safety:
    - `update`: Guarantee the safe write operations. It also recycles the
      `BitSetRecyclable` object before assigning new values and indicates
      itself can be flushed.
    - `flush`: If it can be flushed, return a thread local
      `LastCumulativeAck` instance that contains the message ID and the bit
      set. The bit set is deep copied to avoid the original reference being
      recycled in another `update` call.
    
    In addition, since the `messageId` field is volatile, the `getMessageId`
    method can always retrieve the latest reference.
    
    `LastCumulativeAckTest` is added to verify the sematics above.
    
    Based on the new design, we can only maintain a `LastCumulativeAck`
    field in `PersistentAcknowledgmentsGroupingTracker` and call the related
    methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
    that two concurrent `flushAsync` calls might send the same ACK command
    twice.
    
    (cherry picked from commit 936d6fdc780ea454e72e82b6c7a1885799158d02)
---
 .../PersistentAcknowledgmentsGroupingTracker.java  | 141 +++++++++++----------
 .../pulsar/client/impl/LastCumulativeAckTest.java  |  86 +++++++++++++
 2 files changed, 159 insertions(+), 68 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f0f0cfd7548..9829babece7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.Recycler;
+import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -34,9 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import lombok.NonNull;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.MessageId;
@@ -68,18 +67,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
     private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
 
-    private volatile LastCumulativeAck lastCumulativeAck =
-            LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
-
-    private volatile boolean cumulativeAckFlushRequired = false;
+    private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
 
     // When we flush the command, we should ensure current ack request will send correct
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, LastCumulativeAck>
-            LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
-                    PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck");
-
     /**
      * This is a set of all the individual acks that the application has issued and that were not already sent to
      * broker.
@@ -116,13 +108,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * resent after a disconnection and for which the user has already sent an acknowledgement.
      */
     @Override
-    public boolean isDuplicate(@NonNull MessageId messageId) {
-        final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+    public boolean isDuplicate(MessageId messageId) {
+        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
         if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
-            return pendingIndividualAcks.contains(messageId);
+            return pendingIndividualAcks.contains((MessageIdImpl) messageId);
         }
     }
 
@@ -370,30 +362,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
     private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
-        LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
-        while (true) {
-            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
-            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
-                    if (lastCumulativeAck.bitSetRecyclable != null) {
-                        try {
-                            lastCumulativeAck.bitSetRecyclable.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
-                        lastCumulativeAck.bitSetRecyclable = null;
-                    }
-                    lastCumulativeAck.recycle();
-                    // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
-                }
-            } else {
-                currentCumulativeAck.recycle();
-                // message id acknowledging an before the current last cumulative ack
-                return;
-            }
-        }
+        lastCumulativeAck.update(msgId, bitSet);
     }
 
     private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
@@ -474,15 +443,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     }
 
     private void flushAsync(ClientCnx cnx) {
+        final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
         boolean shouldFlush = false;
-        if (cumulativeAckFlushRequired) {
-            newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
-                    lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
-                    AckType.Cumulative, null, Collections.emptyMap(), false,
-                    this.currentCumulativeAckFuture, null);
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+        if (lastCumulativeAckToFlush != null) {
             shouldFlush = true;
-            cumulativeAckFlushRequired = false;
+            final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(),
+                    lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null,
+                    Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
+            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
         }
 
         // Flush all individual acks
@@ -560,7 +529,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     @Override
     public void flushAndClean() {
         flush();
-        lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
+        lastCumulativeAck.reset();
         pendingIndividualAcks.clear();
     }
 
@@ -664,36 +633,72 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         return ackReceiptEnabled && cnx != null
                 && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
     }
+}
 
-    private static class LastCumulativeAck {
-        private MessageIdImpl messageId;
-        private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
 
-        static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
-            LastCumulativeAck op = RECYCLER.get();
-            op.messageId = messageId;
-            op.bitSetRecyclable = bitSetRecyclable;
-            return op;
-        }
+    // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+    public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+            new FastThreadLocal<LastCumulativeAck>() {
 
-        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
+                @Override
+                protected LastCumulativeAck initialValue() {
+                    return new LastCumulativeAck();
+                }
+            };
+    public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
 
-        void recycle() {
-            if (bitSetRecyclable != null) {
+    private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+    private BitSetRecyclable bitSetRecyclable = null;
+    private boolean flushRequired = false;
+
+    public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        if (messageId.compareTo(this.messageId) > 0) {
+            if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
                 this.bitSetRecyclable.recycle();
             }
-            this.messageId = null;
-            recyclerHandle.recycle(this);
+            set(messageId, bitSetRecyclable);
+            flushRequired = true;
         }
+    }
 
-        private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
-        private static final Recycler<LastCumulativeAck> RECYCLER = new Recycler<LastCumulativeAck>() {
-            @Override
-            protected LastCumulativeAck newObject(Handle<LastCumulativeAck> handle) {
-                return new LastCumulativeAck(handle);
+    public synchronized LastCumulativeAck flush() {
+        if (flushRequired) {
+            final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
+            if (bitSetRecyclable != null) {
+                localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
+            } else {
+                localLastCumulativeAck.set(this.messageId, null);
             }
-        };
+            flushRequired = false;
+            return localLastCumulativeAck;
+        } else {
+            // Return null to indicate nothing to be flushed
+            return null;
+        }
+    }
+
+    public synchronized void reset() {
+        if (bitSetRecyclable != null) {
+            bitSetRecyclable.recycle();
+        }
+        messageId = DEFAULT_MESSAGE_ID;
+        bitSetRecyclable = null;
+        flushRequired = false;
+    }
+
+    private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        this.messageId = messageId;
+        this.bitSetRecyclable = bitSetRecyclable;
+    }
+
+    @Override
+    public String toString() {
+        String s = messageId.toString();
+        if (bitSetRecyclable != null) {
+            s += " (bit set: " + bitSetRecyclable + ")";
+        }
+        return s;
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
new file mode 100644
index 00000000000..102ccfc0e07
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.annotations.Test;
+
+public class LastCumulativeAckTest {
+
+    @Test
+    public void testUpdate() {
+        final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        assertFalse(lastCumulativeAck.isFlushRequired());
+        assertEquals(lastCumulativeAck.getMessageId(), LastCumulativeAck.DEFAULT_MESSAGE_ID);
+        assertNull(lastCumulativeAck.getBitSetRecyclable());
+
+        final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10);
+        final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create();
+        bitSetRecyclable1.set(0, 3);
+        lastCumulativeAck.update(messageId1, bitSetRecyclable1);
+        assertTrue(lastCumulativeAck.isFlushRequired());
+        assertSame(lastCumulativeAck.getMessageId(), messageId1);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+        final MessageIdImpl messageId2 = new MessageIdImpl(0L, 2L, 8);
+        lastCumulativeAck.update(messageId2, bitSetRecyclable1);
+        // bitSetRecyclable1 is not recycled
+        assertEquals(bitSetRecyclable1.toString(), "{0, 1, 2}");
+
+        final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create();
+        bitSetRecyclable2.set(0, 2);
+
+        // `update()` only accepts a newer message ID, so this call here has no side effect
+        lastCumulativeAck.update(messageId2, bitSetRecyclable2);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+        final MessageIdImpl messageId3 = new MessageIdImpl(0L, 3L, 9);
+        lastCumulativeAck.update(messageId3, bitSetRecyclable2);
+        // bitSetRecyclable1 is recycled because it's replaced in `update`
+        assertEquals(bitSetRecyclable1.toString(), "{}");
+        assertSame(lastCumulativeAck.getMessageId(), messageId3);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2);
+        bitSetRecyclable2.recycle();
+    }
+
+    @Test
+    public void testFlush() {
+        final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        assertNull(lastCumulativeAck.flush());
+
+        final MessageIdImpl messageId = new MessageIdImpl(0L, 1L, 3);
+        final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+        bitSetRecyclable.set(0, 3);
+        lastCumulativeAck.update(messageId, bitSetRecyclable);
+        assertTrue(lastCumulativeAck.isFlushRequired());
+
+        final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
+        assertFalse(lastCumulativeAck.isFlushRequired());
+        assertSame(lastCumulativeAckToFlush.getMessageId(), messageId);
+        assertNotSame(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+        assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+    }
+
+}