You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:57:06 UTC

[pulsar] 16/22: Reduce the readFailureBackoff time (#12444)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56aaa70b2be15e37a1bff4fc794bd56e6abd7769
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Oct 22 17:03:38 2021 +0800

    Reduce the readFailureBackoff time (#12444)
    
    ### Motivation
    When reading entries fails with an exception, the dispatcher backs off
    for 15s before starting the next read. A 15s delay is unacceptable for
    the consumer in most cases, so it is better to reduce the interval.
    
    ### Modifications
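    Make the read failure backoff initial, max, and mandatory stop times
    configurable (the defaults remain 15s, 60s, and 0s), so that the backoff
    can be made to start from a shorter interval such as 1s.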
    
    (cherry picked from commit 99c90a548085d0dbefcc613eac729368933811c6)
---
 conf/broker.conf                                    |  9 +++++++++
 conf/standalone.conf                                |  9 +++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java  | 21 +++++++++++++++++++++
 .../PersistentDispatcherSingleActiveConsumer.java   |  7 +++++--
 4 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1fc578a..3dd8590 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -391,6 +391,15 @@ dispatcherMinReadBatchSize=1
 # Max number of entries to dispatch for a shared subscription. By default it is 20 entries.
 dispatcherMaxRoundRobinBatchSize=20
 
+# The read failure backoff initial time in milliseconds. By default it is 15s.
+dispatcherReadFailureBackoffInitialTimeInMs=15000
+
+# The read failure backoff max time in milliseconds. By default it is 60s.
+dispatcherReadFailureBackoffMaxTimeInMs=60000
+
+# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
+dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+
 # Precise dispathcer flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2e0273a2..878a852 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -243,6 +243,15 @@ dispatchThrottlingRateRelativeToPublishRate=false
 # backlog.
 dispatchThrottlingOnNonBacklogConsumerEnabled=true
 
+# The read failure backoff initial time in milliseconds. By default it is 15s.
+dispatcherReadFailureBackoffInitialTimeInMs=15000
+
+# The read failure backoff max time in milliseconds. By default it is 60s.
+dispatcherReadFailureBackoffMaxTimeInMs=60000
+
+# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
+dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+
 # Precise dispathcer flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4c7ee85..69a3d35 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -782,6 +782,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(
         dynamic = true,
         category = CATEGORY_SERVER,
+        doc = "The read failure backoff initial time in milliseconds. By default it is 15s."
+    )
+    private int dispatcherReadFailureBackoffInitialTimeInMs = 15000;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "The read failure backoff max time in milliseconds. By default it is 60s."
+    )
+    private int dispatcherReadFailureBackoffMaxTimeInMs = 60000;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "The read failure backoff mandatory stop time in milliseconds. By default it is 0s."
+    )
+    private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
         doc = "Max number of entries to dispatch for a shared subscription. By default it is 20 entries."
     )
     private int dispatcherMaxRoundRobinBatchSize = 20;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index fa7ac03..653c1c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -65,8 +65,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     protected volatile boolean havePendingRead = false;
 
     protected volatile int readBatchSize;
-    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
-            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+    protected final Backoff readFailureBackoff;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     private final RedeliveryTracker redeliveryTracker;
@@ -80,6 +79,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 : ""/* NonDurableCursor doesn't have name */);
         this.cursor = cursor;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
+        this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
+            TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
+            TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
+            TimeUnit.MILLISECONDS);
         this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
     }