You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/03 18:48:02 UTC

[pulsar] branch branch-2.11 updated: Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c41e52b42ca Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
c41e52b42ca is described below

commit c41e52b42ca6e1d89881e23474e795dd086400e3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Oct 1 08:06:13 2022 -0700

    Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
---
 conf/broker.conf                                   |  6 ++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++++
 .../delayed/InMemoryDelayedDeliveryTracker.java    | 18 ++++++++----
 .../InMemoryDelayedDeliveryTrackerFactory.java     |  5 +++-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 34 +++++++++++++---------
 5 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d7d388e5075..c1c3fff56b0 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -545,6 +545,12 @@ delayedDeliveryTickTimeMillis=1000
 # delayedDeliveryTickTimeMillis.
 isDelayedDeliveryDeliverAtTimeStrict=false
 
+# Size of the lookahead window, in number of messages, used to detect whether all the
+# messages in the topic have a fixed delay.
+# Default is 50,000. Setting the lookahead window to 0 disables the fixed-delay
+# detection, so message deliveries are never paused on that basis.
+delayedDeliveryFixedDelayDetectionLookahead=50000
+
 # Whether to enable acknowledge of batch local index.
 acknowledgmentAtBatchIndexLevelEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0a54a2dc42e..297417c0a94 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -350,6 +350,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + "delayedDeliveryTickTimeMillis.")
     private boolean isDelayedDeliveryDeliverAtTimeStrict = false;
 
+    @FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use "
+            + "when detecting if all the messages in the topic have a fixed delay. "
+            + "Default is 50,000. Setting the lookahead window to 0 will disable the "
+            + "logic to handle fixed delays in messages in a different way.")
+    private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;
+
     @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
     private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 390ce5d5071..bb3edef5074 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -59,7 +59,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     // always going to be in FIFO order, then we can avoid pulling all the messages in
     // tracker. Instead, we use the lookahead for detection and pause the read from
     // the cursor if the delays are fixed.
-    public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
+    private final long fixedDelayDetectionLookahead;
 
     // This is the timestamp of the message with the highest delivery time
     // If new added messages are lower than this, it means the delivery is requested
@@ -70,17 +70,22 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     private boolean messagesHaveFixedDelay = true;
 
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
-                                   boolean isDelayedDeliveryDeliverAtTimeStrict) {
-        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
+                                   boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                   long fixedDelayDetectionLookahead) {
+        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
+                fixedDelayDetectionLookahead);
     }
 
     InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
-                                   long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
+                                   long tickTimeMillis, Clock clock,
+                                   boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                   long fixedDelayDetectionLookahead) {
         this.dispatcher = dispatcher;
         this.timer = timer;
         this.tickTimeMillis = tickTimeMillis;
         this.clock = clock;
         this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
     }
 
     /**
@@ -282,8 +287,9 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     @Override
     public boolean shouldPauseAllDeliveries() {
         // Pause deliveries if we know all delays are fixed within the lookahead window
-        return messagesHaveFixedDelay
-                && priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
+        return fixedDelayDetectionLookahead > 0
+                && messagesHaveFixedDelay
+                && priorityQueue.size() >= fixedDelayDetectionLookahead
                 && !hasMessageAvailable();
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
index 5c04a6d53b2..7bf0ca87c40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
@@ -33,18 +33,21 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra
 
     private boolean isDelayedDeliveryDeliverAtTimeStrict;
 
+    private long fixedDelayDetectionLookahead;
+
     @Override
     public void initialize(ServiceConfiguration config) {
         this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
                 config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
         this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
         this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
+        this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead();
     }
 
     @Override
     public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
         return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
-                isDelayedDeliveryDeliverAtTimeStrict);
+                isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index db2db6cc1db..1ff47a4ca50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -74,7 +74,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                false);
+                false, 0);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -146,7 +146,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                false);
+                false, 0);
 
         assertTrue(tasks.isEmpty());
         assertTrue(tracker.addMessage(2, 2, 20));
@@ -187,7 +187,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
-                false);
+                false, 0);
 
         clockTime.set(0);
 
@@ -209,7 +209,7 @@ public class InMemoryDeliveryTrackerTest {
 
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
-                true);
+                true, 0);
 
         clockTime.set(10);
 
@@ -236,7 +236,7 @@ public class InMemoryDeliveryTrackerTest {
         // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                1000, clock, true);
+                1000, clock, true, 0);
 
         // Set clock time, then run tracker to inherit clock time as the last tick time.
         clockTime.set(10000);
@@ -274,7 +274,7 @@ public class InMemoryDeliveryTrackerTest {
         // a previous tick run.
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                100000, clock, true);
+                100000, clock, true, 0);
 
         clockTime.set(500000);
 
@@ -299,7 +299,7 @@ public class InMemoryDeliveryTrackerTest {
         // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
-                500, clock, true);
+                500, clock, true, 0);
 
         clockTime.set(0);
 
@@ -323,9 +323,11 @@ public class InMemoryDeliveryTrackerTest {
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
+        final long fixedDelayLookahead = 100;
+
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true);
+                true, fixedDelayLookahead);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -339,13 +341,13 @@ public class InMemoryDeliveryTrackerTest {
         assertEquals(tracker.getNumberOfDelayedMessages(), 5);
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+        for (int i = 6; i <= fixedDelayLookahead; i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 
         assertTrue(tracker.shouldPauseAllDeliveries());
 
-        clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10);
+        clockTime.set(fixedDelayLookahead * 10);
 
         tracker.getScheduledMessages(100);
         assertFalse(tracker.shouldPauseAllDeliveries());
@@ -367,9 +369,11 @@ public class InMemoryDeliveryTrackerTest {
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
+        long fixedDelayLookahead = 100;
+
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true);
+                true, fixedDelayLookahead);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -381,7 +385,7 @@ public class InMemoryDeliveryTrackerTest {
 
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+        for (int i = 6; i <= fixedDelayLookahead; i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }
 
@@ -401,9 +405,11 @@ public class InMemoryDeliveryTrackerTest {
         Clock clock = mock(Clock.class);
         when(clock.millis()).then(x -> clockTime.get());
 
+        long fixedDelayLookahead = 100;
+
         @Cleanup
         InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
-                true);
+                true, fixedDelayLookahead);
 
         assertFalse(tracker.hasMessageAvailable());
 
@@ -415,7 +421,7 @@ public class InMemoryDeliveryTrackerTest {
 
         assertFalse(tracker.shouldPauseAllDeliveries());
 
-        for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
+        for (int i = 6; i <= fixedDelayLookahead; i++) {
             assertTrue(tracker.addMessage(i, i, i * 10));
         }