You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/07 07:24:15 UTC

[GitHub] [pulsar] 315157973 commented on a change in pull request #8207: Remove unnecessary locks

315157973 commented on a change in pull request #8207:
URL: https://github.com/apache/pulsar/pull/8207#discussion_r500791696



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
             messageReceived(consumer, message);
 
             // we're modifying pausedConsumers
-            lock.writeLock().lock();
-            try {
-                int size = incomingMessages.size();
-                if (size >= maxReceiverQueueSize
-                        || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
-                    // mark this consumer to be resumed later: if No more space left in shared queue,
-                    // or if any consumer is already paused (to create fair chance for already paused consumers)
-                    pausedConsumers.add(consumer);
-                } else {
-                    // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
-                    // recursion and stack overflow
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            } finally {
-                lock.writeLock().unlock();
+            int size = incomingMessages.size();
+            if (size >= maxReceiverQueueSize
+                    || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
+                // mark this consumer to be resumed later: if No more space left in shared queue,
+                // or if any consumer is already paused (to create fair chance for already paused consumers)
+                pausedConsumers.add(consumer);
+            } else {
+                // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
+                // recursion and stack overflow
+                client.eventLoopGroup().execute(() -> {
+                    receiveMessageFromConsumer(consumer);
+                });
             }
         });
     }
 
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
         checkArgument(message instanceof MessageImpl);
-        lock.writeLock().lock();
-        try {
-            TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+        TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
                 consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received message from topics-consumer {}",
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Received message from topics-consumer {}",
                     topic, subscription, message.getMessageId());
-            }
+        }
 
-            // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
-            if (!pendingReceives.isEmpty()) {
-                CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
-                unAckedMessageTracker.add(topicMessage.getMessageId());
-                listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
-            } else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
-                if (hasPendingBatchReceive()) {
-                    notifyPendingBatchReceivedCallBack();
-                }
+        // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
+        CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+        if (receivedFuture != null) {
+            unAckedMessageTracker.add(topicMessage.getMessageId());
+            listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
+        } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
+            try {
+                lock.writeLock().lock();
+                notifyPendingBatchReceivedCallBack();

Review comment:
       This is a good suggestion; thank you very much.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org