Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/12/13 14:49:05 UTC

[GitHub] sijie closed pull request #1874: [stats][doc] Add @StatsDoc annotation for db ledger storage stats

sijie closed pull request #1874: [stats][doc] Add @StatsDoc annotation for db ledger storage stats
URL: https://github.com/apache/bookkeeper/pull/1874
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request once it
is merged, the diff is reproduced below for the sake of provenance:
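
For context, the pattern this PR applies is to move raw StatsLogger calls into a dedicated
stats holder class whose fields carry @StatsDoc annotations. A minimal sketch of that pattern
follows; the class name FooStats and the stat name "foo-latency" are hypothetical and chosen
only for illustration, while the annotation fields and StatsLogger calls mirror the ones used
in the diff below:

package org.apache.bookkeeper.bookie.storage.ldb;

import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;

import lombok.Getter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;

/**
 * Hypothetical example of the @StatsDoc stats-holder pattern used in this PR.
 */
@StatsDoc(
    name = BOOKIE_SCOPE,
    category = CATEGORY_SERVER,
    help = "Example stats holder"
)
@Getter
class FooStats {

    private static final String FOO_LATENCY = "foo-latency";

    @StatsDoc(
        name = FOO_LATENCY,
        help = "operation stats of the hypothetical foo operation"
    )
    private final OpStatsLogger fooLatencyStats;

    FooStats(StatsLogger statsLogger) {
        // All stats are created in one place, so the registered stat names and
        // their @StatsDoc documentation live side by side.
        fooLatencyStats = statsLogger.getOpStatsLogger(FOO_LATENCY);
    }
}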

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index aad42ad361..249bb30011 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -56,7 +56,6 @@
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
@@ -85,6 +84,7 @@
 
     // Keep 1 single Bookie GC thread so that the compactions from multiple individual directories are serialized
     private ScheduledExecutorService gcExecutor;
+    private DbLedgerStorageStats stats;
 
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
@@ -121,7 +121,13 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
                     perDirectoryReadCacheSize));
         }
 
-        registerStats(statsLogger);
+        this.stats = new DbLedgerStorageStats(
+            statsLogger,
+            () -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum(),
+            () -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum(),
+            () -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum(),
+            () -> ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum()
+        );
     }
 
     @VisibleForTesting
@@ -134,53 +140,6 @@ protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(Serve
                 stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
     }
 
-    public void registerStats(StatsLogger stats) {
-        stats.registerGauge("write-cache-size", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum();
-            }
-        });
-        stats.registerGauge("write-cache-count", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum();
-            }
-        });
-        stats.registerGauge("read-cache-size", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum();
-            }
-        });
-        stats.registerGauge("read-cache-count", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum();
-            }
-        });
-    }
-
     @Override
     public void start() {
         ledgerStorageList.forEach(LedgerStorage::start);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java
new file mode 100644
index 0000000000..bc99c609d2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageStats.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+
+import java.util.function.Supplier;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for db ledger storage stats.
+ */
+@StatsDoc(
+    name = BOOKIE_SCOPE,
+    category = CATEGORY_SERVER,
+    help = "DbLedgerStorage related stats"
+)
+@Getter
+class DbLedgerStorageStats {
+
+    private static final String ADD_ENTRY = "add-entry";
+    private static final String READ_ENTRY = "read-entry";
+    private static final String READ_CACHE_HITS = "read-cache-hits";
+    private static final String READ_CACHE_MISSES = "read-cache-misses";
+    private static final String READAHEAD_BATCH_COUNT = "readahead-batch-count";
+    private static final String READAHEAD_BATCH_SIZE = "readahead-batch-size";
+    private static final String FLUSH = "flush";
+    private static final String FLUSH_SIZE = "flush-size";
+    private static final String THROTTLED_WRITE_REQUESTS = "throttled-write-requests";
+    private static final String REJECTED_WRITE_REQUESTS = "rejected-write-requests";
+    private static final String WRITE_CACHE_SIZE = "write-cache-size";
+    private static final String WRITE_CACHE_COUNT = "write-cache-count";
+    private static final String READ_CACHE_SIZE = "read-cache-size";
+    private static final String READ_CACHE_COUNT = "read-cache-count";
+
+    @StatsDoc(
+        name = ADD_ENTRY,
+        help = "operation stats of adding entries to db ledger storage",
+        parent = BOOKIE_ADD_ENTRY
+    )
+    private final OpStatsLogger addEntryStats;
+    @StatsDoc(
+        name = READ_ENTRY,
+        help = "operation stats of reading entries from db ledger storage",
+        parent = BOOKIE_READ_ENTRY
+    )
+    private final OpStatsLogger readEntryStats;
+    @StatsDoc(
+        name = READ_CACHE_HITS,
+        help = "operation stats of read cache hits",
+        parent = READ_ENTRY
+    )
+    private final OpStatsLogger readCacheHitStats;
+    @StatsDoc(
+        name = READ_CACHE_MISSES,
+        help = "operation stats of read cache misses",
+        parent = READ_ENTRY
+    )
+    private final OpStatsLogger readCacheMissStats;
+    @StatsDoc(
+        name = READAHEAD_BATCH_COUNT,
+        help = "the distribution of num of entries to read in one readahead batch"
+    )
+    private final OpStatsLogger readAheadBatchCountStats;
+    @StatsDoc(
+        name = READAHEAD_BATCH_SIZE,
+        help = "the distribution of the number of bytes read in one readahead batch"
+    )
+    private final OpStatsLogger readAheadBatchSizeStats;
+    @StatsDoc(
+        name = FLUSH,
+        help = "operation stats of flushing write cache to entry log files"
+    )
+    private final OpStatsLogger flushStats;
+    @StatsDoc(
+        name = FLUSH_SIZE,
+        help = "the distribution of number of bytes flushed from write cache to entry log files"
+    )
+    private final OpStatsLogger flushSizeStats;
+    @StatsDoc(
+        name = THROTTLED_WRITE_REQUESTS,
+        help = "The number of requests throttled due to write cache is full"
+    )
+    private final Counter throttledWriteRequests;
+    @StatsDoc(
+        name = REJECTED_WRITE_REQUESTS,
+        help = "The number of requests rejected due to write cache is full"
+    )
+    private final Counter rejectedWriteRequests;
+
+    @StatsDoc(
+        name = WRITE_CACHE_SIZE,
+        help = "Current number of bytes in write cache"
+    )
+    private final Gauge<Long> writeCacheSizeGauge;
+    @StatsDoc(
+        name = WRITE_CACHE_COUNT,
+        help = "Current number of entries in write cache"
+    )
+    private final Gauge<Long> writeCacheCountGauge;
+    @StatsDoc(
+        name = READ_CACHE_SIZE,
+        help = "Current number of bytes in read cache"
+    )
+    private final Gauge<Long> readCacheSizeGauge;
+    @StatsDoc(
+        name = READ_CACHE_COUNT,
+        help = "Current number of entries in read cache"
+    )
+    private final Gauge<Long> readCacheCountGauge;
+
+    DbLedgerStorageStats(StatsLogger stats,
+                         Supplier<Long> writeCacheSizeSupplier,
+                         Supplier<Long> writeCacheCountSupplier,
+                         Supplier<Long> readCacheSizeSupplier,
+                         Supplier<Long> readCacheCountSupplier) {
+        addEntryStats = stats.getOpStatsLogger(ADD_ENTRY);
+        readEntryStats = stats.getOpStatsLogger(READ_ENTRY);
+        readCacheHitStats = stats.getOpStatsLogger(READ_CACHE_HITS);
+        readCacheMissStats = stats.getOpStatsLogger(READ_CACHE_MISSES);
+        readAheadBatchCountStats = stats.getOpStatsLogger(READAHEAD_BATCH_COUNT);
+        readAheadBatchSizeStats = stats.getOpStatsLogger(READAHEAD_BATCH_SIZE);
+        flushStats = stats.getOpStatsLogger(FLUSH);
+        flushSizeStats = stats.getOpStatsLogger(FLUSH_SIZE);
+
+        throttledWriteRequests = stats.getCounter(THROTTLED_WRITE_REQUESTS);
+        rejectedWriteRequests = stats.getCounter(REJECTED_WRITE_REQUESTS);
+
+        writeCacheSizeGauge = new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCacheSizeSupplier.get();
+            }
+        };
+        stats.registerGauge(WRITE_CACHE_SIZE, writeCacheSizeGauge);
+        writeCacheCountGauge = new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCacheCountSupplier.get();
+            }
+        };
+        stats.registerGauge(WRITE_CACHE_COUNT, writeCacheCountGauge);
+        readCacheSizeGauge = new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCacheSizeSupplier.get();
+            }
+        };
+        stats.registerGauge(READ_CACHE_SIZE, readCacheSizeGauge);
+        readCacheCountGauge = new Gauge<Long>() {
+
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCacheCountSupplier.get();
+            }
+        };
+        stats.registerGauge(READ_CACHE_COUNT, readCacheCountGauge);
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index 21b87e2be9..5673883b91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -34,7 +34,6 @@
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
 import org.slf4j.Logger;
@@ -51,33 +50,22 @@
     private final KeyValueStorage locationsDb;
     private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
 
-    private StatsLogger stats;
+    private final EntryLocationIndexStats stats;
 
     public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
             StatsLogger stats) throws IOException {
         String locationsDbPath = FileSystems.getDefault().getPath(basePath, "locations").toFile().toString();
         locationsDb = storageFactory.newKeyValueStorage(locationsDbPath, DbConfigType.Huge, conf);
 
-        this.stats = stats;
-        registerStats();
-    }
-
-    public void registerStats() {
-        stats.registerGauge("entries-count", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
+        this.stats = new EntryLocationIndexStats(
+            stats,
+            () -> {
                 try {
                     return locationsDb.count();
                 } catch (IOException e) {
                     return -1L;
                 }
-            }
-        });
+            });
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexStats.java
new file mode 100644
index 0000000000..dd87f7b92b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+
+import java.util.function.Supplier;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for entry location index stats.
+ */
+@StatsDoc(
+    name = BOOKIE_SCOPE,
+    category = CATEGORY_SERVER,
+    help = "Entry location index stats"
+)
+@Getter
+class EntryLocationIndexStats {
+
+    private static final String ENTRIES_COUNT = "entries-count";
+
+    @StatsDoc(
+        name = ENTRIES_COUNT,
+        help = "Current number of entries"
+    )
+    private final Gauge<Long> entriesCountGauge;
+
+    EntryLocationIndexStats(StatsLogger statsLogger,
+                            Supplier<Long> entriesCountSupplier) {
+        entriesCountGauge = new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return entriesCountSupplier.get();
+            }
+        };
+        statsLogger.registerGauge(ENTRIES_COUNT, entriesCountGauge);
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
index 04bf32dba7..8383db4388 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -39,7 +39,6 @@
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
@@ -56,7 +55,7 @@
     private final AtomicInteger ledgersCount;
 
     private final KeyValueStorage ledgersDb;
-    private StatsLogger stats;
+    private final LedgerMetadataIndexStats stats;
 
     // Holds ledger modifications applied in memory map, and pending to be flushed on db
     private final ConcurrentLinkedQueue<Entry<Long, LedgerData>> pendingLedgersUpdates;
@@ -89,22 +88,9 @@ public LedgerMetadataIndex(ServerConfiguration conf, KeyValueStorageFactory stor
         this.pendingLedgersUpdates = new ConcurrentLinkedQueue<Entry<Long, LedgerData>>();
         this.pendingDeletedLedgers = new ConcurrentLinkedQueue<Long>();
 
-        this.stats = stats;
-        registerStats();
-    }
-
-    public void registerStats() {
-        stats.registerGauge("ledgers-count", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return (long) ledgersCount.get();
-            }
-        });
+        this.stats = new LedgerMetadataIndexStats(
+            stats,
+            () -> (long) ledgersCount.get());
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexStats.java
new file mode 100644
index 0000000000..a46e38b650
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+
+import java.util.function.Supplier;
+import lombok.Getter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+
+/**
+ * An umbrella class for ledger metadata index stats.
+ */
+@StatsDoc(
+    name = BOOKIE_SCOPE,
+    category = CATEGORY_SERVER,
+    help = "Ledger metadata index stats"
+)
+@Getter
+class LedgerMetadataIndexStats {
+
+    private static final String LEDGERS_COUNT = "ledgers-count";
+
+    @StatsDoc(
+        name = LEDGERS_COUNT,
+        help = "Current number of ledgers"
+    )
+    private final Gauge<Long> ledgersCountGauge;
+
+    LedgerMetadataIndexStats(StatsLogger statsLogger,
+                             Supplier<Long> ledgersCountSupplier) {
+        ledgersCountGauge = new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return ledgersCountSupplier.get();
+            }
+        };
+        statsLogger.registerGauge(LEDGERS_COUNT, ledgersCountGauge);
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 27b4214e1b..079c8eee9e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -61,8 +61,6 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
@@ -120,19 +118,7 @@
 
     private final long maxThrottleTimeNanos;
 
-    private final StatsLogger stats;
-
-    private final OpStatsLogger addEntryStats;
-    private final OpStatsLogger readEntryStats;
-    private final OpStatsLogger readCacheHitStats;
-    private final OpStatsLogger readCacheMissStats;
-    private final OpStatsLogger readAheadBatchCountStats;
-    private final OpStatsLogger readAheadBatchSizeStats;
-    private final OpStatsLogger flushStats;
-    private final OpStatsLogger flushSizeStats;
-
-    private final Counter throttledWriteRequests;
-    private final Counter rejectedWriteRequests;
+    private final DbLedgerStorageStats dbLedgerStorageStats;
 
     static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
     private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
@@ -165,10 +151,8 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
 
         readCache = new ReadCache(readCacheMaxSize);
 
-        this.stats = statsLogger;
-
-        ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
-        entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+        ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
+        entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
 
         transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
                 Runtime.getRuntime().availableProcessors() * 2);
@@ -179,62 +163,13 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
         entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
-        stats.registerGauge("write-cache-size", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return writeCache.size() + writeCacheBeingFlushed.size();
-            }
-        });
-        stats.registerGauge("write-cache-count", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return writeCache.count() + writeCacheBeingFlushed.count();
-            }
-        });
-        stats.registerGauge("read-cache-size", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return readCache.size();
-            }
-        });
-        stats.registerGauge("read-cache-count", new Gauge<Long>() {
-            @Override
-            public Long getDefaultValue() {
-                return 0L;
-            }
-
-            @Override
-            public Long getSample() {
-                return readCache.count();
-            }
-        });
-
-        addEntryStats = stats.getOpStatsLogger("add-entry");
-        readEntryStats = stats.getOpStatsLogger("read-entry");
-        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
-        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
-        readAheadBatchCountStats = stats.getOpStatsLogger("readahead-batch-count");
-        readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
-        flushStats = stats.getOpStatsLogger("flush");
-        flushSizeStats = stats.getOpStatsLogger("flush-size");
-
-        throttledWriteRequests = stats.getCounter("throttled-write-requests");
-        rejectedWriteRequests = stats.getCounter("rejected-write-requests");
+        dbLedgerStorageStats = new DbLedgerStorageStats(
+            statsLogger,
+            () -> writeCache.size() + writeCacheBeingFlushed.size(),
+            () -> writeCache.count() + writeCacheBeingFlushed.count(),
+            () -> readCache.size(),
+            () -> readCache.count()
+        );
     }
 
     @Override
@@ -384,7 +319,7 @@ public long addEntry(ByteBuf entry) throws IOException, BookieException {
         // after successfully insert the entry, update LAC and notify the watchers
         updateCachedLacIfNeeded(ledgerId, lac);
 
-        recordSuccessfulEvent(addEntryStats, startTime);
+        recordSuccessfulEvent(dbLedgerStorageStats.getAddEntryStats(), startTime);
         return entryId;
     }
 
@@ -405,7 +340,7 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
             });
         }
 
-        throttledWriteRequests.inc();
+        dbLedgerStorageStats.getThrottledWriteRequests().inc();
         long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
 
         while (System.nanoTime() < absoluteTimeoutNanos) {
@@ -429,7 +364,7 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
         }
 
         // Timeout expired and we weren't able to insert in write cache
-        rejectedWriteRequests.inc();
+        dbLedgerStorageStats.getRejectedWriteRequests().inc();
         throw new OperationRejectedException();
     }
 
@@ -464,24 +399,24 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
         // First try to read from the write cache of recent entries
         ByteBuf entry = localWriteCache.get(ledgerId, entryId);
         if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
+            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
+            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
             return entry;
         }
 
         // If there's a flush going on, the entry might be in the flush buffer
         entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
         if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
+            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
+            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
             return entry;
         }
 
         // Try reading from read-ahead cache
         entry = readCache.get(ledgerId, entryId);
         if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
+            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
+            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
             return entry;
         }
 
@@ -494,7 +429,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
             }
             entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
         } catch (NoEntryException e) {
-            recordFailedEvent(readEntryStats, startTime);
+            recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
             throw e;
         }
 
@@ -504,8 +439,8 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
         long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
         fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
 
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
+        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
+        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
         return entry;
     }
 
@@ -542,8 +477,8 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi
                 }
             }
 
-            readAheadBatchCountStats.registerSuccessfulValue(count);
-            readAheadBatchSizeStats.registerSuccessfulValue(size);
+            dbLedgerStorageStats.getReadAheadBatchCountStats().registerSuccessfulValue(count);
+            dbLedgerStorageStats.getReadAheadBatchSizeStats().registerSuccessfulValue(size);
         } catch (Exception e) {
             if (log.isDebugEnabled()) {
                 log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e);
@@ -569,8 +504,8 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException {
                     }
                 }
 
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
+                recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
+                recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
                 return entry;
             }
 
@@ -586,8 +521,8 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException {
                     }
                 }
 
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
+                recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
+                recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
                 return entry;
             }
         } finally {
@@ -603,8 +538,8 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException {
         long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId);
         ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation);
 
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
+        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
+        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
         return content;
     }
 
@@ -693,8 +628,8 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
                 log.debug("Flushing done time {} s -- Written {} MB/s", flushTimeSeconds, flushThroughput);
             }
 
-            recordSuccessfulEvent(flushStats, startTime);
-            flushSizeStats.registerSuccessfulValue(sizeToFlush);
+            recordSuccessfulEvent(dbLedgerStorageStats.getFlushStats(), startTime);
+            dbLedgerStorageStats.getFlushSizeStats().registerSuccessfulValue(sizeToFlush);
         } catch (IOException e) {
             // Leave IOException as it is
             throw e;
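
For reference, a short usage sketch of the new stats holder introduced above. This is a
non-authoritative illustration: it assumes it runs in the org.apache.bookkeeper.bookie.storage.ldb
package (DbLedgerStorageStats is package-private), and it uses NullStatsLogger and constant
suppliers only so the snippet is self-contained; the constructor signature and getters are the
ones shown in the diff.

package org.apache.bookkeeper.bookie.storage.ldb;

import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.NullStatsLogger;

class DbLedgerStorageStatsExample {

    public static void main(String[] args) {
        // The suppliers stand in for the write/read caches a real storage instance would expose.
        DbLedgerStorageStats stats = new DbLedgerStorageStats(
            NullStatsLogger.INSTANCE,
            () -> 0L,  // write cache size in bytes
            () -> 0L,  // write cache entry count
            () -> 0L,  // read cache size in bytes
            () -> 0L); // read cache entry count

        // Lombok's @Getter exposes each annotated stat, so call sites record through
        // the holder instead of keeping their own OpStatsLogger/Counter fields.
        stats.getAddEntryStats().registerSuccessfulEvent(1, TimeUnit.MILLISECONDS);
        stats.getThrottledWriteRequests().inc();
    }
}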


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services