You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 00:48:05 UTC
[pulsar] branch master updated: Expose new entries check delay in
the broker.conf (#7154)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b2329e4 Expose new entries check delay in the broker.conf (#7154)
b2329e4 is described below
commit b2329e4f34095c12c93c11628b03b70fa2996e29
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 4 08:47:53 2020 +0800
Expose new entries check delay in the broker.conf (#7154)
Motivation
Expose new entries check delay in the broker.conf
---
conf/broker.conf | 6 ++++++
conf/standalone.conf | 6 ++++++
.../java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java | 9 +++++++++
.../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
.../java/org/apache/pulsar/broker/service/BrokerService.java | 1 +
6 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 8d50aeb..601a44f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -784,6 +784,12 @@ managedLedgerReadEntryTimeoutSeconds=0
# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0
+# New entries check delay for the cursor under the managed ledger.
+# If no new messages in the topic, the cursor will try to check again after the delay time.
+# For consumption latency sensitive scenario, can set to a smaller value or set to 0.
+# Of course, use a smaller value may degrade consumption throughput. Default is 10ms.
+managedLedgerNewEntriesCheckDelayInMillis=10
+
### --- Load balancer --- ###
# Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index d60bc01..24ff949 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -567,6 +567,12 @@ managedLedgerReadEntryTimeoutSeconds=0
# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0
+# New entries check delay for the cursor under the managed ledger.
+# If no new messages in the topic, the cursor will try to check again after the delay time.
+# For consumption latency sensitive scenario, can set to a smaller value or set to 0.
+# Of course, use a smaller value may degrade consumption throughput. Default is 10ms.
+managedLedgerNewEntriesCheckDelayInMillis=10
+
# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index dde67c2..2214346 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -70,6 +70,7 @@ public class ManagedLedgerConfig {
private Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName;
private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+ private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
public boolean isCreateIfMissing() {
@@ -602,4 +603,12 @@ public class ManagedLedgerConfig {
public void setDeletionAtBatchIndexLevelEnabled(boolean deletionAtBatchIndexLevelEnabled) {
this.deletionAtBatchIndexLevelEnabled = deletionAtBatchIndexLevelEnabled;
}
+
+ public int getNewEntriesCheckDelayInMillis() {
+ return newEntriesCheckDelayInMillis;
+ }
+
+ public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
+ this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8a3d31f..2105c0d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -722,7 +722,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// If the managed ledger was indeed terminated, we need to notify the cursor
callback.readEntriesFailed(new NoMoreEntriesToReadException("Topic was terminated"), ctx);
}
- }), 10, TimeUnit.MILLISECONDS);
+ }), config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
}
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f90d9d9..729733b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1224,6 +1224,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
private long managedLedgerAddEntryTimeoutSeconds = 0;
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "New entries check delay for the cursor under the managed ledger. \n"
+ + "If no new messages in the topic, the cursor will try to check again after the delay time. \n"
+ + "For consumption latency sensitive scenario, can set to a smaller value or set to 0.\n"
+ + "Of course, this may degrade consumption throughput. Default is 10ms.")
+ private int managedLedgerNewEntriesCheckDelayInMillis = 10;
+
/*** --- Load balancer --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 3e9cf2e..54facc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1116,6 +1116,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));