You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/01 15:06:25 UTC
[pulsar] branch master updated: Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 55826742d1c Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
55826742d1c is described below
commit 55826742d1c589d106d7cbe97f12ec2e8bcca35f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Oct 1 08:06:13 2022 -0700
Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
---
conf/broker.conf | 6 ++++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++++
.../delayed/InMemoryDelayedDeliveryTracker.java | 18 ++++++++----
.../InMemoryDelayedDeliveryTrackerFactory.java | 5 +++-
.../delayed/InMemoryDeliveryTrackerTest.java | 34 +++++++++++++---------
5 files changed, 48 insertions(+), 21 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index d117d679c85..e6b3aef8811 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -576,6 +576,12 @@ delayedDeliveryMaxNumBuckets=50
# Enable share the delayed message index across subscriptions
delayedDeliverySharedIndexEnabled=false
+# Size of the lookahead window to use when detecting if all the messages in the topic
+# have a fixed delay.
+# Default is 50,000. Setting the lookahead window to 0 (or a negative value) disables
+# fixed-delay detection, so message deliveries are never paused because of it.
+delayedDeliveryFixedDelayDetectionLookahead=50000
+
# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8c883045e66..6683d36c36e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -372,6 +372,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions")
private boolean delayedDeliverySharedIndexEnabled = false;
+ @FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use "
+ + "when detecting if all the messages in the topic have a fixed delay. "
+ + "Default is 50,000. Setting the lookahead window to 0 will disable the "
+ + "logic to handle fixed delays in messages in a different way.")
+ private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;
+
@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 83b113df36b..11d663322be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -59,7 +59,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
// always going to be in FIFO order, then we can avoid pulling all the messages in
// tracker. Instead, we use the lookahead for detection and pause the read from
// the cursor if the delays are fixed.
- public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
+ private final long fixedDelayDetectionLookahead;
// This is the timestamp of the message with the highest delivery time
// If new added messages are lower than this, it means the delivery is requested
@@ -70,17 +70,22 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
private boolean messagesHaveFixedDelay = true;
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
- boolean isDelayedDeliveryDeliverAtTimeStrict) {
- this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
+ this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
+ fixedDelayDetectionLookahead);
}
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
- long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
+ long tickTimeMillis, Clock clock,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+ this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}
/**
@@ -283,8 +288,9 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
@Override
public boolean shouldPauseAllDeliveries() {
// Pause deliveries if we know all delays are fixed within the lookahead window
- return messagesHaveFixedDelay
- && priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
+ return fixedDelayDetectionLookahead > 0
+ && messagesHaveFixedDelay
+ && priorityQueue.size() >= fixedDelayDetectionLookahead
&& !hasMessageAvailable();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index 5c04a6d53b2..7bf0ca87c40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -33,18 +33,21 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra
private boolean isDelayedDeliveryDeliverAtTimeStrict;
+ private long fixedDelayDetectionLookahead;
+
@Override
public void initialize(ServiceConfiguration config) {
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
+ this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead();
}
@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
- isDelayedDeliveryDeliverAtTimeStrict);
+ isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index db2db6cc1db..1ff47a4ca50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -74,7 +74,7 @@ public class InMemoryDeliveryTrackerTest {
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- false);
+ false, 0);
assertFalse(tracker.hasMessageAvailable());
@@ -146,7 +146,7 @@ public class InMemoryDeliveryTrackerTest {
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- false);
+ false, 0);
assertTrue(tasks.isEmpty());
assertTrue(tracker.addMessage(2, 2, 20));
@@ -187,7 +187,7 @@ public class InMemoryDeliveryTrackerTest {
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
- false);
+ false, 0);
clockTime.set(0);
@@ -209,7 +209,7 @@ public class InMemoryDeliveryTrackerTest {
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
- true);
+ true, 0);
clockTime.set(10);
@@ -236,7 +236,7 @@ public class InMemoryDeliveryTrackerTest {
// Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
- 1000, clock, true);
+ 1000, clock, true, 0);
// Set clock time, then run tracker to inherit clock time as the last tick time.
clockTime.set(10000);
@@ -274,7 +274,7 @@ public class InMemoryDeliveryTrackerTest {
// a previous tick run.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
- 100000, clock, true);
+ 100000, clock, true, 0);
clockTime.set(500000);
@@ -299,7 +299,7 @@ public class InMemoryDeliveryTrackerTest {
// Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
- 500, clock, true);
+ 500, clock, true, 0);
clockTime.set(0);
@@ -323,9 +323,11 @@ public class InMemoryDeliveryTrackerTest {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
+ final long fixedDelayLookahead = 100;
+
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- true);
+ true, fixedDelayLookahead);
assertFalse(tracker.hasMessageAvailable());
@@ -339,13 +341,13 @@ public class InMemoryDeliveryTrackerTest {
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertFalse(tracker.shouldPauseAllDeliveries());
- for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+ for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
assertTrue(tracker.shouldPauseAllDeliveries());
- clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10);
+ clockTime.set(fixedDelayLookahead * 10);
tracker.getScheduledMessages(100);
assertFalse(tracker.shouldPauseAllDeliveries());
@@ -367,9 +369,11 @@ public class InMemoryDeliveryTrackerTest {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
+ long fixedDelayLookahead = 100;
+
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- true);
+ true, fixedDelayLookahead);
assertFalse(tracker.hasMessageAvailable());
@@ -381,7 +385,7 @@ public class InMemoryDeliveryTrackerTest {
assertFalse(tracker.shouldPauseAllDeliveries());
- for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+ for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}
@@ -401,9 +405,11 @@ public class InMemoryDeliveryTrackerTest {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());
+ long fixedDelayLookahead = 100;
+
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
- true);
+ true, fixedDelayLookahead);
assertFalse(tracker.hasMessageAvailable());
@@ -415,7 +421,7 @@ public class InMemoryDeliveryTrackerTest {
assertFalse(tracker.shouldPauseAllDeliveries());
- for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+ for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}