You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/27 14:39:25 UTC
[pulsar] branch master updated: [consumer] Revert "Remove consumer unnecessary locks (#9261)" (#10240)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e95d6f0 [consumer] Revert "Remove consumer unnecessary locks (#9261)" (#10240)
e95d6f0 is described below
commit e95d6f03da5ec75eda46870e36592b62108c8b8a
Author: linlinnn <li...@163.com>
AuthorDate: Tue Apr 27 22:38:34 2021 +0800
[consumer] Revert "Remove consumer unnecessary locks (#9261)" (#10240)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 64 ++++++++++++++--------
.../client/impl/MultiTopicsConsumerImpl.java | 18 +++---
2 files changed, 51 insertions(+), 31 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 089b3d1..2c669ea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -413,6 +413,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
Message<T> message = null;
+ lock.writeLock().lock();
try {
message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
if (message == null) {
@@ -422,6 +423,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(e);
+ } finally {
+ lock.writeLock().unlock();
}
if (message != null) {
@@ -472,8 +475,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
+ lock.writeLock().lock();
try {
- lock.writeLock().lock();
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
@@ -498,7 +501,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
} finally {
lock.writeLock().unlock();
}
-
return result;
}
@@ -561,8 +563,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
delayTime = 0;
}
if (retryLetterProducer == null) {
+ createProducerLock.writeLock().lock();
try {
- createProducerLock.writeLock().lock();
if (retryLetterProducer == null) {
retryLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getRetryLetterTopic())
@@ -957,9 +959,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
private void failPendingReceive() {
- if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
- failPendingReceives(this.pendingReceives);
- failPendingBatchReceives(this.pendingBatchReceives);
+ lock.readLock().lock();
+ try {
+ if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
+ failPendingReceives(this.pendingReceives);
+ failPendingBatchReceives(this.pendingBatchReceives);
+ }
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -1058,18 +1065,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
poolMessages);
uncompressedPayload.release();
- // Enqueue the message so that it can be retrieved when application calls receive()
- // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
- // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
- if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
- redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
- possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
- Collections.singletonList(message));
- }
- if (peekPendingReceive() != null) {
- notifyPendingReceivedCallback(message, null);
- } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
+ lock.readLock().lock();
+ try {
+ // Enqueue the message so that it can be retrieved when application calls receive()
+ // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
+ // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+ if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
+ redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+ possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+ Collections.singletonList(message));
+ }
+ if (peekPendingReceive() != null) {
+ notifyPendingReceivedCallback(message, null);
+ } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+ notifyPendingBatchReceivedCallBack();
+ }
+ } finally {
+ lock.readLock().unlock();
}
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
@@ -1280,11 +1292,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
-
- if (peekPendingReceive() != null) {
- notifyPendingReceivedCallback(message, null);
- } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
+ lock.readLock().lock();
+ try {
+ if (peekPendingReceive() != null) {
+ notifyPendingReceivedCallback(message, null);
+ } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+ notifyPendingBatchReceivedCallBack();
+ }
+ } finally {
+ lock.readLock().unlock();
}
singleMessagePayload.release();
}
@@ -1706,8 +1722,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private void initDeadLetterProducerIfNeeded() {
if (deadLetterProducer == null) {
+ createProducerLock.writeLock().lock();
try {
- createProducerLock.writeLock().lock();
if (deadLetterProducer == null) {
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getDeadLetterTopic())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 2272339..df6a830 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -602,13 +602,17 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
@Override
public void redeliverUnacknowledgedMessages() {
- consumers.values().stream().forEach(consumer -> {
- consumer.redeliverUnacknowledgedMessages();
- consumer.unAckedChunkedMessageIdSequenceMap.clear();
- });
- clearIncomingMessages();
- unAckedMessageTracker.clear();
-
+ lock.writeLock().lock();
+ try {
+ consumers.values().stream().forEach(consumer -> {
+ consumer.redeliverUnacknowledgedMessages();
+ consumer.unAckedChunkedMessageIdSequenceMap.clear();
+ });
+ clearIncomingMessages();
+ unAckedMessageTracker.clear();
+ } finally {
+ lock.writeLock().unlock();
+ }
resumeReceivingFromPausedConsumersIfNeeded();
}