You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 07:30:27 UTC

[pulsar] branch branch-2.9 updated: Support dynamic update cache config (#13679)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4caf3ef9f34 Support dynamic update cache config (#13679)
4caf3ef9f34 is described below

commit 4caf3ef9f34e58aee0e955a03a63cc6282f257b5
Author: LinChen <15...@qq.com>
AuthorDate: Tue Mar 15 00:59:53 2022 +0800

    Support dynamic update cache config (#13679)
    
    (cherry picked from commit b0213b225f194b47a5dcd63ef4a26f55c4c820b6)
---
 .../bookkeeper/mledger/ManagedLedgerFactory.java   | 18 ++++++++++++++++++
 .../bookkeeper/mledger/impl/EntryCacheManager.java | 19 ++++++++++++++++---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 13 ++++++++++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  3 +++
 .../pulsar/broker/service/BrokerService.java       | 22 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 22 ++++++++++++++++++++++
 6 files changed, 93 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 682ce9008f1..a83994d1cd3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
+import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
 
 /**
  * A factory to open/create managed ledgers and delete them.
@@ -179,4 +180,21 @@ public interface ManagedLedgerFactory {
      */
     CompletableFuture<Boolean> asyncExists(String ledgerName);
 
+    /**
+     * @return the EntryCacheManager.
+     */
+    EntryCacheManager getEntryCacheManager();
+
+    /**
+     * Update the cache eviction time threshold.
+     *
+     * @param cacheEvictionTimeThresholdNanos time threshold for eviction, in nanoseconds.
+     */
+    void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos);
+
+    /**
+     * @return the time threshold for eviction, in nanoseconds.
+     */
+    long getCacheEvictionTimeThreshold();
+
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 9a1d6317e4d..0165fc5b197 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("checkstyle:javadoctype")
 public class EntryCacheManager {
 
-    private final long maxSize;
-    private final long evictionTriggerThreshold;
-    private final double cacheEvictionWatermark;
+    private volatile long maxSize;
+    private volatile long evictionTriggerThreshold;
+    private volatile double cacheEvictionWatermark;
     private final AtomicLong currentSize = new AtomicLong(0);
     private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
     private final EntryCacheEvictionPolicy evictionPolicy;
@@ -89,6 +89,15 @@ public class EntryCacheManager {
         }
     }
 
+    public void updateCacheSizeAndThreshold(long maxSize) {
+        this.maxSize = maxSize;
+        this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
+    }
+
+    public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
+        this.cacheEvictionWatermark = cacheEvictionWatermark;
+    }
+
     void removeEntryCache(String name) {
         EntryCache entryCache = caches.remove(name);
         if (entryCache == null) {
@@ -150,6 +159,10 @@ public class EntryCacheManager {
         return maxSize;
     }
 
+    public double getCacheEvictionWatermark() {
+        return cacheEvictionWatermark;
+    }
+
     public void clear() {
         caches.values().forEach(EntryCache::clear);
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index a66545fd3e5..c3e391ffab2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -104,7 +104,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private final ScheduledFuture<?> statsTask;
     private final ScheduledFuture<?> flushCursorsTask;
 
-    private final long cacheEvictionTimeThresholdNanos;
+    private volatile long cacheEvictionTimeThresholdNanos;
     private final MetadataStore metadataStore;
 
     //indicate whether shutdown() is called.
@@ -943,10 +943,21 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         return config;
     }
 
+    @Override
     public EntryCacheManager getEntryCacheManager() {
         return entryCacheManager;
     }
 
+    @Override
+    public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos){
+        this.cacheEvictionTimeThresholdNanos = cacheEvictionTimeThresholdNanos;
+    }
+
+    @Override
+    public long getCacheEvictionTimeThreshold(){
+        return cacheEvictionTimeThresholdNanos;
+    }
+
     public ManagedLedgerFactoryMXBean getCacheStats() {
         return this.mbean;
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f3fe8e86441..ee7ed9e5d7c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1545,6 +1545,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int managedLedgerMaxAckQuorum = 5;
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
+        dynamic = true,
         doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
             + " memory is allocated from JVM direct memory and it's shared across all the topics"
             + " running in the same broker. By default, uses 1/5th of available direct memory")
@@ -1554,6 +1555,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean managedLedgerCacheCopyEntries = false;
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
+        dynamic = true,
         doc = "Threshold to which bring down the cache level when eviction is triggered"
     )
     private double managedLedgerCacheEvictionWatermark = 0.9f;
@@ -1561,6 +1563,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
     private double managedLedgerCacheEvictionFrequency = 100.0;
     @FieldContext(category = CATEGORY_STORAGE_ML,
+            dynamic = true,
             doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
     private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
     @FieldContext(category = CATEGORY_STORAGE_ML,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 110eccbf0ab..39eeed7ae02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2151,6 +2151,28 @@ public class BrokerService implements Closeable {
                 }
             });
         });
+
+        // add listener to apply dynamic updates of managedLedgerCacheSizeMB to the entry cache manager
+        registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> {
+            managedLedgerFactory.getEntryCacheManager()
+                    .updateCacheSizeAndThreshold(((int) managedLedgerCacheSizeMB) * 1024L * 1024L);
+        });
+
+        // add listener to apply dynamic updates of managedLedgerCacheEvictionWatermark to the entry cache manager
+        registerConfigurationListener(
+                "managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> {
+            managedLedgerFactory.getEntryCacheManager()
+                    .updateCacheEvictionWatermark((double) cacheEvictionWatermark);
+        });
+
+        // add listener to apply dynamic updates of managedLedgerCacheEvictionTimeThresholdMillis to the factory
+        registerConfigurationListener(
+                "managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
+            managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                    .toNanos((long) cacheEvictionTimeThresholdMills));
+        });
+
+
         // add listener to update message-dispatch-rate in msg for topic
         registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
             updateTopicMessageDispatchRate();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f768bf60d9d..7ea016dffae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -495,6 +495,28 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
     }
 
+    @Test
+    public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
+        // update configuration
+        admin.brokers().updateDynamicConfiguration("managedLedgerCacheSizeMB", "1");
+        admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionWatermark", "0.8");
+        admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionTimeThresholdMillis", "2000");
+
+        // wait for the config to be updated
+        Awaitility.await().until(() -> {
+            return pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 * 1024L * 1024L
+                    && pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark() == 0.8
+                    && pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() == TimeUnit.MILLISECONDS
+                    .toNanos(2000);
+        });
+
+        // verify value is updated
+        assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L);
+        assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
+        assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
+                .toNanos(2000));
+    }
+
     /**
      * <pre>
      * Verifies: zk-update configuration updates service-config