You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/15 08:51:49 UTC
[GitHub] [pulsar] BewareMyPower opened a new pull request, #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
BewareMyPower opened a new pull request, #16072:
URL: https://github.com/apache/pulsar/pull/16072
### Motivation
There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- https://github.com/apache/pulsar/pull/10586
- https://github.com/apache/pulsar/pull/12343
The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.
In addition, the current `LastCumulativeAck` class was added in
https://github.com/apache/pulsar/pull/8996 to hold two object
references, but this modification is wrong.
Before #8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.
However, after #8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).
### Modifications
To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, two synchronized methods
were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
`BitSetRecyclable` object before assigning new values.
- `moveOwnershipTo`: This method moves the ownership to another
`LastCumulativeAck` object, which will be responsible to recycle the
`BitSetRecyclable` field after that.
With the methods above, each time `flushAsync` is called, move the
ownership of `lastCumulativeAck` field to another thread local field to
send the ACK command and recycle the `BitSetRecyclable` field.
- `lastCumulativeAck` updates the latest message ID and bit set, the
update operations can be performed by multiple threads and
`lastCumulativeAck` saves the latest value.
- `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache
to the latest value in `flushAsync`.
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902144130
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -370,29 +371,9 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
// Handle concurrent updates from different threads
- LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
- while (true) {
- LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
- if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
- if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
- if (lastCumulativeAck.bitSetRecyclable != null) {
- try {
- lastCumulativeAck.bitSetRecyclable.recycle();
- } catch (Exception ignore) {
- // no-op
- }
- lastCumulativeAck.bitSetRecyclable = null;
- }
- lastCumulativeAck.recycle();
- // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
- cumulativeAckFlushRequired = true;
- return;
- }
- } else {
- currentCumulativeAck.recycle();
- // message id acknowledging an before the current last cumulative ack
- return;
- }
+ synchronized (lastCumulativeAck) {
+ lastCumulativeAck.update(msgId, bitSet);
Review Comment:
may we should check the msgId is rather than the last cumuative ack
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
* When reaching the max group size, an ack command is sent out immediately.
*/
private static final int MAX_ACK_GROUP_SIZE = 1000;
+ private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =
Review Comment:
If use FastThreadLocal, multiple `PersistentAcknowledgmentsGroupingTrackers` will affect each other `LastCumulativeAck `
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r901774237
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -116,8 +118,8 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
- public boolean isDuplicate(@NonNull MessageId messageId) {
- final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+ public boolean isDuplicate(MessageId messageId) {
+ final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
Review Comment:
I see.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902398053
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
private void flushAsync(ClientCnx cnx) {
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
- newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
- lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+ // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+ // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+ // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+ // is updated.
+ final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+ lastCumulativeAck.moveOwnershipTo(cumulativeAck);
+
+ newMessageAckCommandAndWrite(cnx, consumer.consumerId, cumulativeAck.getMessageId().getLedgerId(),
+ cumulativeAck.getMessageId().getEntryId(), cumulativeAck.getBitSetRecyclable(),
AckType.Cumulative, null, Collections.emptyMap(), false,
this.currentCumulativeAckFuture, null);
- this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+ this.consumer.unAckedChunkedMessageIdSequenceMap.remove(cumulativeAck.getMessageId());
shouldFlush = true;
+
+ if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {
Review Comment:
I was thinking if invoke
https://github.com/apache/pulsar/blob/141c44022a27be2fc07eab9827cfdb168e448953/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L561
then restoreOwnershipIfEmpty, it may will lose messages to receive and ack
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902269696
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
* When reaching the max group size, an ack command is sent out immediately.
*/
private static final int MAX_ACK_GROUP_SIZE = 1000;
+ private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =
Review Comment:
`LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH` is only used to store the value of `lastCumulativeAck` before sending the ACK so that we can avoid synchronizing the whole code block. After ACK is sent, the `lastCumulativeAck` will restore from the temporary value.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#issuecomment-1161675466
I've updated this PR with a refactored change and the PR description has also been updated, PTAL again @Demogorgon314 @congbobo184
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r901656825
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -116,8 +118,8 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
- public boolean isDuplicate(@NonNull MessageId messageId) {
- final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+ public boolean isDuplicate(MessageId messageId) {
+ final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
Review Comment:
It's added by me before. The null check is redundant here because the only usage accepts a `MessageId` object returned by `new`, which cannot be `null`. See the following code:
https://github.com/apache/pulsar/blob/d5d8923265b3a4b9731892879b2f2b9dc48c0f98/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1267-L1268
In addition, `@NonNull` will still throw an exception, see https://projectlombok.org/features/NonNull.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903219234
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -560,7 +529,6 @@ private void flushAsync(ClientCnx cnx) {
@Override
public void flushAndClean() {
flush();
- lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
Review Comment:
The original code uses `cumulativeAckFlushRequired` to determine whether to send ACK in `flushAsync`. Even if `lastCumulativeAck` is reset to earliest here, the `lastCumulativeAck` won't work without calling `doCumulativeAckAsync`, which is the only place that changes `cumulativeAckFlushRequired` with true. And after `doCumulativeAckAsync`, the internal message ID of `lastCumulativeAck` will change, so I think this line here is useless. Could you show a test to verify your point?
In this PR, after `lastCumulativeAck.flush()` is called, the internal `flushRequired` field became false and `lastCumulativeAck` wouldn't work in `flushAsync`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#issuecomment-1157809992
Now all tests passed, PTAL, @lhotari @gaozhangmin @congbobo184 @mattisonchao @Technoboy- @codelipenghui
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903226527
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -664,36 +632,61 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) {
return ackReceiptEnabled && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
+}
- private static class LastCumulativeAck {
- private MessageIdImpl messageId;
- private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
- static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
- LastCumulativeAck op = RECYCLER.get();
- op.messageId = messageId;
- op.bitSetRecyclable = bitSetRecyclable;
- return op;
- }
+ // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+ public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+ new FastThreadLocal<LastCumulativeAck>() {
+
+ @Override
+ protected LastCumulativeAck initialValue() {
+ return new LastCumulativeAck();
+ }
+ };
+ public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
+
+ private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+ private BitSetRecyclable bitSetRecyclable = null;
+ private boolean flushRequired = false;
- private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
+ public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+ if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
+ this.bitSetRecyclable.recycle();
}
+ set(messageId, bitSetRecyclable);
+ flushRequired = true;
+ }
- void recycle() {
+ public synchronized LastCumulativeAck flush() {
+ if (flushRequired) {
+ final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
if (bitSetRecyclable != null) {
- this.bitSetRecyclable.recycle();
+ localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
Review Comment:
I'm not sure if it would be proper to add another constructor that accepts a long array to `bitSetRecyclable` to allocate memory from Netty recycler.
```java
public static BitSetRecyclable create(BitSetRecyclable other) {
final BitSetRecyclable bitSetRecyclable = create();
bitSetRecyclable.words = Arrays.copyOf(other.words, other.length());
bitSetRecyclable.wordsInUse = other.wordsInUse;
bitSetRecyclable.sizeIsSticky = other.sizeIsSticky;
return bitSetRecyclable;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902469578
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
private void flushAsync(ClientCnx cnx) {
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
- newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
- lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+ // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+ // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+ // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+ // is updated.
+ final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+ lastCumulativeAck.moveOwnershipTo(cumulativeAck);
+
+ newMessageAckCommandAndWrite(cnx, consumer.consumerId, cumulativeAck.getMessageId().getLedgerId(),
+ cumulativeAck.getMessageId().getEntryId(), cumulativeAck.getBitSetRecyclable(),
AckType.Cumulative, null, Collections.emptyMap(), false,
this.currentCumulativeAckFuture, null);
- this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+ this.consumer.unAckedChunkedMessageIdSequenceMap.remove(cumulativeAck.getMessageId());
shouldFlush = true;
+
+ if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {
Review Comment:
`restoreOwnershipIfEmpty` is called in `flushAsync` and `flushAndClean` will also calls `flushAsync`, do you mean `flushAsync` is invoked concurrently?
----
But I just found another problem, in this case, the 2nd invocation of `flushAsync` might use a default message id because `lastCumulativeAck` has been reset before by `moveOwnershipTo`.
I think we need to use a CAS operation on `cumulativeAckFlushRequired`. I'll check the code again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r901656825
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -116,8 +118,8 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
- public boolean isDuplicate(@NonNull MessageId messageId) {
- final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+ public boolean isDuplicate(MessageId messageId) {
+ final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
Review Comment:
It's added by me before. The null check is redundant here because the only usage accepts a `MessageId` object returned by `new`, which cannot be `null`. See the following code:
See https://github.com/apache/pulsar/blob/d5d8923265b3a4b9731892879b2f2b9dc48c0f98/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1267-L1268
In addition, `@NonNull` will still throw an exception, see https://projectlombok.org/features/NonNull.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902266409
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
* When reaching the max group size, an ack command is sent out immediately.
*/
private static final int MAX_ACK_GROUP_SIZE = 1000;
+ private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =
Review Comment:
I think not. Because this field is just a temporary cache, it's only used in the following code block.
```java
final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
lastCumulativeAck.moveOwnershipTo(cumulativeAck);
/* ... */
if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {
// `lastCumulativeAck` has been updated so that restoreOwnershipIfEmpty failed
cumulativeAck.recycle();
}
cumulativeAckFlushRequired = false;
```
The code above has no side effect on the `lastCumulativeAck` field if it's executed atomically.
Assuming there are two trackers, let's say `T1`, `T2`. The actual `lastCumulativeAck` objects are `T1.lastCumulativeAck` and `T2.lastCumulativeAck`.
If `T1.flushAsync()` and `T2.flushAsync()` are called in different threads, `LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get()` will return two different `LastCumulativeAck` objects and they won't affect each other.
If `T1.flushAsync()` and `T2.flushAsync()` are called in the same thread, these two calls cannot be executed concurrently. Each time `flushAsync()` is done, the `lastCumulativeAck` field of each tracker won't change unless it's updated in other methods.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902471736
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
private void flushAsync(ClientCnx cnx) {
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
- newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
- lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+ // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+ // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+ // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+ // is updated.
+ final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+ lastCumulativeAck.moveOwnershipTo(cumulativeAck);
+
+ newMessageAckCommandAndWrite(cnx, consumer.consumerId, cumulativeAck.getMessageId().getLedgerId(),
+ cumulativeAck.getMessageId().getEntryId(), cumulativeAck.getBitSetRecyclable(),
AckType.Cumulative, null, Collections.emptyMap(), false,
this.currentCumulativeAckFuture, null);
- this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+ this.consumer.unAckedChunkedMessageIdSequenceMap.remove(cumulativeAck.getMessageId());
shouldFlush = true;
+
+ if (!lastCumulativeAck.restoreOwnershipIfEmpty(cumulativeAck)) {
Review Comment:
I will refactor the existing code again. PTAL after I commit the latest code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower merged pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #16072:
URL: https://github.com/apache/pulsar/pull/16072
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902259296
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -370,29 +371,9 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
// Handle concurrent updates from different threads
- LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
- while (true) {
- LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
- if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
- if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
- if (lastCumulativeAck.bitSetRecyclable != null) {
- try {
- lastCumulativeAck.bitSetRecyclable.recycle();
- } catch (Exception ignore) {
- // no-op
- }
- lastCumulativeAck.bitSetRecyclable = null;
- }
- lastCumulativeAck.recycle();
- // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
- cumulativeAckFlushRequired = true;
- return;
- }
- } else {
- currentCumulativeAck.recycle();
- // message id acknowledging an before the current last cumulative ack
- return;
- }
+ synchronized (lastCumulativeAck) {
+ lastCumulativeAck.update(msgId, bitSet);
Review Comment:
Did you mean when the `msgId` is the same with the `lastCumulativeAck.messageId`? I think we should check `bitSetRecyclable` to avoid invoking the `recycle()` method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903245160
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -560,7 +529,6 @@ private void flushAsync(ClientCnx cnx) {
@Override
public void flushAndClean() {
flush();
- lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
Review Comment:
I added the comparison log with `messageId` in `LastCumulativeAck#update` back. So we need to reset the `lastCumulativeAck` now to allow it being updated with any value.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902131202
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -476,12 +457,24 @@ public void flush() {
private void flushAsync(ClientCnx cnx) {
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
- newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
- lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+ // `doCumulativeAckAsync` has been invoked before, we need to retrieve the latest value from
+ // `lastCumulativeAck`, here we transfer the ownership of `lastCumulativeAck` to the thread local object
+ // so that we can avoid locking this whole code block or recycling the bit set after `lastCumulativeAck`
+ // is updated.
+ final LastCumulativeAck cumulativeAck = LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get();
+ lastCumulativeAck.moveOwnershipTo(LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH.get());
Review Comment:
```suggestion
lastCumulativeAck.moveOwnershipTo(cumulativeAck);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903224237
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -664,36 +632,61 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) {
return ackReceiptEnabled && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
+}
- private static class LastCumulativeAck {
- private MessageIdImpl messageId;
- private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
- static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
- LastCumulativeAck op = RECYCLER.get();
- op.messageId = messageId;
- op.bitSetRecyclable = bitSetRecyclable;
- return op;
- }
+ // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+ public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+ new FastThreadLocal<LastCumulativeAck>() {
+
+ @Override
+ protected LastCumulativeAck initialValue() {
+ return new LastCumulativeAck();
+ }
+ };
+ public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
+
+ private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+ private BitSetRecyclable bitSetRecyclable = null;
+ private boolean flushRequired = false;
- private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
+ public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+ if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
+ this.bitSetRecyclable.recycle();
}
+ set(messageId, bitSetRecyclable);
+ flushRequired = true;
+ }
- void recycle() {
+ public synchronized LastCumulativeAck flush() {
+ if (flushRequired) {
+ final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
if (bitSetRecyclable != null) {
- this.bitSetRecyclable.recycle();
+ localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
Review Comment:
Actually not. `BitSetRecyclable#valueOf` doesn't use Netty recycler to create a `BitSetRecyclable` so it doesn't need to recycle. Regarding to the `bitSetRecyclable` in the current instance, it will be recycled next time `update` is called.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r903211468
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -560,7 +529,6 @@ private void flushAsync(ClientCnx cnx) {
@Override
public void flushAndClean() {
flush();
- lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
Review Comment:
if delete this line when seek the message to early position, the consumer will ack the incorrect position
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -664,36 +632,61 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) {
return ackReceiptEnabled && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
+}
- private static class LastCumulativeAck {
- private MessageIdImpl messageId;
- private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
- static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
- LastCumulativeAck op = RECYCLER.get();
- op.messageId = messageId;
- op.bitSetRecyclable = bitSetRecyclable;
- return op;
- }
+ // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+ public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+ new FastThreadLocal<LastCumulativeAck>() {
+
+ @Override
+ protected LastCumulativeAck initialValue() {
+ return new LastCumulativeAck();
+ }
+ };
+ public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
+
+ private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+ private BitSetRecyclable bitSetRecyclable = null;
+ private boolean flushRequired = false;
- private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
+ public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+ if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
+ this.bitSetRecyclable.recycle();
}
+ set(messageId, bitSetRecyclable);
+ flushRequired = true;
+ }
- void recycle() {
+ public synchronized LastCumulativeAck flush() {
+ if (flushRequired) {
+ final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
if (bitSetRecyclable != null) {
- this.bitSetRecyclable.recycle();
+ localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
Review Comment:
this bitSet need to recycle
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r901633127
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -68,18 +74,14 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
- private volatile LastCumulativeAck lastCumulativeAck =
- LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
+ private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
private volatile boolean cumulativeAckFlushRequired = false;
+
Review Comment:
Unnecessary new line.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -116,8 +118,8 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
- public boolean isDuplicate(@NonNull MessageId messageId) {
- final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+ public boolean isDuplicate(MessageId messageId) {
+ final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
Review Comment:
Why do we remove the `@NonNull` annotation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16072: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck`
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16072:
URL: https://github.com/apache/pulsar/pull/16072#discussion_r902248174
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -60,6 +59,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
* When reaching the max group size, an ack command is sent out immediately.
*/
private static final int MAX_ACK_GROUP_SIZE = 1000;
+ private static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK_TO_FLUSH =
Review Comment:
Good catch!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org