You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/09 06:17:42 UTC

[pulsar] 01/02: [improve][broker] Get lowest PositionImpl from NavigableSet (#18278)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2fb93660aff68b60f0126840f57e9eca46807ed9
Author: WJL3333 <wj...@163.com>
AuthorDate: Wed Nov 2 22:32:42 2022 +0800

    [improve][broker] Get lowest PositionImpl from NavigableSet (#18278)
    
    * [cleanup] Directly get the lowest PositionImpl from TreeMap
    
    Change the signature from Set<T> to NavigableSet<T>,
    which lets callers get the lowest PositionImpl more efficiently.
    
    * Change poll to first when calling `NavigableSet`
    
    * Fix checkstyle: remove unused import
    
    Co-authored-by: wangjinlong <wa...@zhihu.com>
---
 .../broker/delayed/DelayedDeliveryTracker.java       |  4 ++--
 .../delayed/InMemoryDelayedDeliveryTracker.java      |  6 +++---
 .../persistent/MessageRedeliveryController.java      |  3 ++-
 .../PersistentDispatcherMultipleConsumers.java       | 20 ++++++++++++--------
 ...rsistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++----
 .../utils/ConcurrentBitmapSortedLongPairSet.java     |  4 ++--
 6 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 35853d3599b..68943f6c398 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed;
 
 import com.google.common.annotations.Beta;
-import java.util.Set;
+import java.util.NavigableSet;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 
 /**
@@ -53,7 +53,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
     /**
      * Get a set of position of messages that have already reached the delivery time.
      */
-    Set<PositionImpl> getScheduledMessages(int maxMessages);
+    NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);
 
     /**
      * Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 38cc3c4f6e0..a7f8e68bf2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -22,7 +22,7 @@ import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
 import java.time.Clock;
-import java.util.Set;
+import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -146,9 +146,9 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
      * Get a set of position of messages that have already reached.
      */
     @Override
-    public Set<PositionImpl> getScheduledMessages(int maxMessages) {
+    public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
         int n = maxMessages;
-        Set<PositionImpl> positions = new TreeSet<>();
+        NavigableSet<PositionImpl> positions = new TreeSet<>();
         long cutoffTime = getCutoffTime();
 
         while (n > 0 && !priorityQueue.isEmpty()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index e6febc722de..934628b05a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import com.google.common.collect.ComparisonChain;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableSet;
 import java.util.Set;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -136,7 +137,7 @@ public class MessageRedeliveryController {
         return false;
     }
 
-    public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+    public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 20c1173286e..6ee923ede0e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -278,7 +279,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 return;
             }
 
-            Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
+            NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
 
             if (!messagesToReplayNow.isEmpty()) {
                 if (log.isDebugEnabled()) {
@@ -287,7 +288,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 }
 
                 havePendingReplayRead = true;
-                minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
+                minReplayedPosition = messagesToReplayNow.first();
                 Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
                         ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
@@ -311,11 +312,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
-                Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
-                minReplayedPosition = toReplay.stream().findFirst().orElse(null);
-                if (minReplayedPosition != null) {
+                NavigableSet<PositionImpl> toReplay = getMessagesToReplayNow(1);
+                if (!toReplay.isEmpty()) {
+                    minReplayedPosition = toReplay.first();
                     redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
+                } else {
+                    minReplayedPosition = null;
                 }
+
                 cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
@@ -1043,17 +1047,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
-    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+    protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (!redeliveryMessages.isEmpty()) {
             return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
         } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
             delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
-            Set<PositionImpl> messagesAvailableNow =
+            NavigableSet<PositionImpl> messagesAvailableNow =
                     delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
             messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
             return messagesAvailableNow;
         } else {
-            return Collections.emptySet();
+            return Collections.emptyNavigableSet();
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 1df6366c389..1e52d985520 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -184,9 +185,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
         // This may happen when consumer closed. See issue #12885 for details.
         if (!allowOutOfOrderDelivery) {
-            Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
+            NavigableSet<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
             if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
-                PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
+                PositionImpl replayPosition = messagesToReplayNow.first();
+
                 // We have received a message potentially from the delayed tracker and, since we're not using it
                 // right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
                 // resend it (until we disconnect consumer).
@@ -458,13 +460,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     }
 
     @Override
-    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+    protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (isDispatcherStuckOnReplays) {
             // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
             // messages kicks in), instead of keep replaying the same old messages, since the consumer that these
             // messages are routing to might be busy at the moment
             this.isDispatcherStuckOnReplays = false;
-            return Collections.emptySet();
+            return Collections.emptyNavigableSet();
         } else {
             return super.getMessagesToReplayNow(maxMessagesToRead);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
index c9f1c65daca..a33a78f1eae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -95,7 +94,8 @@ public class ConcurrentBitmapSortedLongPairSet {
     }
 
 
-    public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
+    public <T extends Comparable<T>> NavigableSet<T> items(int numberOfItems,
+                                                           LongPairSet.LongPairFunction<T> longPairConverter) {
         NavigableSet<T> items = new TreeSet<>();
         lock.readLock().lock();
         try {