You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 09:03:14 UTC

[pulsar] branch branch-2.9 updated: [enh][broker] Add metrics for entry cache insertion, eviction (#17248)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 9df0048af6a [enh][broker] Add metrics for entry cache insertion, eviction (#17248)
9df0048af6a is described below

commit 9df0048af6abf47c36cbc27dd9ebcd299cd0d4a9
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Aug 24 14:00:45 2022 -0700

    [enh][broker] Add metrics for entry cache insertion, eviction (#17248)
    
    Fixes https://github.com/apache/pulsar/issues/16584
    
    With the `RangeCache`, it is hard to reason about the cache's behavior beyond cache hits/misses or its size hitting the limit and triggering a size-based eviction. This PR adds 3 new metrics to help provide additional insight into the cache's behavior. It adds `pulsar_ml_cache_inserted_entries_total`, `pulsar_ml_cache_evicted_entries_total`, and `pulsar_ml_cache_entries`.
    
    * Add new metrics for cache insertion, eviction, and current number of entries.
    * Add new methods to the `ManagedLedgerFactoryMXBean` interface.
    * Update several method return values in the `RangeCache`.
    * Update tests.
    
    This change is covered by modified tests that already existed.
    
    There is a breaking change to the `RangeCache` class for the `clear` and `evictLEntriesBeforeTimestamp` methods. Their return type was previously a `long` (the removed size) and is now a `Pair<Integer, Long>` (the number of removed entries and the removed size). The new return type matches the style of `evictLeastAccessedEntries`. Given that this class is only meant for use within the broker, I think it is reasonable to break these methods. I will send a note to the mailing list.
    
    - [x] `doc`
    
    (cherry picked from commit e3b25403e4abb6a0c16073e5fb28a533497d094c)
---
 .../mledger/ManagedLedgerFactoryMXBean.java        | 15 ++++++++
 .../impl/ManagedLedgerFactoryMBeanImpl.java        | 27 ++++++++++++++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 14 ++++----
 .../impl/cache/RangeEntryCacheManagerImpl.java     |  4 ++-
 .../apache/bookkeeper/mledger/util/RangeCache.java | 12 ++++---
 .../mledger/impl/EntryCacheManagerTest.java        | 18 ++++++++++
 .../bookkeeper/mledger/util/RangeCacheTest.java    |  5 +--
 .../stats/metrics/ManagedLedgerCacheMetrics.java   |  3 ++
 site2/docs/reference-metrics.md                    | 42 ++++++++++++++++++++++
 9 files changed, 126 insertions(+), 14 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
index ea5b2074ffa..f71583ab886 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
@@ -66,4 +66,19 @@ public interface ManagedLedgerFactoryMXBean {
      * Get the number of cache evictions during the last minute.
      */
     long getNumberOfCacheEvictions();
+
+    /**
+     * Cumulative number of entries inserted into the cache.
+     */
+    long getCacheInsertedEntriesCount();
+
+    /**
+     * Cumulative number of entries evicted from the cache.
+     */
+    long getCacheEvictedEntriesCount();
+
+    /**
+     * Current number of entries in the cache.
+     */
+    long getCacheEntriesCount();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
index d514d8381d9..a5f0c67e68a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
 import org.apache.pulsar.common.stats.Rate;
 
@@ -31,6 +32,10 @@ public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean
     final Rate cacheMisses = new Rate();
     final Rate cacheEvictions = new Rate();
 
+    private final LongAdder insertedEntryCount = new LongAdder();
+    private final LongAdder evictedEntryCount = new LongAdder();
+    private final LongAdder cacheEntryCount = new LongAdder();
+
     public ManagedLedgerFactoryMBeanImpl(ManagedLedgerFactoryImpl factory) throws Exception {
         this.factory = factory;
     }
@@ -64,6 +69,16 @@ public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean
         cacheEvictions.recordEvent();
     }
 
+    public void recordCacheInsertion() {
+        insertedEntryCount.increment();
+        cacheEntryCount.increment();
+    }
+
+    public void recordNumberOfCacheEntriesEvicted(int count) {
+        evictedEntryCount.add(count);
+        cacheEntryCount.add(-count);
+    }
+
     @Override
     public int getNumberOfManagedLedgers() {
         return factory.ledgers.size();
@@ -104,4 +119,16 @@ public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean
         return cacheEvictions.getCount();
     }
 
+    public long getCacheInsertedEntriesCount() {
+        return insertedEntryCount.sum();
+    }
+
+    public long getCacheEvictedEntriesCount() {
+        return evictedEntryCount.sum();
+    }
+
+    public long getCacheEntriesCount() {
+        return cacheEntryCount.sum();
+    }
+
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 5a99ce9ea4e..0420bb7abf5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -169,7 +169,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                     lastPosition, entriesRemoved, sizeRemoved);
         }
 
-        manager.entriesRemoved(sizeRemoved);
+        manager.entriesRemoved(sizeRemoved, entriesRemoved);
     }
 
     @Override
@@ -185,7 +185,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                     ml.getName(), ledgerId, entriesRemoved, sizeRemoved);
         }
 
-        manager.entriesRemoved(sizeRemoved);
+        manager.entriesRemoved(sizeRemoved, entriesRemoved);
     }
 
     @Override
@@ -336,8 +336,8 @@ public class RangeEntryCacheImpl implements EntryCache {
 
     @Override
     public void clear() {
-        long removedSize = entries.clear();
-        manager.entriesRemoved(removedSize);
+        Pair<Integer, Long> removedPair = entries.clear();
+        manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
     }
 
     @Override
@@ -362,14 +362,14 @@ public class RangeEntryCacheImpl implements EntryCache {
                             + " -- Current Size: {} Mb",
                     ml.getName(), sizeToFree / MB, evictedEntries, evictedSize / MB, entries.getSize() / MB);
         }
-        manager.entriesRemoved(evictedSize);
+        manager.entriesRemoved(evictedSize, evictedEntries);
         return evicted;
     }
 
     @Override
     public void invalidateEntriesBeforeTimestamp(long timestamp) {
-        long evictedSize = entries.evictLEntriesBeforeTimestamp(timestamp);
-        manager.entriesRemoved(evictedSize);
+        Pair<Integer, Long> evictedPair = entries.evictLEntriesBeforeTimestamp(timestamp);
+        manager.entriesRemoved(evictedPair.getRight(), evictedPair.getLeft());
     }
 
     private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index 3c12c4605b2..f408426e3f8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -142,10 +142,12 @@ public class RangeEntryCacheManagerImpl implements EntryCacheManager {
     }
 
     void entryAdded(long size) {
+        mlFactoryMBean.recordCacheInsertion();
         currentSize.addAndGet(size);
     }
 
-    void entriesRemoved(long size) {
+    void entriesRemoved(long size, int count) {
+        mlFactoryMBean.recordNumberOfCacheEntriesEvicted(count);
         currentSize.addAndGet(-size);
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 6d6e71b18fa..cb51a162718 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -184,8 +184,9 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
     * @param maxTimestamp the max timestamp of the entries to be evicted
     * @return the tota
     */
-   public long evictLEntriesBeforeTimestamp(long maxTimestamp) {
+   public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
        long removedSize = 0;
+       int removedCount = 0;
 
        while (true) {
            Map.Entry<Key, Value> entry = entries.firstEntry();
@@ -199,11 +200,12 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
            }
 
            removedSize += weighter.getSize(value);
+           removedCount++;
            value.release();
        }
 
        size.addAndGet(-removedSize);
-       return removedSize;
+       return Pair.of(removedCount, removedSize);
    }
 
     /**
@@ -222,8 +224,9 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
      *
      * @return size of removed entries
      */
-    public synchronized long clear() {
+    public synchronized Pair<Integer, Long> clear() {
         long removedSize = 0;
+        int removedCount = 0;
 
         while (true) {
             Map.Entry<Key, Value> entry = entries.pollFirstEntry();
@@ -232,12 +235,13 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
             }
             Value value = entry.getValue();
             removedSize += weighter.getSize(value);
+            removedCount++;
             value.release();
         }
 
         entries.clear();
         size.getAndAdd(-removedSize);
-        return removedSize;
+        return Pair.of(removedCount, removedSize);
     }
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index ae4820b2323..0649a5aace3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -94,6 +94,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
 
         cache2.insert(EntryImpl.create(2, 0, new byte[1]));
         cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -129,6 +132,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 1);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 5);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 3);
     }
 
     @Test
@@ -153,6 +159,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
 
         assertEquals(cache1.getSize(), 7);
         assertEquals(cacheManager.getSize(), 7);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
     }
 
     @Test
@@ -185,6 +194,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
 
         cacheManager.removeEntryCache(ml1.getName());
         assertTrue(cacheManager.getSize() > 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 20);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 20);
     }
 
 
@@ -217,6 +229,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
 
         cache2.insert(EntryImpl.create(2, 0, new byte[1]));
         cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -253,6 +268,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index f31aa4a74f9..341a4928cc7 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -139,8 +139,9 @@ public class RangeCacheTest {
         assertEquals(cache.getSize(), 10);
         assertEquals(cache.getNumberOfEntries(), 4);
 
-        long evictedSize = cache.evictLEntriesBeforeTimestamp(3);
-        assertEquals(evictedSize, 6);
+        Pair<Integer, Long> evictedSize = cache.evictLEntriesBeforeTimestamp(3);
+        assertEquals(evictedSize.getRight().longValue(), 6);
+        assertEquals(evictedSize.getLeft().longValue(), 3);
 
         assertEquals(cache.getSize(), 4);
         assertEquals(cache.getNumberOfEntries(), 1);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
index 8e46b4cf254..1f4181f887f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
@@ -48,6 +48,9 @@ public class ManagedLedgerCacheMetrics extends AbstractMetrics {
 
         m.put("brk_ml_count", mlCacheStats.getNumberOfManagedLedgers());
         m.put("brk_ml_cache_used_size", mlCacheStats.getCacheUsedSize());
+        m.put("brk_ml_cache_inserted_entries_total", mlCacheStats.getCacheInsertedEntriesCount());
+        m.put("brk_ml_cache_evicted_entries_total", mlCacheStats.getCacheEvictedEntriesCount());
+        m.put("brk_ml_cache_entries", mlCacheStats.getCacheEntriesCount());
         m.put("brk_ml_cache_evictions", mlCacheStats.getNumberOfCacheEvictions());
         m.put("brk_ml_cache_hits_rate", mlCacheStats.getCacheHitsRate());
         m.put("brk_ml_cache_misses_rate", mlCacheStats.getCacheMissesRate());
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 720d077d1e9..ff3e858051e 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -125,6 +125,48 @@ The following metrics are available for broker:
 - [Pulsar SQL Worker](#pulsar-sql-worker)
 - [Pulsar transaction](#pulsar-transaction)
 
+### Broker metrics
+All the broker metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you have configured in the `broker.conf` file.
+
+| Name | Type   | Description                                          |
+|---|---|---|
+| pulsar_ml_cache_evictions | Gauge  | The number of cache evictions during the last minute. |
+| pulsar_ml_cache_inserted_entries_total | Counter | The number of entries inserted into the entry cache. |
+| pulsar_ml_cache_evicted_entries_total | Counter | The number of entries evicted from the entry cache. |
+| pulsar_ml_cache_entries | Gauge | The number of entries in the entry cache. |
+| pulsar_ml_cache_hits_rate | Gauge  | The number of cache hits per second on the broker side. |
+| pulsar_ml_cache_hits_throughput | Gauge  | The amount of data (byte per second) retrieved from the cache on the broker side. |
+| pulsar_ml_cache_misses_rate | Gauge  | The number of cache misses per second on the broker side. |
+| pulsar_ml_cache_misses_throughput | Gauge  | The amount of data (byte per second) that cannot be retrieved from the cache on the broker side. |
+| pulsar_ml_cache_pool_active_allocations | Gauge  | The number of currently active allocations in direct arena. |
+| pulsar_ml_cache_pool_active_allocations_huge | Gauge  | The number of currently active huge allocations in direct arena. |
+| pulsar_ml_cache_pool_active_allocations_normal | Gauge  | The number of currently active normal allocations in direct arena. |
+| pulsar_ml_cache_pool_active_allocations_small | Gauge  | The number of currently active small allocations in direct arena. |
+| pulsar_ml_cache_pool_allocated | Gauge  | The total allocated memory of chunk lists in direct arena. |
+| pulsar_ml_cache_pool_used | Gauge  | The total used memory of chunk lists in direct arena. |
+| pulsar_ml_cache_used_size | Gauge  | The size used to store the payloads of entries (in bytes). |
+| pulsar_ml_count | Gauge  | The number of currently opened managed ledgers. |
+| topic_load_times | Summary | The topic load latency calculated in milliseconds. |
+| pulsar_active_connections| Gauge | The number of active connections. |
+| pulsar_connection_created_total_count | Gauge | The total number of connections. |
+| pulsar_connection_create_success_count | Gauge | The number of successfully created connections. |
+| pulsar_connection_create_fail_count | Gauge | The number of failed connections. |
+| pulsar_connection_closed_total_count | Gauge | The total number of closed connections. |
+| pulsar_broker_throttled_connections | Gauge | The number of throttled connections. |
+| pulsar_broker_throttled_connections_global_limit | Gauge | The number of throttled connections due to the global (broker-wide) limit. |
+
+### BookKeeper client metrics
+
+All the BookKeeper client metrics are labelled with the following label:
+
+- *cluster*: `cluster=${pulsar_cluster}`. `${pulsar_cluster}` is the cluster name that you configured in `broker.conf`.
+
+| Name | Type | Description |
+|---|---|---|
+| pulsar_managedLedger_client_bookkeeper_client_BOOKIE_QUARANTINE | Counter | The number of bookie clients to be quarantined.<br /><br />If you want to expose this metric, set `bookkeeperClientExposeStatsToPrometheus` to `true` in the `broker.conf` file.|
+
+
 ### Namespace metrics
 
 > Namespace metrics are only exposed when `exposeTopicLevelMetricsInPrometheus` is set to `false`.