You are viewing a plain-text version of this content. The canonical link for it is not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/12 17:15:24 UTC
[pulsar] 02/02: Fix race condition of OpSendMsgQueue when publishing messages (#14231)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1c3c3f40b0d58108a4d0f53489f272f8468bed65
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Feb 12 13:26:42 2022 +0800
Fix race condition of OpSendMsgQueue when publishing messages (#14231)
* Add `synchronized` to getPendingQueueSize().
* Use an iterator instead.
* Use a counter to keep track of the message count.
* Change to int.
Co-authored-by: Matteo Merli <mm...@apache.org>
---
.../apache/pulsar/client/impl/ProducerImpl.java | 34 ++++++++++------------
1 file changed, 16 insertions(+), 18 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 877d88b..8486283 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
@@ -994,7 +993,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (sequenceId > op.sequenceId) {
log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
- op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
+ op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.messagesCount());
// Force connection closing so that messages can be re-transmitted in a new connection
cnx.channel().close();
return;
@@ -1016,7 +1015,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
releaseSemaphoreForSendOp(op);
} else {
log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
- op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
+ op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.messagesCount());
// Force connection closing so that messages can be re-transmitted in a new connection
cnx.channel().close();
return;
@@ -1332,6 +1331,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
private int forEachDepth = 0;
private List<OpSendMsg> postponedOpSendMgs;
+ private final AtomicInteger messagesCount = new AtomicInteger(0);
@Override
public void forEach(Consumer<? super OpSendMsg> action) {
@@ -1352,6 +1352,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
public boolean add(OpSendMsg o) {
// postpone adding to the queue while forEach iteration is in progress
+ messagesCount.addAndGet(o.numMessagesInBatch);
if (forEachDepth > 0) {
if (postponedOpSendMgs == null) {
postponedOpSendMgs = new ArrayList<>();
@@ -1364,18 +1365,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
public void clear() {
delegate.clear();
+ messagesCount.set(0);
}
public void remove() {
- delegate.remove();
+ OpSendMsg op = delegate.remove();
+ if (op != null) {
+ messagesCount.addAndGet(-op.numMessagesInBatch);
+ }
}
public OpSendMsg peek() {
return delegate.peek();
}
- public int size() {
- return delegate.size();
+ public int messagesCount() {
+ return messagesCount.get();
}
@Override
@@ -1548,7 +1553,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Pending messages: {}", topic, producerName,
- pendingMessages.size());
+ pendingMessages.messagesCount());
}
PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(
@@ -1649,7 +1654,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
cnx.channel().close();
return;
}
- int messagesToResend = pendingMessages.size();
+ int messagesToResend = pendingMessages.messagesCount();
if (messagesToResend == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend);
@@ -1755,7 +1760,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// The diff is less than or equal to zero, meaning that the message has been timed out.
// Set the callback to timeout on every message, then clear the pending queue.
log.info("[{}] [{}] Message send timed out. Failing {} messages", topic, producerName,
- pendingMessages.size());
+ pendingMessages.messagesCount());
PulsarClientException te = new PulsarClientException.TimeoutException(
format("The producer %s can not send message to the topic %s within given timeout",
@@ -1925,7 +1930,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// called again once the new connection registers the producer with the broker.
log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
+ " {} pending messages since they will deliver using another connection.", topic, producerName,
- pendingMessages.size());
+ pendingMessages.messagesCount());
return;
}
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
@@ -1997,14 +2002,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
public int getPendingQueueSize() {
- if (!isBatchMessagingEnabled()) {
- return pendingMessages.size();
- }
- MutableInt size = new MutableInt(0);
- pendingMessages.forEach(op -> {
- size.add(Math.max(op.numMessagesInBatch, 1));
- });
- return size.getValue();
+ return pendingMessages.messagesCount();
}
@Override