You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/07 06:35:32 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #8207: Remove unnecessary locks

eolivelli commented on a change in pull request #8207:
URL: https://github.com/apache/pulsar/pull/8207#discussion_r500767874



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
             messageReceived(consumer, message);
 
             // we're modifying pausedConsumers
-            lock.writeLock().lock();
-            try {
-                int size = incomingMessages.size();
-                if (size >= maxReceiverQueueSize
-                        || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
-                    // mark this consumer to be resumed later: if No more space left in shared queue,
-                    // or if any consumer is already paused (to create fair chance for already paused consumers)
-                    pausedConsumers.add(consumer);
-                } else {
-                    // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
-                    // recursion and stack overflow
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            } finally {
-                lock.writeLock().unlock();
+            int size = incomingMessages.size();
+            if (size >= maxReceiverQueueSize
+                    || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
+                // mark this consumer to be resumed later: if No more space left in shared queue,
+                // or if any consumer is already paused (to create fair chance for already paused consumers)
+                pausedConsumers.add(consumer);
+            } else {
+                // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
+                // recursion and stack overflow
+                client.eventLoopGroup().execute(() -> {
+                    receiveMessageFromConsumer(consumer);
+                });
             }
         });
     }
 
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
         checkArgument(message instanceof MessageImpl);
-        lock.writeLock().lock();
-        try {
-            TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+        TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
                 consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received message from topics-consumer {}",
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Received message from topics-consumer {}",
                     topic, subscription, message.getMessageId());
-            }
+        }
 
-            // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
-            if (!pendingReceives.isEmpty()) {
-                CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
-                unAckedMessageTracker.add(topicMessage.getMessageId());
-                listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
-            } else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
-                if (hasPendingBatchReceive()) {
-                    notifyPendingBatchReceivedCallBack();
-                }
+        // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
+        CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+        if (receivedFuture != null) {
+            unAckedMessageTracker.add(topicMessage.getMessageId());
+            listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
+        } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
+            try {
+                lock.writeLock().lock();

Review comment:
       We are now using only the writeLock, so we could use a simple ReentrantLock instead of a ReentrantReadWriteLock.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
             messageReceived(consumer, message);
 
             // we're modifying pausedConsumers
-            lock.writeLock().lock();
-            try {
-                int size = incomingMessages.size();
-                if (size >= maxReceiverQueueSize
-                        || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
-                    // mark this consumer to be resumed later: if No more space left in shared queue,
-                    // or if any consumer is already paused (to create fair chance for already paused consumers)
-                    pausedConsumers.add(consumer);
-                } else {
-                    // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
-                    // recursion and stack overflow
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            } finally {
-                lock.writeLock().unlock();
+            int size = incomingMessages.size();
+            if (size >= maxReceiverQueueSize
+                    || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
+                // mark this consumer to be resumed later: if No more space left in shared queue,
+                // or if any consumer is already paused (to create fair chance for already paused consumers)
+                pausedConsumers.add(consumer);
+            } else {
+                // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
+                // recursion and stack overflow
+                client.eventLoopGroup().execute(() -> {
+                    receiveMessageFromConsumer(consumer);
+                });
             }
         });
     }
 
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
         checkArgument(message instanceof MessageImpl);
-        lock.writeLock().lock();
-        try {
-            TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+        TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
                 consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received message from topics-consumer {}",
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Received message from topics-consumer {}",
                     topic, subscription, message.getMessageId());
-            }
+        }
 
-            // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
-            if (!pendingReceives.isEmpty()) {
-                CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
-                unAckedMessageTracker.add(topicMessage.getMessageId());
-                listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
-            } else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
-                if (hasPendingBatchReceive()) {
-                    notifyPendingBatchReceivedCallBack();
-                }
+        // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
+        CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+        if (receivedFuture != null) {
+            unAckedMessageTracker.add(topicMessage.getMessageId());
+            listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
+        } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
+            try {
+                lock.writeLock().lock();
+                notifyPendingBatchReceivedCallBack();

Review comment:
       Not every call to the `notifyPendingBatchReceivedCallBack` method is guarded by the lock. Do we need to use the lock here?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
             messageReceived(consumer, message);
 
             // we're modifying pausedConsumers

Review comment:
       This comment is probably no longer useful.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org