You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/09 06:17:42 UTC
[pulsar] 01/02: [improve][broker] Get lowest PositionImpl from NavigableSet (#18278)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2fb93660aff68b60f0126840f57e9eca46807ed9
Author: WJL3333 <wj...@163.com>
AuthorDate: Wed Nov 2 22:32:42 2022 +0800
[improve][broker] Get lowest PositionImpl from NavigableSet (#18278)
* [cleanup] Directly get the lowest PositionImpl from the TreeMap
Change the signature from Set<T> to NavigableSet<T>,
which lets callers get the lowest PositionImpl more efficiently.
* change poll to first when calling `NavigableSet`
* fix checkstyle: remove unused import
Co-authored-by: wangjinlong <wa...@zhihu.com>
---
.../broker/delayed/DelayedDeliveryTracker.java | 4 ++--
.../delayed/InMemoryDelayedDeliveryTracker.java | 6 +++---
.../persistent/MessageRedeliveryController.java | 3 ++-
.../PersistentDispatcherMultipleConsumers.java | 20 ++++++++++++--------
...rsistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++----
.../utils/ConcurrentBitmapSortedLongPairSet.java | 4 ++--
6 files changed, 27 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 35853d3599b..68943f6c398 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed;
import com.google.common.annotations.Beta;
-import java.util.Set;
+import java.util.NavigableSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
/**
@@ -53,7 +53,7 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
/**
* Get a set of position of messages that have already reached the delivery time.
*/
- Set<PositionImpl> getScheduledMessages(int maxMessages);
+ NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);
/**
* Tells whether the dispatcher should pause any message deliveries, until the DelayedDeliveryTracker has
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 38cc3c4f6e0..a7f8e68bf2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -22,7 +22,7 @@ import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
-import java.util.Set;
+import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -146,9 +146,9 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
* Get a set of position of messages that have already reached.
*/
@Override
- public Set<PositionImpl> getScheduledMessages(int maxMessages) {
+ public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
int n = maxMessages;
- Set<PositionImpl> positions = new TreeSet<>();
+ NavigableSet<PositionImpl> positions = new TreeSet<>();
long cutoffTime = getCutoffTime();
while (n > 0 && !priorityQueue.isEmpty()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index e6febc722de..934628b05a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import com.google.common.collect.ComparisonChain;
import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableSet;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -136,7 +137,7 @@ public class MessageRedeliveryController {
return false;
}
- public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+ public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 20c1173286e..6ee923ede0e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -278,7 +279,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return;
}
- Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
+ NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
if (!messagesToReplayNow.isEmpty()) {
if (log.isDebugEnabled()) {
@@ -287,7 +288,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
havePendingReplayRead = true;
- minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
+ minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
@@ -311,11 +312,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
consumerList.size());
}
havePendingRead = true;
- Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
- minReplayedPosition = toReplay.stream().findFirst().orElse(null);
- if (minReplayedPosition != null) {
+ NavigableSet<PositionImpl> toReplay = getMessagesToReplayNow(1);
+ if (!toReplay.isEmpty()) {
+ minReplayedPosition = toReplay.first();
redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
+ } else {
+ minReplayedPosition = null;
}
+
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
@@ -1043,17 +1047,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
}
- protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+ protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!redeliveryMessages.isEmpty()) {
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
- Set<PositionImpl> messagesAvailableNow =
+ NavigableSet<PositionImpl> messagesAvailableNow =
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
return messagesAvailableNow;
} else {
- return Collections.emptySet();
+ return Collections.emptyNavigableSet();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 1df6366c389..1e52d985520 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -184,9 +185,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
- Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
+ NavigableSet<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
- PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
+ PositionImpl replayPosition = messagesToReplayNow.first();
+
// We have received a message potentially from the delayed tracker and, since we're not using it
// right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
// resend it (until we disconnect consumer).
@@ -458,13 +460,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
@Override
- protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+ protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
// If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
// messages kicks in), instead of keep replaying the same old messages, since the consumer that these
// messages are routing to might be busy at the moment
this.isDispatcherStuckOnReplays = false;
- return Collections.emptySet();
+ return Collections.emptyNavigableSet();
} else {
return super.getMessagesToReplayNow(maxMessagesToRead);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
index c9f1c65daca..a33a78f1eae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
-import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
@@ -95,7 +94,8 @@ public class ConcurrentBitmapSortedLongPairSet {
}
- public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
+ public <T extends Comparable<T>> NavigableSet<T> items(int numberOfItems,
+ LongPairSet.LongPairFunction<T> longPairConverter) {
NavigableSet<T> items = new TreeSet<>();
lock.readLock().lock();
try {