You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/29 03:18:19 UTC

[pulsar] branch master updated: [improve][broker] refactor ManagedLedger cacheEvictionTask implement (#14488)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3619edc624a [improve][broker] refactor ManagedLedger cacheEvictionTask implement (#14488)
3619edc624a is described below

commit 3619edc624ac223637f3e2b3fff2674ad9c5e0b6
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Fri Jul 29 11:18:12 2022 +0800

    [improve][broker] refactor ManagedLedger cacheEvictionTask implement (#14488)
---
 conf/broker.conf                                   |  9 ++++---
 deployment/terraform-ansible/templates/broker.conf |  7 +++--
 .../mledger/ManagedLedgerFactoryConfig.java        |  4 +--
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 31 +++++-----------------
 .../mledger/impl/EntryCacheManagerTest.java        |  4 +--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java | 22 +++++++++++++--
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  2 +-
 .../pulsar/client/api/ConsumerRedeliveryTest.java  |  2 +-
 .../pulsar/client/api/PartitionCreationTest.java   |  2 +-
 10 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 7f07ae54449..41809f87116 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1014,8 +1014,8 @@ managedLedgerCacheCopyEntries=false
 # Threshold to which bring down the cache level when eviction is triggered
 managedLedgerCacheEvictionWatermark=0.9
 
-# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
-managedLedgerCacheEvictionFrequency=100.0
+# Configure the cache eviction interval in milliseconds for the managed ledger cache
+managedLedgerCacheEvictionIntervalMs=10
 
 # All entries that have stayed in cache for more than the configured time, will be evicted
 managedLedgerCacheEvictionTimeThresholdMillis=1000
@@ -1572,4 +1572,7 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
 
 # If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
 # multiple entries.
-persistentUnackedRangesWithMultipleEntriesEnabled=false
\ No newline at end of file
+persistentUnackedRangesWithMultipleEntriesEnabled=false
+
+# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
+managedLedgerCacheEvictionFrequency=0
\ No newline at end of file
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index fb4456f0b1d..1ed98cd4e02 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -772,8 +772,8 @@ managedLedgerCacheCopyEntries=false
 # Threshold to which bring down the cache level when eviction is triggered
 managedLedgerCacheEvictionWatermark=0.9
 
-# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
-managedLedgerCacheEvictionFrequency=100.0
+# Configure the cache eviction interval in milliseconds for the managed ledger cache
+managedLedgerCacheEvictionIntervalMs=10
 
 # All entries that have stayed in cache for more than the configured time, will be evicted
 managedLedgerCacheEvictionTimeThresholdMillis=1000
@@ -1137,6 +1137,9 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
 managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
 
+# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
+managedLedgerCacheEvictionFrequency=0
+
 ### --- Transaction config variables --- ###
 
 # Enable transaction coordinator in broker
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 25fcb377e3e..78314be45c3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -43,9 +43,9 @@ public class ManagedLedgerFactoryConfig {
     private int numManagedLedgerSchedulerThreads = Runtime.getRuntime().availableProcessors();
 
     /**
-     * Frequency of cache eviction triggering. Default is 100 times per second.
+     * Interval of cache eviction triggering. Default is 10 ms times.
      */
-    private double cacheEvictionFrequency = 100;
+    private long cacheEvictionIntervalMs = 10;
 
     /**
      * All entries that have stayed in cache for more than the configured time, will be evicted.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 8e3271a0393..629e96ba3e3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -34,8 +34,8 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -80,6 +80,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Runnables;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -94,8 +95,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private final ManagedLedgerFactoryConfig config;
     @Getter
     protected final OrderedScheduler scheduledExecutor;
-
-    private final ExecutorService cacheEvictionExecutor;
+    private final ScheduledExecutorService cacheEvictionExecutor;
 
     @Getter
     protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -184,7 +184,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 .name("bookkeeper-ml-scheduler")
                 .build();
         cacheEvictionExecutor = Executors
-                .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
         this.metadataServiceAvailable = true;
         this.bookkeeperFactory = bookKeeperGroupFactory;
         this.isBookkeeperManaged = isBookkeeperManaged;
@@ -203,8 +203,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
                 .toNanos(config.getCacheEvictionTimeThresholdMillis());
 
-
-        cacheEvictionExecutor.execute(this::cacheEvictionTask);
+        long evictionTaskInterval = config.getCacheEvictionIntervalMs();
+        cacheEvictionExecutor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(this::doCacheEviction),
+                evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS);
         closed = false;
 
         metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
@@ -258,24 +259,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         lastStatTimestamp = now;
     }
 
-    private void cacheEvictionTask() {
-        double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
-        long waitTimeMillis = (long) (1000 / evictionFrequency);
-
-        while (!closed) {
-            try {
-                doCacheEviction();
-
-                Thread.sleep(waitTimeMillis);
-            } catch (InterruptedException e) {
-                // Factory is shutting down
-                return;
-            } catch (Throwable t) {
-                log.warn("Exception while performing cache eviction: {}", t.getMessage(), t);
-            }
-        }
-    }
-
     private synchronized void doCacheEviction() {
         long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 55f58ecd11c..a1dcb4ea02f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -260,7 +260,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
         config.setMaxCacheSize(7 * 10);
         config.setCacheEvictionWatermark(0.8);
-        config.setCacheEvictionFrequency(1);
+        config.setCacheEvictionIntervalMs(1000);
 
         @Cleanup("shutdown")
         ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
@@ -329,7 +329,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
     public void verifyTimeBasedEviction() throws Exception {
         ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
         config.setMaxCacheSize(1000);
-        config.setCacheEvictionFrequency(100);
+        config.setCacheEvictionIntervalMs(10);
         config.setCacheEvictionTimeThresholdMillis(100);
 
         @Cleanup("shutdown")
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index c60644301fa..dff1176c86d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2668,7 +2668,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     @Test
     public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception {
         ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
-        conf.setCacheEvictionFrequency(0.1);
+        conf.setCacheEvictionIntervalMs(10000);
 
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2dfa122d83d..178e047a32d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -105,6 +105,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @Category
     private static final String CATEGORY_PLUGIN = "Broker Plugin";
 
+    private static final double MIN_ML_CACHE_EVICTION_FREQUENCY = 0.001;
+    private static final double MAX_ML_CACHE_EVICTION_FREQUENCY = 1000.0;
+    private static final long MAX_ML_CACHE_EVICTION_INTERVAL_MS = 1000000L;
+
     /***** --- pulsar configuration. --- ****/
     @FieldContext(
         category = CATEGORY_SERVER,
@@ -1807,8 +1811,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private double managedLedgerCacheEvictionWatermark = 0.9;
     @FieldContext(category = CATEGORY_STORAGE_ML,
-            doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
-    private double managedLedgerCacheEvictionFrequency = 100.0;
+            doc = "Configure the cache eviction frequency for the managed ledger cache.")
+    @Deprecated
+    private double managedLedgerCacheEvictionFrequency = 0;
+
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms")
+    private long managedLedgerCacheEvictionIntervalMs = 10;
+
     @FieldContext(category = CATEGORY_STORAGE_ML,
             dynamic = true,
             doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
@@ -3018,4 +3028,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public int getMetadataStoreCacheExpirySeconds() {
         return zooKeeperCacheExpirySeconds > 0 ? zooKeeperCacheExpirySeconds : metadataStoreCacheExpirySeconds;
     }
+
+    public long getManagedLedgerCacheEvictionIntervalMs() {
+        return managedLedgerCacheEvictionFrequency > 0
+                ? (long) (1000 / Math.max(
+                        Math.min(managedLedgerCacheEvictionFrequency, MAX_ML_CACHE_EVICTION_FREQUENCY),
+                                   MIN_ML_CACHE_EVICTION_FREQUENCY))
+                : Math.min(MAX_ML_CACHE_EVICTION_INTERVAL_MS, managedLedgerCacheEvictionIntervalMs);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index bb7cb6ffd8d..234e11bee64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -59,7 +59,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
         managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
         managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
         managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
-        managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
+        managedLedgerFactoryConfig.setCacheEvictionIntervalMs(conf.getManagedLedgerCacheEvictionIntervalMs());
         managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(
                 conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
         managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 9b154690d7e..95e343acfef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -53,7 +53,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
-        conf.setManagedLedgerCacheEvictionFrequency(0.1);
+        conf.setManagedLedgerCacheEvictionIntervalMs(10000);
         super.internalSetup();
         super.producerBaseSetup();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index da9ece0b4ec..13f11c02612 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -46,7 +46,7 @@ public class PartitionCreationTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
-        conf.setManagedLedgerCacheEvictionFrequency(0.1);
+        conf.setManagedLedgerCacheEvictionIntervalMs(10000);
         super.internalSetup();
         super.producerBaseSetup();
     }