You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 09:41:37 UTC
[pulsar] branch branch-2.9 updated: [improve][broker] refactor ManagedLedger cacheEvictionTask implement (#14488)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 06b4d584fbe [improve][broker] refactor ManagedLedger cacheEvictionTask implement (#14488)
06b4d584fbe is described below
commit 06b4d584fbedf83a0241914839d5fe94000a74f3
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Fri Jul 29 11:18:12 2022 +0800
[improve][broker] refactor ManagedLedger cacheEvictionTask implement (#14488)
(cherry picked from commit 3619edc624ac223637f3e2b3fff2674ad9c5e0b6)
---
conf/broker.conf | 7 +++--
deployment/terraform-ansible/templates/broker.conf | 6 +++--
.../mledger/ManagedLedgerFactoryConfig.java | 4 +--
.../mledger/impl/ManagedLedgerFactoryImpl.java | 31 +++++-----------------
.../mledger/impl/EntryCacheManagerTest.java | 4 +--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 24 ++++++++++++++---
.../pulsar/broker/ManagedLedgerClientFactory.java | 2 +-
.../pulsar/client/api/ConsumerRedeliveryTest.java | 2 +-
.../pulsar/client/api/PartitionCreationTest.java | 2 +-
10 files changed, 45 insertions(+), 39 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 67adb4bdc9d..329fd399f36 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -957,8 +957,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9
-# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
-managedLedgerCacheEvictionFrequency=100.0
+# Configure the cache eviction interval in milliseconds for the managed ledger cache
+managedLedgerCacheEvictionIntervalMs=10
# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
@@ -1386,3 +1386,6 @@ packagesManagementLedgerRootPath=/ledgers
# This config applies to managed ledger bookkeeper clients, as well.
### --- Packages management service configuration variables (end) --- ###
+
+# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
+managedLedgerCacheEvictionFrequency=0
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 39adc6b0a20..dd14553013d 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -776,8 +776,8 @@ managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9
-# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
-managedLedgerCacheEvictionFrequency=100.0
+# Configure the cache eviction interval in milliseconds for the managed ledger cache
+managedLedgerCacheEvictionIntervalMs=10
# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
@@ -1132,6 +1132,8 @@ replicationTlsEnabled=false
# Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds
brokerServicePurgeInactiveFrequencyInSeconds=60
+# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
+managedLedgerCacheEvictionFrequency=0
### --- Transaction config variables --- ###
# Enable transaction coordinator in broker
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 25fcb377e3e..78314be45c3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -43,9 +43,9 @@ public class ManagedLedgerFactoryConfig {
private int numManagedLedgerSchedulerThreads = Runtime.getRuntime().availableProcessors();
/**
- * Frequency of cache eviction triggering. Default is 100 times per second.
+ * Interval of cache eviction triggering. Default is 10 ms times.
*/
- private double cacheEvictionFrequency = 100;
+ private long cacheEvictionIntervalMs = 10;
/**
* All entries that have stayed in cache for more than the configured time, will be evicted.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index dce1789cd6c..6b323145653 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -33,8 +33,8 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -79,6 +79,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -93,8 +94,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ManagedLedgerFactoryConfig config;
@Getter
protected final OrderedScheduler scheduledExecutor;
-
- private final ExecutorService cacheEvictionExecutor;
+ private final ScheduledExecutorService cacheEvictionExecutor;
@Getter
protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -183,7 +183,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
.name("bookkeeper-ml-scheduler")
.build();
cacheEvictionExecutor = Executors
- .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
+ .newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
this.metadataServiceAvailable = true;
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
@@ -202,8 +202,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
.toNanos(config.getCacheEvictionTimeThresholdMillis());
-
- cacheEvictionExecutor.execute(this::cacheEvictionTask);
+ long evictionTaskInterval = config.getCacheEvictionIntervalMs();
+ cacheEvictionExecutor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(this::doCacheEviction),
+ evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS);
closed = false;
metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
@@ -257,24 +258,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
lastStatTimestamp = now;
}
- private void cacheEvictionTask() {
- double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
- long waitTimeMillis = (long) (1000 / evictionFrequency);
-
- while (!closed) {
- try {
- doCacheEviction();
-
- Thread.sleep(waitTimeMillis);
- } catch (InterruptedException e) {
- // Factory is shutting down
- return;
- } catch (Throwable t) {
- log.warn("Exception while performing cache eviction: {}", t.getMessage(), t);
- }
- }
- }
-
private synchronized void doCacheEviction() {
long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 0649a5aace3..970783668c7 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -278,7 +278,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(7 * 10);
config.setCacheEvictionWatermark(0.8);
- config.setCacheEvictionFrequency(1);
+ config.setCacheEvictionIntervalMs(1000);
@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
@@ -347,7 +347,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
public void verifyTimeBasedEviction() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(1000);
- config.setCacheEvictionFrequency(100);
+ config.setCacheEvictionIntervalMs(10);
config.setCacheEvictionTimeThresholdMillis(100);
@Cleanup("shutdown")
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 9faf26e5997..3e22f41fd5e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2582,7 +2582,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
@Test
public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception {
ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
- conf.setCacheEvictionFrequency(0.1);
+ conf.setCacheEvictionIntervalMs(10000);
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 65dad25cf1a..b57b69e2030 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -99,7 +99,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
@Category
private static final String CATEGORY_PLUGIN = "Broker Plugin";
- /***** --- pulsar configuration --- ****/
+ private static final double MIN_ML_CACHE_EVICTION_FREQUENCY = 0.001;
+ private static final double MAX_ML_CACHE_EVICTION_FREQUENCY = 1000.0;
+ private static final long MAX_ML_CACHE_EVICTION_INTERVAL_MS = 1000000L;
+
+ /***** --- pulsar configuration. --- ****/
@FieldContext(
category = CATEGORY_SERVER,
required = true,
@@ -1560,8 +1564,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private double managedLedgerCacheEvictionWatermark = 0.9f;
@FieldContext(category = CATEGORY_STORAGE_ML,
- doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
- private double managedLedgerCacheEvictionFrequency = 100.0;
+ doc = "Configure the cache eviction frequency for the managed ledger cache.")
+ @Deprecated
+ private double managedLedgerCacheEvictionFrequency = 0;
+
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "Configure the cache eviction interval in milliseconds for the managed ledger cache, default is 10ms")
+ private long managedLedgerCacheEvictionIntervalMs = 10;
+
@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
@@ -2538,4 +2548,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
}
return schemaCompatibilityStrategy;
}
+
+ public long getManagedLedgerCacheEvictionIntervalMs() {
+ return managedLedgerCacheEvictionFrequency > 0
+ ? (long) (1000 / Math.max(
+ Math.min(managedLedgerCacheEvictionFrequency, MAX_ML_CACHE_EVICTION_FREQUENCY),
+ MIN_ML_CACHE_EVICTION_FREQUENCY))
+ : Math.min(MAX_ML_CACHE_EVICTION_INTERVAL_MS, managedLedgerCacheEvictionIntervalMs);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 174933b9cbd..3d4332f3161 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -61,7 +61,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
- managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
+ managedLedgerFactoryConfig.setCacheEvictionIntervalMs(conf.getManagedLedgerCacheEvictionIntervalMs());
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(
conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 9b154690d7e..95e343acfef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -53,7 +53,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
@BeforeClass
@Override
protected void setup() throws Exception {
- conf.setManagedLedgerCacheEvictionFrequency(0.1);
+ conf.setManagedLedgerCacheEvictionIntervalMs(10000);
super.internalSetup();
super.producerBaseSetup();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index da9ece0b4ec..13f11c02612 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -46,7 +46,7 @@ public class PartitionCreationTest extends ProducerConsumerBase {
@BeforeClass
@Override
protected void setup() throws Exception {
- conf.setManagedLedgerCacheEvictionFrequency(0.1);
+ conf.setManagedLedgerCacheEvictionIntervalMs(10000);
super.internalSetup();
super.producerBaseSetup();
}