Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/06/28 01:19:25 UTC

[pulsar] branch master updated: [improve][broker] Avoid go through all the consumers to get the message ack owner (#16245)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 68484f9162b [improve][broker] Avoid go through all the consumers to get the message ack owner (#16245)
68484f9162b is described below

commit 68484f9162bc768816cfd039140fb78196485d19
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 09:19:18 2022 +0800

    [improve][broker] Avoid go through all the consumers to get the message ack owner (#16245)
    
    ### Motivation
    
    The broker doesn't need to go through all the consumers to find the consumer that owns an ack.
    Instead, it should check the current consumer first. Only if the pending acks of the current
    consumer don't contain the ack position should it go through the other consumers to find the owner.
---
 .../main/java/org/apache/pulsar/broker/service/Consumer.java   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index cce8fbee885..bedaabf28d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -624,10 +624,12 @@ public class Consumer {
     private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
         Consumer ackOwnerConsumer = this;
         if (Subscription.isIndividualAckMode(subType)) {
-            for (Consumer consumer : subscription.getConsumers()) {
-                if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
-                    ackOwnerConsumer = consumer;
-                    break;
+            if (!getPendingAcks().containsKey(ledgerId, entryId)) {
+                for (Consumer consumer : subscription.getConsumers()) {
+                    if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
+                        ackOwnerConsumer = consumer;
+                        break;
+                    }
                 }
             }
         }
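
The lookup order described in the Motivation can be sketched in isolation as follows. This is a minimal, self-contained illustration and not the broker code: `AckOwnerLookupSketch`, `SketchConsumer`, `addPending`, `hasPending`, and `findAckOwner` are hypothetical names, pending acks are modeled as a plain set of `ledgerId:entryId` strings, and the `Subscription.isIndividualAckMode(subType)` check from the diff is omitted for brevity.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AckOwnerLookupSketch {

    /** Hypothetical stand-in for a consumer; not the Pulsar Consumer class. */
    static final class SketchConsumer {
        final String name;
        // Assumption: pending acks are modeled as "ledgerId:entryId" keys.
        private final Set<String> pendingAcks = new HashSet<>();

        SketchConsumer(String name) {
            this.name = name;
        }

        void addPending(long ledgerId, long entryId) {
            pendingAcks.add(ledgerId + ":" + entryId);
        }

        boolean hasPending(long ledgerId, long entryId) {
            return pendingAcks.contains(ledgerId + ":" + entryId);
        }
    }

    /**
     * Checks the current consumer first and scans the other consumers only
     * when the current consumer does not hold the position. Falls back to
     * the current consumer when no one holds it, mirroring the default
     * assignment in the diff.
     */
    static SketchConsumer findAckOwner(SketchConsumer current,
                                       List<SketchConsumer> allConsumers,
                                       long ledgerId, long entryId) {
        if (current.hasPending(ledgerId, entryId)) {
            return current; // fast path: no scan over the other consumers
        }
        for (SketchConsumer other : allConsumers) {
            if (other != current && other.hasPending(ledgerId, entryId)) {
                return other;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        SketchConsumer a = new SketchConsumer("a");
        SketchConsumer b = new SketchConsumer("b");
        List<SketchConsumer> all = Arrays.asList(a, b);
        a.addPending(1, 10);
        b.addPending(1, 11);

        System.out.println(findAckOwner(a, all, 1, 10).name); // prints "a"
        System.out.println(findAckOwner(a, all, 1, 11).name); // prints "b"
        System.out.println(findAckOwner(a, all, 1, 12).name); // prints "a"
    }
}
```

In the common case, where a consumer acknowledges a message it received itself, the sketch returns on the first membership check and never iterates over the other consumers.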