You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:23 UTC
[pulsar] 18/29: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5eefdf10e563c32552772a3d50127c55ff18d557
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 22 23:34:49 2022 +0800
[fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)
### Motivation
There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- https://github.com/apache/pulsar/pull/10586
- https://github.com/apache/pulsar/pull/12343
The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.
In addition, the current `LastCumulativeAck` class was added in
https://github.com/apache/pulsar/pull/8996 to hold two object
references, but this modification is wrong.
Before #8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.
However, after #8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).
There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.
### Modifications
To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
`BitSetRecyclable` object before assigning new values and indicates
itself can be flushed.
- `flush`: If it can be flushed, return a thread local
`LastCumulativeAck` instance that contains the message ID and the bit
set. The bit set is deep copied to avoid the original reference being
recycled in another `update` call.
In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.
`LastCumulativeAckTest` is added to verify the sematics above.
Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.
(cherry picked from commit 936d6fdc780ea454e72e82b6c7a1885799158d02)
---
.../PersistentAcknowledgmentsGroupingTracker.java | 141 +++++++++++----------
.../pulsar/client/impl/LastCumulativeAckTest.java | 86 +++++++++++++
2 files changed, 159 insertions(+), 68 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f0f0cfd7548..9829babece7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.Recycler;
+import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -34,9 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import lombok.NonNull;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.MessageId;
@@ -68,18 +67,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
- private volatile LastCumulativeAck lastCumulativeAck =
- LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
-
- private volatile boolean cumulativeAckFlushRequired = false;
+ private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
// When we flush the command, we should ensure current ack request will send correct
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, LastCumulativeAck>
- LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
- PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck");
-
/**
* This is a set of all the individual acks that the application has issued and that were not already sent to
* broker.
@@ -116,13 +108,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
- public boolean isDuplicate(@NonNull MessageId messageId) {
- final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+ public boolean isDuplicate(MessageId messageId) {
+ final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
// Already included in a cumulative ack
return true;
} else {
- return pendingIndividualAcks.contains(messageId);
+ return pendingIndividualAcks.contains((MessageIdImpl) messageId);
}
}
@@ -370,30 +362,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
// Handle concurrent updates from different threads
- LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
- while (true) {
- LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
- if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
- if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
- if (lastCumulativeAck.bitSetRecyclable != null) {
- try {
- lastCumulativeAck.bitSetRecyclable.recycle();
- } catch (Exception ignore) {
- // no-op
- }
- lastCumulativeAck.bitSetRecyclable = null;
- }
- lastCumulativeAck.recycle();
- // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
- cumulativeAckFlushRequired = true;
- return;
- }
- } else {
- currentCumulativeAck.recycle();
- // message id acknowledging an before the current last cumulative ack
- return;
- }
- }
+ lastCumulativeAck.update(msgId, bitSet);
}
private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
@@ -474,15 +443,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
private void flushAsync(ClientCnx cnx) {
+ final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
boolean shouldFlush = false;
- if (cumulativeAckFlushRequired) {
- newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
- lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
- AckType.Cumulative, null, Collections.emptyMap(), false,
- this.currentCumulativeAckFuture, null);
- this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+ if (lastCumulativeAckToFlush != null) {
shouldFlush = true;
- cumulativeAckFlushRequired = false;
+ final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
+ newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(),
+ lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null,
+ Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
+ this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
}
// Flush all individual acks
@@ -560,7 +529,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
@Override
public void flushAndClean() {
flush();
- lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
+ lastCumulativeAck.reset();
pendingIndividualAcks.clear();
}
@@ -664,36 +633,72 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
return ackReceiptEnabled && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
+}
- private static class LastCumulativeAck {
- private MessageIdImpl messageId;
- private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
- static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
- LastCumulativeAck op = RECYCLER.get();
- op.messageId = messageId;
- op.bitSetRecyclable = bitSetRecyclable;
- return op;
- }
+ // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+ public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+ new FastThreadLocal<LastCumulativeAck>() {
- private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
- }
+ @Override
+ protected LastCumulativeAck initialValue() {
+ return new LastCumulativeAck();
+ }
+ };
+ public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
- void recycle() {
- if (bitSetRecyclable != null) {
+ private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+ private BitSetRecyclable bitSetRecyclable = null;
+ private boolean flushRequired = false;
+
+ public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+ if (messageId.compareTo(this.messageId) > 0) {
+ if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
this.bitSetRecyclable.recycle();
}
- this.messageId = null;
- recyclerHandle.recycle(this);
+ set(messageId, bitSetRecyclable);
+ flushRequired = true;
}
+ }
- private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
- private static final Recycler<LastCumulativeAck> RECYCLER = new Recycler<LastCumulativeAck>() {
- @Override
- protected LastCumulativeAck newObject(Handle<LastCumulativeAck> handle) {
- return new LastCumulativeAck(handle);
+ public synchronized LastCumulativeAck flush() {
+ if (flushRequired) {
+ final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
+ if (bitSetRecyclable != null) {
+ localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
+ } else {
+ localLastCumulativeAck.set(this.messageId, null);
}
- };
+ flushRequired = false;
+ return localLastCumulativeAck;
+ } else {
+ // Return null to indicate nothing to be flushed
+ return null;
+ }
+ }
+
+ public synchronized void reset() {
+ if (bitSetRecyclable != null) {
+ bitSetRecyclable.recycle();
+ }
+ messageId = DEFAULT_MESSAGE_ID;
+ bitSetRecyclable = null;
+ flushRequired = false;
+ }
+
+ private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+ this.messageId = messageId;
+ this.bitSetRecyclable = bitSetRecyclable;
+ }
+
+ @Override
+ public String toString() {
+ String s = messageId.toString();
+ if (bitSetRecyclable != null) {
+ s += " (bit set: " + bitSetRecyclable + ")";
+ }
+ return s;
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
new file mode 100644
index 00000000000..102ccfc0e07
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.annotations.Test;
+
+public class LastCumulativeAckTest {
+
+ @Test
+ public void testUpdate() {
+ final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+ assertFalse(lastCumulativeAck.isFlushRequired());
+ assertEquals(lastCumulativeAck.getMessageId(), LastCumulativeAck.DEFAULT_MESSAGE_ID);
+ assertNull(lastCumulativeAck.getBitSetRecyclable());
+
+ final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10);
+ final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create();
+ bitSetRecyclable1.set(0, 3);
+ lastCumulativeAck.update(messageId1, bitSetRecyclable1);
+ assertTrue(lastCumulativeAck.isFlushRequired());
+ assertSame(lastCumulativeAck.getMessageId(), messageId1);
+ assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+ final MessageIdImpl messageId2 = new MessageIdImpl(0L, 2L, 8);
+ lastCumulativeAck.update(messageId2, bitSetRecyclable1);
+ // bitSetRecyclable1 is not recycled
+ assertEquals(bitSetRecyclable1.toString(), "{0, 1, 2}");
+
+ final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create();
+ bitSetRecyclable2.set(0, 2);
+
+ // `update()` only accepts a newer message ID, so this call here has no side effect
+ lastCumulativeAck.update(messageId2, bitSetRecyclable2);
+ assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+ final MessageIdImpl messageId3 = new MessageIdImpl(0L, 3L, 9);
+ lastCumulativeAck.update(messageId3, bitSetRecyclable2);
+ // bitSetRecyclable1 is recycled because it's replaced in `update`
+ assertEquals(bitSetRecyclable1.toString(), "{}");
+ assertSame(lastCumulativeAck.getMessageId(), messageId3);
+ assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2);
+ bitSetRecyclable2.recycle();
+ }
+
+ @Test
+ public void testFlush() {
+ final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+ assertNull(lastCumulativeAck.flush());
+
+ final MessageIdImpl messageId = new MessageIdImpl(0L, 1L, 3);
+ final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+ bitSetRecyclable.set(0, 3);
+ lastCumulativeAck.update(messageId, bitSetRecyclable);
+ assertTrue(lastCumulativeAck.isFlushRequired());
+
+ final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
+ assertFalse(lastCumulativeAck.isFlushRequired());
+ assertSame(lastCumulativeAckToFlush.getMessageId(), messageId);
+ assertNotSame(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+ assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+ }
+
+}