You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/27 14:39:25 UTC

[pulsar] branch master updated: [consumer] Revert "Remove consumer unnecessary locks (#9261)" (#10240)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e95d6f0  [consumer] Revert "Remove consumer unnecessary locks (#9261)" (#10240)
e95d6f0 is described below

commit e95d6f03da5ec75eda46870e36592b62108c8b8a
Author: linlinnn <li...@163.com>
AuthorDate: Tue Apr 27 22:38:34 2021 +0800

    [consumer] Revert "Remove consumer unnecessary locks (#9261)" (#10240)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 64 ++++++++++++++--------
 .../client/impl/MultiTopicsConsumerImpl.java       | 18 +++---
 2 files changed, 51 insertions(+), 31 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 089b3d1..2c669ea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -413,6 +413,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
         Message<T> message = null;
+        lock.writeLock().lock();
         try {
             message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
             if (message == null) {
@@ -422,6 +423,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             result.completeExceptionally(e);
+        } finally {
+            lock.writeLock().unlock();
         }
 
         if (message != null) {
@@ -472,8 +475,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
+        lock.writeLock().lock();
         try {
-            lock.writeLock().lock();
             if (pendingBatchReceives == null) {
                 pendingBatchReceives = Queues.newConcurrentLinkedQueue();
             }
@@ -498,7 +501,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         } finally {
             lock.writeLock().unlock();
         }
-
         return result;
     }
 
@@ -561,8 +563,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             delayTime = 0;
         }
         if (retryLetterProducer == null) {
+            createProducerLock.writeLock().lock();
             try {
-                createProducerLock.writeLock().lock();
                 if (retryLetterProducer == null) {
                     retryLetterProducer = client.newProducer(schema)
                             .topic(this.deadLetterPolicy.getRetryLetterTopic())
@@ -957,9 +959,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     private void failPendingReceive() {
-        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-            failPendingReceives(this.pendingReceives);
-            failPendingBatchReceives(this.pendingBatchReceives);
+        lock.readLock().lock();
+        try {
+            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
+                failPendingReceives(this.pendingReceives);
+                failPendingBatchReceives(this.pendingBatchReceives);
+            }
+        } finally {
+            lock.readLock().unlock();
         }
     }
 
@@ -1058,18 +1065,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     poolMessages);
             uncompressedPayload.release();
 
-            // Enqueue the message so that it can be retrieved when application calls receive()
-            // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
-            if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
-                    redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
-                possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
-                        Collections.singletonList(message));
-            }
-            if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
-                notifyPendingBatchReceivedCallBack();
+            lock.readLock().lock();
+            try {
+                // Enqueue the message so that it can be retrieved when application calls receive()
+                // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
+                // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+                if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
+                        redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
+                            Collections.singletonList(message));
+                }
+                if (peekPendingReceive() != null) {
+                    notifyPendingReceivedCallback(message, null);
+                } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                    notifyPendingBatchReceivedCallBack();
+                }
+            } finally {
+                lock.readLock().unlock();
             }
         } else {
             // handle batch message enqueuing; uncompressed payload has all messages in batch
@@ -1280,11 +1292,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-
-                if (peekPendingReceive() != null) {
-                    notifyPendingReceivedCallback(message, null);
-                } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
-                    notifyPendingBatchReceivedCallBack();
+                lock.readLock().lock();
+                try {
+                    if (peekPendingReceive() != null) {
+                        notifyPendingReceivedCallback(message, null);
+                    } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                        notifyPendingBatchReceivedCallBack();
+                    }
+                } finally {
+                    lock.readLock().unlock();
                 }
                 singleMessagePayload.release();
             }
@@ -1706,8 +1722,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private void initDeadLetterProducerIfNeeded() {
         if (deadLetterProducer == null) {
+            createProducerLock.writeLock().lock();
             try {
-                createProducerLock.writeLock().lock();
                 if (deadLetterProducer == null) {
                     deadLetterProducer = client.newProducer(schema)
                             .topic(this.deadLetterPolicy.getDeadLetterTopic())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 2272339..df6a830 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -602,13 +602,17 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        consumers.values().stream().forEach(consumer -> {
-            consumer.redeliverUnacknowledgedMessages();
-            consumer.unAckedChunkedMessageIdSequenceMap.clear();
-        });
-        clearIncomingMessages();
-        unAckedMessageTracker.clear();
-
+        lock.writeLock().lock();
+        try {
+            consumers.values().stream().forEach(consumer -> {
+                consumer.redeliverUnacknowledgedMessages();
+                consumer.unAckedChunkedMessageIdSequenceMap.clear();
+            });
+            clearIncomingMessages();
+            unAckedMessageTracker.clear();
+        } finally {
+            lock.writeLock().unlock();
+        }
         resumeReceivingFromPausedConsumersIfNeeded();
     }