You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 07:30:27 UTC
[pulsar] branch branch-2.9 updated: Support dynamic update cache config (#13679)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 4caf3ef9f34 Support dynamic update cache config (#13679)
4caf3ef9f34 is described below
commit 4caf3ef9f34e58aee0e955a03a63cc6282f257b5
Author: LinChen <15...@qq.com>
AuthorDate: Tue Mar 15 00:59:53 2022 +0800
Support dynamic update cache config (#13679)
(cherry picked from commit b0213b225f194b47a5dcd63ef4a26f55c4c820b6)
---
.../bookkeeper/mledger/ManagedLedgerFactory.java | 18 ++++++++++++++++++
.../bookkeeper/mledger/impl/EntryCacheManager.java | 19 ++++++++++++++++---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 13 ++++++++++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 3 +++
.../pulsar/broker/service/BrokerService.java | 22 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 22 ++++++++++++++++++++++
6 files changed, 93 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 682ce9008f1..a83994d1cd3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
+import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
/**
* A factory to open/create managed ledgers and delete them.
@@ -179,4 +180,21 @@ public interface ManagedLedgerFactory {
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);
+ /**
+ * @return return EntryCacheManager.
+ */
+ EntryCacheManager getEntryCacheManager();
+
+ /**
+ * update cache evictionTimeThreshold.
+ *
+ * @param cacheEvictionTimeThresholdNanos time threshold for eviction.
+ */
+ void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos);
+
+ /**
+ * @return time threshold for eviction.
+ * */
+ long getCacheEvictionTimeThreshold();
+
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 9a1d6317e4d..0165fc5b197 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:javadoctype")
public class EntryCacheManager {
- private final long maxSize;
- private final long evictionTriggerThreshold;
- private final double cacheEvictionWatermark;
+ private volatile long maxSize;
+ private volatile long evictionTriggerThreshold;
+ private volatile double cacheEvictionWatermark;
private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
@@ -89,6 +89,15 @@ public class EntryCacheManager {
}
}
+ public void updateCacheSizeAndThreshold(long maxSize) {
+ this.maxSize = maxSize;
+ this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
+ }
+
+ public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
+ this.cacheEvictionWatermark = cacheEvictionWatermark;
+ }
+
void removeEntryCache(String name) {
EntryCache entryCache = caches.remove(name);
if (entryCache == null) {
@@ -150,6 +159,10 @@ public class EntryCacheManager {
return maxSize;
}
+ public double getCacheEvictionWatermark() {
+ return cacheEvictionWatermark;
+ }
+
public void clear() {
caches.values().forEach(EntryCache::clear);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index a66545fd3e5..c3e391ffab2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -104,7 +104,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ScheduledFuture<?> statsTask;
private final ScheduledFuture<?> flushCursorsTask;
- private final long cacheEvictionTimeThresholdNanos;
+ private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;
//indicate whether shutdown() is called.
@@ -943,10 +943,21 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
return config;
}
+ @Override
public EntryCacheManager getEntryCacheManager() {
return entryCacheManager;
}
+ @Override
+ public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos){
+ this.cacheEvictionTimeThresholdNanos = cacheEvictionTimeThresholdNanos;
+ }
+
+ @Override
+ public long getCacheEvictionTimeThreshold(){
+ return cacheEvictionTimeThresholdNanos;
+ }
+
public ManagedLedgerFactoryMXBean getCacheStats() {
return this.mbean;
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f3fe8e86441..ee7ed9e5d7c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1545,6 +1545,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxAckQuorum = 5;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+ " memory is allocated from JVM direct memory and it's shared across all the topics"
+ " running in the same broker. By default, uses 1/5th of available direct memory")
@@ -1554,6 +1555,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "Threshold to which bring down the cache level when eviction is triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9f;
@@ -1561,6 +1563,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
@FieldContext(category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
@FieldContext(category = CATEGORY_STORAGE_ML,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 110eccbf0ab..39eeed7ae02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2151,6 +2151,28 @@ public class BrokerService implements Closeable {
}
});
});
+
+ // add listener to notify broker managedLedgerCacheSizeMB dynamic config
+ registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> {
+ managedLedgerFactory.getEntryCacheManager()
+ .updateCacheSizeAndThreshold(((int) managedLedgerCacheSizeMB) * 1024L * 1024L);
+ });
+
+ // add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config
+ registerConfigurationListener(
+ "managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> {
+ managedLedgerFactory.getEntryCacheManager()
+ .updateCacheEvictionWatermark((double) cacheEvictionWatermark);
+ });
+
+ // add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
+ registerConfigurationListener(
+ "managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
+ managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+ .toNanos((long) cacheEvictionTimeThresholdMills));
+ });
+
+
// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f768bf60d9d..7ea016dffae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -495,6 +495,28 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
}
+ @Test
+ public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
+ // update configuration
+ admin.brokers().updateDynamicConfiguration("managedLedgerCacheSizeMB", "1");
+ admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionWatermark", "0.8");
+ admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionTimeThresholdMillis", "2000");
+
+ // wait config to be updated
+ Awaitility.await().until(() -> {
+ return pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 * 1024L * 1024L
+ && pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark() == 0.8
+ && pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() == TimeUnit.MILLISECONDS
+ .toNanos(2000);
+ });
+
+ // verify value is updated
+ assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L);
+ assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
+ assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
+ .toNanos(2000));
+ }
+
/**
* <pre>
* Verifies: zk-update configuration updates service-config