You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 00:48:05 UTC

[pulsar] branch master updated: Expose new entries check delay in the broker.conf (#7154)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b2329e4  Expose new entries check delay in the broker.conf (#7154)
b2329e4 is described below

commit b2329e4f34095c12c93c11628b03b70fa2996e29
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 4 08:47:53 2020 +0800

    Expose new entries check delay in the broker.conf (#7154)
    
    Motivation
    Expose new entries check delay in the broker.conf
---
 conf/broker.conf                                                 | 6 ++++++
 conf/standalone.conf                                             | 6 ++++++
 .../java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java  | 9 +++++++++
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java    | 2 +-
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
 .../java/org/apache/pulsar/broker/service/BrokerService.java     | 1 +
 6 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 8d50aeb..601a44f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -784,6 +784,12 @@ managedLedgerReadEntryTimeoutSeconds=0
 # Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
 managedLedgerAddEntryTimeoutSeconds=0
 
+# New entries check delay for the cursor under the managed ledger.
+# If there are no new messages in the topic, the cursor will check again after the delay time.
+# For consumption latency sensitive scenarios, set this to a smaller value or to 0.
+# Note that using a smaller value may degrade consumption throughput. Default is 10ms.
+managedLedgerNewEntriesCheckDelayInMillis=10
+
 ### --- Load balancer --- ###
 
 # Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index d60bc01..24ff949 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -567,6 +567,12 @@ managedLedgerReadEntryTimeoutSeconds=0
 # Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
 managedLedgerAddEntryTimeoutSeconds=0
 
+# New entries check delay for the cursor under the managed ledger.
+# If there are no new messages in the topic, the cursor will check again after the delay time.
+# For consumption latency sensitive scenarios, set this to a smaller value or to 0.
+# Note that using a smaller value may degrade consumption throughput. Default is 10ms.
+managedLedgerNewEntriesCheckDelayInMillis=10
+
 # Use Open Range-Set to cache unacked messages
 managedLedgerUnackedRangesOpenCacheSetEnabled=true
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index dde67c2..2214346 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -70,6 +70,7 @@ public class ManagedLedgerConfig {
     private Class<? extends EnsemblePlacementPolicy>  bookKeeperEnsemblePlacementPolicyClassName;
     private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
     private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+    private int newEntriesCheckDelayInMillis = 10;
     private Clock clock = Clock.systemUTC();
 
     public boolean isCreateIfMissing() {
@@ -602,4 +603,12 @@ public class ManagedLedgerConfig {
     public void setDeletionAtBatchIndexLevelEnabled(boolean deletionAtBatchIndexLevelEnabled) {
         this.deletionAtBatchIndexLevelEnabled = deletionAtBatchIndexLevelEnabled;
     }
+
+    public int getNewEntriesCheckDelayInMillis() {
+        return newEntriesCheckDelayInMillis;
+    }
+
+    public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
+        this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8a3d31f..2105c0d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -722,7 +722,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     // If the managed ledger was indeed terminated, we need to notify the cursor
                     callback.readEntriesFailed(new NoMoreEntriesToReadException("Topic was terminated"), ctx);
                 }
-            }), 10, TimeUnit.MILLISECONDS);
+            }), config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
         }
     }
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f90d9d9..729733b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1224,6 +1224,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
     private long managedLedgerAddEntryTimeoutSeconds = 0;
 
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "New entries check delay for the cursor under the managed ledger. \n"
+                    + "If no new messages in the topic, the cursor will try to check again after the delay time. \n"
+                    + "For consumption latency sensitive scenario, can set to a smaller value or set to 0.\n"
+                    + "Of course, this may degrade consumption throughput. Default is 10ms.")
+    private int managedLedgerNewEntriesCheckDelayInMillis = 10;
+
     /*** --- Load balancer --- ****/
     @FieldContext(
         category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 3e9cf2e..54facc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1116,6 +1116,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
 
             managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+            managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
 
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));