Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/02 22:36:53 UTC

[kafka] branch trunk updated: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607 (#9232)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c04000c  KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607 (#9232)
c04000c is described below

commit c04000cab1e98c206c5410ef68d00df1d9129182
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Thu Sep 3 00:32:17 2020 +0200

    KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607 (#9232)
    
    This commit adds the remaining property-based RocksDB metrics as described in KIP-607, except for num-entries-active-mem-table, which was added in PR #9177.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
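
The gauges added here are registered at the store level and can be read like any other Kafka Streams metric. Below is a minimal sketch of polling one of them from a running application; the class and method names are illustrative, and it assumes the store-level metrics group is named "stream-state-metrics" (the value behind StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP), which this commit does not spell out.

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    import java.util.Map;

    public final class RocksDBGaugeReader {
        // Print the block-cache-usage gauge of every RocksDB state store,
        // identified by metric group and metric name; the tags distinguish
        // the thread, task, and store the gauge belongs to.
        public static void printBlockCacheUsage(final KafkaStreams streams) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                final MetricName name = entry.getKey();
                if ("stream-state-metrics".equals(name.group())
                        && "block-cache-usage".equals(name.name())) {
                    System.out.println(name.tags() + " -> " + entry.getValue().metricValue());
                }
            }
        }
    }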
---
 .../state/internals/metrics/RocksDBMetrics.java    | 329 ++++++++++++++++++++-
 .../internals/metrics/RocksDBMetricsRecorder.java  | 225 ++++++++++++--
 .../integration/RocksDBMetricsIntegrationTest.java |  45 ++-
 .../internals/metrics/StreamsMetricsImplTest.java  |  26 +-
 .../streams/state/internals/RocksDBStoreTest.java  |  64 +++-
 .../metrics/RocksDBMetricsRecorderGaugesTest.java  | 188 +++++++++++-
 .../metrics/RocksDBMetricsRecorderTest.java        | 161 +++++++++-
 .../internals/metrics/RocksDBMetricsTest.java      | 262 +++++++++++++++-
 8 files changed, 1225 insertions(+), 75 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
index f0c25a8..998304e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
@@ -59,6 +59,27 @@ public class RocksDBMetrics {
     private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
     private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors";
     static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
+    static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLE = "num-deletes-active-mem-table";
+    static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES = "num-entries-imm-mem-tables";
+    static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES = "num-deletes-imm-mem-tables";
+    static final String NUMBER_OF_IMMUTABLE_MEMTABLES = "num-immutable-mem-table";
+    static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE = "cur-size-active-mem-table";
+    static final String CURRENT_SIZE_OF_ALL_MEMTABLES = "cur-size-all-mem-tables";
+    static final String SIZE_OF_ALL_MEMTABLES = "size-all-mem-tables";
+    static final String MEMTABLE_FLUSH_PENDING = "mem-table-flush-pending";
+    static final String NUMBER_OF_RUNNING_FLUSHES = "num-running-flushes";
+    static final String COMPACTION_PENDING = "compaction-pending";
+    static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
+    static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION = "estimate-pending-compaction-bytes";
+    static final String TOTAL_SST_FILES_SIZE = "total-sst-files-size";
+    static final String LIVE_SST_FILES_SIZE = "live-sst-files-size";
+    static final String NUMBER_OF_LIVE_VERSIONS = "num-live-versions";
+    static final String CAPACITY_OF_BLOCK_CACHE = "block-cache-capacity";
+    static final String USAGE_OF_BLOCK_CACHE = "block-cache-usage";
+    static final String PINNED_USAGE_OF_BLOCK_CACHE = "block-cache-pinned-usage";
+    static final String ESTIMATED_NUMBER_OF_KEYS = "estimate-num-keys";
+    static final String ESTIMATED_MEMORY_OF_TABLE_READERS = "estimate-table-readers-mem";
+    static final String NUMBER_OF_BACKGROUND_ERRORS = "background-errors";
 
     private static final String BYTES_WRITTEN_TO_DB_RATE_DESCRIPTION =
         "Average number of bytes written per second to the RocksDB state store";
@@ -98,7 +119,43 @@ public class RocksDBMetrics {
     private static final String NUMBER_OF_OPEN_FILES_DESCRIPTION = "Number of currently open files";
     private static final String NUMBER_OF_FILE_ERRORS_DESCRIPTION = "Total number of file errors occurred";
     private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION =
-            "Current total number of entries in the active memtable";
+        "Total number of entries in the active memtable";
+    private static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLES_DESCRIPTION =
+        "Total number of delete entries in the active memtable";
+    private static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES_DESCRIPTION =
+        "Total number of entries in the unflushed immutable memtables";
+    private static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES_DESCRIPTION =
+        "Total number of delete entries in the unflushed immutable memtables";
+    private static final String NUMBER_OF_IMMUTABLE_MEMTABLES_DESCRIPTION =
+        "Number of immutable memtables that have not yet been flushed";
+    private static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE_DESCRIPTION =
+        "Approximate size of active memtable in bytes";
+    private static final String CURRENT_SIZE_OF_ALL_MEMTABLES_DESCRIPTION =
+        "Approximate size of active and unflushed immutable memtable in bytes";
+    private static final String SIZE_OF_ALL_MEMTABLES_DESCRIPTION =
+        "Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes";
+    private static final String MEMTABLE_FLUSH_PENDING_DESCRIPTION =
+        "Reports 1 if a memtable flush is pending, otherwise it reports 0";
+    private static final String NUMBER_OF_RUNNING_FLUSHES_DESCRIPTION = "Number of currently running flushes";
+    private static final String COMPACTION_PENDING_DESCRIPTION =
+        "Reports 1 if at least one compaction is pending, otherwise it reports 0";
+    private static final String NUMBER_OF_RUNNING_COMPACTIONS_DESCRIPTION = "Number of currently running compactions";
+    private static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION_DESCRIPTION =
+        "Estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size";
+    private static final String TOTAL_SST_FILE_SIZE_DESCRIPTION = "Total size in bytes of all SST files";
+    private static final String LIVE_SST_FILES_SIZE_DESCRIPTION =
+        "Total size in bytes of all SST files that belong to the latest LSM tree";
+    private static final String NUMBER_OF_LIVE_VERSIONS_DESCRIPTION = "Number of live versions";
+    private static final String CAPACITY_OF_BLOCK_CACHE_DESCRIPTION = "Capacity of the block cache in bytes";
+    private static final String USAGE_OF_BLOCK_CACHE_DESCRIPTION =
+        "Memory size of the entries residing in block cache in bytes";
+    private static final String PINNED_USAGE_OF_BLOCK_CACHE_DESCRIPTION =
+        "Memory size for the entries being pinned in the block cache in bytes";
+    private static final String ESTIMATED_NUMBER_OF_KEYS_DESCRIPTION =
+        "Estimated number of total keys in the active and unflushed immutable memtables and storage";
+    private static final String ESTIMATED_MEMORY_OF_TABLE_READERS_DESCRIPTION =
+        "Estimated memory in bytes used for reading SST tables, excluding memory used in block cache";
+    private static final String TOTAL_NUMBER_OF_BACKGROUND_ERRORS_DESCRIPTION = "Total number of background errors";
 
     public static class RocksDBMetricContext {
         private final String taskName;
@@ -457,12 +514,278 @@ public class RocksDBMetrics {
     public static void addNumEntriesActiveMemTableMetric(final StreamsMetricsImpl streamsMetrics,
                                                          final RocksDBMetricContext metricContext,
                                                          final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
+            NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION
+        );
+    }
+
+    public static void addNumEntriesImmMemTablesMetric(final StreamsMetricsImpl streamsMetrics,
+                                                       final RocksDBMetricContext metricContext,
+                                                       final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES,
+            NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES_DESCRIPTION
+        );
+    }
+
+    public static void addNumDeletesImmMemTablesMetric(final StreamsMetricsImpl streamsMetrics,
+                                                       final RocksDBMetricContext metricContext,
+                                                       final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES,
+            NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES_DESCRIPTION
+        );
+    }
+
+    public static void addNumDeletesActiveMemTableMetric(final StreamsMetricsImpl streamsMetrics,
+                                                         final RocksDBMetricContext metricContext,
+                                                         final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_DELETES_ACTIVE_MEMTABLE,
+            NUMBER_OF_DELETES_ACTIVE_MEMTABLES_DESCRIPTION
+        );
+    }
+
+    public static void addNumImmutableMemTableMetric(final StreamsMetricsImpl streamsMetrics,
+                                                     final RocksDBMetricContext metricContext,
+                                                     final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_IMMUTABLE_MEMTABLES,
+            NUMBER_OF_IMMUTABLE_MEMTABLES_DESCRIPTION
+        );
+    }
+
+    public static void addCurSizeActiveMemTable(final StreamsMetricsImpl streamsMetrics,
+                                                final RocksDBMetricContext metricContext,
+                                                final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            CURRENT_SIZE_OF_ACTIVE_MEMTABLE,
+            CURRENT_SIZE_OF_ACTIVE_MEMTABLE_DESCRIPTION
+        );
+    }
+
+    public static void addCurSizeAllMemTables(final StreamsMetricsImpl streamsMetrics,
+                                              final RocksDBMetricContext metricContext,
+                                              final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            CURRENT_SIZE_OF_ALL_MEMTABLES,
+            CURRENT_SIZE_OF_ALL_MEMTABLES_DESCRIPTION
+        );
+    }
+
+    public static void addSizeAllMemTables(final StreamsMetricsImpl streamsMetrics,
+                                           final RocksDBMetricContext metricContext,
+                                           final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            SIZE_OF_ALL_MEMTABLES,
+            SIZE_OF_ALL_MEMTABLES_DESCRIPTION
+        );
+    }
+
+    public static void addMemTableFlushPending(final StreamsMetricsImpl streamsMetrics,
+                                               final RocksDBMetricContext metricContext,
+                                               final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            MEMTABLE_FLUSH_PENDING,
+            MEMTABLE_FLUSH_PENDING_DESCRIPTION
+        );
+    }
+
+    public static void addNumRunningFlushesMetric(final StreamsMetricsImpl streamsMetrics,
+                                                  final RocksDBMetricContext metricContext,
+                                                  final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_RUNNING_FLUSHES,
+            NUMBER_OF_RUNNING_FLUSHES_DESCRIPTION
+        );
+    }
+
+    public static void addCompactionPendingMetric(final StreamsMetricsImpl streamsMetrics,
+                                                  final RocksDBMetricContext metricContext,
+                                                  final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            COMPACTION_PENDING,
+            COMPACTION_PENDING_DESCRIPTION
+        );
+    }
+
+    public static void addNumRunningCompactionsMetric(final StreamsMetricsImpl streamsMetrics,
+                                                      final RocksDBMetricContext metricContext,
+                                                      final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_RUNNING_COMPACTIONS,
+            NUMBER_OF_RUNNING_COMPACTIONS_DESCRIPTION
+        );
+    }
+
+    public static void addEstimatePendingCompactionBytesMetric(final StreamsMetricsImpl streamsMetrics,
+                                                               final RocksDBMetricContext metricContext,
+                                                               final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            ESTIMATED_BYTES_OF_PENDING_COMPACTION,
+            ESTIMATED_BYTES_OF_PENDING_COMPACTION_DESCRIPTION
+        );
+    }
+
+    public static void addTotalSstFilesSizeMetric(final StreamsMetricsImpl streamsMetrics,
+                                                  final RocksDBMetricContext metricContext,
+                                                  final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            TOTAL_SST_FILES_SIZE,
+            TOTAL_SST_FILE_SIZE_DESCRIPTION
+        );
+    }
+
+    public static void addLiveSstFilesSizeMetric(final StreamsMetricsImpl streamsMetrics,
+                                                 final RocksDBMetricContext metricContext,
+                                                 final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            LIVE_SST_FILES_SIZE,
+            LIVE_SST_FILES_SIZE_DESCRIPTION
+        );
+    }
+
+    public static void addNumLiveVersionMetric(final StreamsMetricsImpl streamsMetrics,
+                                               final RocksDBMetricContext metricContext,
+                                               final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_LIVE_VERSIONS,
+            NUMBER_OF_LIVE_VERSIONS_DESCRIPTION
+        );
+    }
+
+    public static void addBlockCacheCapacityMetric(final StreamsMetricsImpl streamsMetrics,
+                                                   final RocksDBMetricContext metricContext,
+                                                   final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            CAPACITY_OF_BLOCK_CACHE,
+            CAPACITY_OF_BLOCK_CACHE_DESCRIPTION
+        );
+    }
+
+    public static void addBlockCacheUsageMetric(final StreamsMetricsImpl streamsMetrics,
+                                                final RocksDBMetricContext metricContext,
+                                                final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            USAGE_OF_BLOCK_CACHE,
+            USAGE_OF_BLOCK_CACHE_DESCRIPTION
+        );
+    }
+
+    public static void addBlockCachePinnedUsageMetric(final StreamsMetricsImpl streamsMetrics,
+                                                      final RocksDBMetricContext metricContext,
+                                                      final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            PINNED_USAGE_OF_BLOCK_CACHE,
+            PINNED_USAGE_OF_BLOCK_CACHE_DESCRIPTION
+        );
+    }
+
+    public static void addEstimateNumKeysMetric(final StreamsMetricsImpl streamsMetrics,
+                                                final RocksDBMetricContext metricContext,
+                                                final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            ESTIMATED_NUMBER_OF_KEYS,
+            ESTIMATED_NUMBER_OF_KEYS_DESCRIPTION
+        );
+    }
+
+    public static void addEstimateTableReadersMemMetric(final StreamsMetricsImpl streamsMetrics,
+                                                        final RocksDBMetricContext metricContext,
+                                                        final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            ESTIMATED_MEMORY_OF_TABLE_READERS,
+            ESTIMATED_MEMORY_OF_TABLE_READERS_DESCRIPTION
+        );
+    }
+
+    public static void addBackgroundErrorsMetric(final StreamsMetricsImpl streamsMetrics,
+                                                 final RocksDBMetricContext metricContext,
+                                                 final Gauge<BigInteger> valueProvider) {
+        addMutableMetric(
+            streamsMetrics,
+            metricContext,
+            valueProvider,
+            NUMBER_OF_BACKGROUND_ERRORS,
+            TOTAL_NUMBER_OF_BACKGROUND_ERRORS_DESCRIPTION
+        );
+    }
+
+    private static void addMutableMetric(final StreamsMetricsImpl streamsMetrics,
+                                         final RocksDBMetricContext metricContext,
+                                         final Gauge<BigInteger> valueProvider,
+                                         final String name,
+                                         final String description) {
         streamsMetrics.addStoreLevelMutableMetric(
             metricContext.taskName(),
             metricContext.metricsScope(),
             metricContext.storeName(),
-            NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
-            NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION,
+            name,
+            description,
             RecordingLevel.INFO,
             valueProvider
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
index 8fb0a31..85412d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals.metrics;
 
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -32,11 +33,33 @@ import org.slf4j.Logger;
 
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.COMPACTION_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_BYTES_OF_PENDING_COMPACTION;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_MEMORY_OF_TABLE_READERS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_NUMBER_OF_KEYS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.LIVE_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.MEMTABLE_FLUSH_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES;
 import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_LIVE_VERSIONS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_COMPACTIONS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_FLUSHES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_BACKGROUND_ERRORS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.TOTAL_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.USAGE_OF_BLOCK_CACHE;
 
 public class RocksDBMetricsRecorder {
 
@@ -64,6 +87,7 @@ public class RocksDBMetricsRecorder {
 
     private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
 
+
     private final Logger logger;
 
     private Sensor bytesWrittenToDatabaseSensor;
@@ -84,6 +108,7 @@ public class RocksDBMetricsRecorder {
     private final String storeName;
     private TaskId taskId;
     private StreamsMetricsImpl streamsMetrics;
+    private boolean singleCache = true;
 
     public RocksDBMetricsRecorder(final String metricsScope,
                                   final String storeName) {
@@ -133,26 +158,51 @@ public class RocksDBMetricsRecorder {
             logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId);
             streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this);
         } else if (storeToValueProviders.containsKey(segmentName)) {
-            throw new IllegalStateException("Value providers for store \"" + segmentName + "\" of task " + taskId +
+            throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId +
                 " has been already added. This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
-        verifyStatistics(segmentName, statistics);
+        verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics);
         logger.debug("Adding value providers for store {} of task {}", segmentName, taskId);
         storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics));
     }
 
-    private void verifyStatistics(final String segmentName, final Statistics statistics) {
-        if (!storeToValueProviders.isEmpty() && (
-                statistics == null &&
-                storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics != null)
-                ||
-                statistics != null &&
-                storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyConsistencyOfValueProvidersAcrossSegments(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyConsistencyOfValueProvidersAcrossSegments(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {
+                singleCache = false;
+            } else if (singleCache && cache != valueProviders.cache || !singleCache && cache == valueProviders.cache) {
+                throw new IllegalStateException("Caches for store " + storeName + " of task " + taskId +
+                    " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+        }
+    }
+
+    private void verifyConsistencyOfValueProvidersAcrossSegments(final String segmentName,
+                                                                 final Object newValueProvider,
+                                                                 final Object oldValueProvider,
+                                                                 final String valueProviderName) {
+        if (newValueProvider == null && oldValueProvider != null ||
+            newValueProvider != null && oldValueProvider == null) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
-                "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
+            final char capitalizedFirstChar = valueProviderName.toUpperCase(Locale.US).charAt(0);
+            final StringBuilder capitalizedValueProviderName = new StringBuilder(valueProviderName);
+            capitalizedValueProviderName.setCharAt(0, capitalizedFirstChar);
+            throw new IllegalStateException(capitalizedValueProviderName +
+                " for segment " + segmentName + " of task " + taskId +
+                " is" + (newValueProvider == null ? " " : " not ") + "null although the " + valueProviderName +
+                " of another segment in this metrics recorder is" + (newValueProvider != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
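
The cache checks above encode an invariant: the segments of one segmented store must either all share a single block cache or each bring a distinct one. A hedged sketch of the two legal call patterns follows; the recorder, db, cache, and statistics objects are illustrative placeholders (org.rocksdb.LRUCache is assumed for the cache), not fixtures from this commit.

    // Pattern 1: all segments share one cache, so the block-cache gauges
    // report the shared cache exactly once.
    final Cache shared = new LRUCache(16 * 1024 * 1024L);
    recorder.addValueProviders("segment-1", db1, shared, statistics1);
    recorder.addValueProviders("segment-2", db2, shared, statistics2);

    // Pattern 2: every segment brings its own cache, so the block-cache
    // gauges sum over all caches.
    recorder.addValueProviders("segment-1", db1, cache1, statistics1);
    recorder.addValueProviders("segment-2", db2, cache2, statistics2);

    // Mixing the two patterns, or registering the same RocksDB instance
    // twice, makes verifyDbAndCacheAndStatistics throw an IllegalStateException.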
@@ -174,22 +224,163 @@ public class RocksDBMetricsRecorder {
         numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
     }
 
-    private void initGauges(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext) {
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, metricContext, (metricsConfig, now) -> {
+    private void initGauges(final StreamsMetricsImpl streamsMetrics,
+                            final RocksDBMetricContext metricContext) {
+        RocksDBMetrics.addNumImmutableMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addCurSizeActiveMemTable(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addCurSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addNumEntriesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumDeletesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumEntriesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addNumDeletesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addMemTableFlushPending(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(MEMTABLE_FLUSH_PENDING)
+        );
+        RocksDBMetrics.addNumRunningFlushesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_FLUSHES)
+        );
+        RocksDBMetrics.addCompactionPendingMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(COMPACTION_PENDING)
+        );
+        RocksDBMetrics.addNumRunningCompactionsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS)
+        );
+        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION)
+        );
+        RocksDBMetrics.addTotalSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(TOTAL_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addLiveSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(LIVE_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addNumLiveVersionMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_LIVE_VERSIONS)
+        );
+        RocksDBMetrics.addEstimateNumKeysMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_NUMBER_OF_KEYS)
+        );
+        RocksDBMetrics.addEstimateTableReadersMemMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS)
+        );
+        RocksDBMetrics.addBackgroundErrorsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_BACKGROUND_ERRORS)
+        );
+        RocksDBMetrics.addBlockCacheCapacityMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(CAPACITY_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCacheUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(USAGE_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCachePinnedUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(PINNED_USAGE_OF_BLOCK_CACHE)
+        );
+    }
+
+    private Gauge<BigInteger> gaugeToComputeSumOfProperties(final String propertyName) {
+        return (metricsConfig, now) -> {
             BigInteger result = BigInteger.valueOf(0);
             for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
                 try {
                     // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
                     // BigInteger and construct the object from the byte representation of the value
                     result = result.add(new BigInteger(1, longToBytes(
-                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))));
+                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                    )));
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
+                }
+            }
+            return result;
+        };
+    }
 
+    private Gauge<BigInteger> gaugeToComputeBlockCacheMetrics(final String propertyName) {
+        return (metricsConfig, now) -> {
+            BigInteger result = BigInteger.valueOf(0);
+            for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
+                try {
+                    if (singleCache) {
+                        // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
+                        // BigInteger and construct the object from the byte representation of the value
+                        result = new BigInteger(1, longToBytes(
+                            valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                        ));
+                        break;
+                    } else {
+                        // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
+                        // BigInteger and construct the object from the byte representation of the value
+                        result = result.add(new BigInteger(1, longToBytes(
+                            valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                        )));
+                    }
                 } catch (final RocksDBException e) {
-                    throw new ProcessorStateException("Error adding RocksDB metric " + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, e);
+                    throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
                 }
             }
             return result;
-        });
+        };
     }
 
     private static byte[] longToBytes(final long data) {
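
Both gauges lean on the conversion spelled out in the comments above: RocksDB property values are unsigned 64-bit integers, so a value above Long.MAX_VALUE arrives in Java as a negative long, and the BigInteger(int signum, byte[] magnitude) constructor recovers the unsigned value from its byte representation. A minimal, self-contained illustration (not part of the commit):

    import java.math.BigInteger;
    import java.nio.ByteBuffer;

    public final class UnsignedLongDemo {
        public static void main(final String[] args) {
            final long raw = -1L;  // bit pattern of the unsigned value 2^64 - 1
            final byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(raw).array();
            // Signum 1 tells BigInteger to treat the bytes as an unsigned magnitude.
            final BigInteger unsigned = new BigInteger(1, bytes);
            System.out.println(unsigned);  // prints 18446744073709551615
        }
    }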
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 678a286..5917faa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -101,6 +101,27 @@ public class RocksDBMetricsIntegrationTest {
     private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
     private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
     private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
+    private static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLE = "num-deletes-active-mem-table";
+    private static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES = "num-entries-imm-mem-tables";
+    private static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES = "num-deletes-imm-mem-tables";
+    private static final String NUMBER_OF_IMMUTABLE_MEMTABLES = "num-immutable-mem-table";
+    private static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE = "cur-size-active-mem-table";
+    private static final String CURRENT_SIZE_OF_ALL_MEMTABLES = "cur-size-all-mem-tables";
+    private static final String SIZE_OF_ALL_MEMTABLES = "size-all-mem-tables";
+    private static final String MEMTABLE_FLUSH_PENDING = "mem-table-flush-pending";
+    private static final String NUMBER_OF_RUNNING_FLUSHES = "num-running-flushes";
+    private static final String COMPACTION_PENDING = "compaction-pending";
+    private static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
+    private static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION = "estimate-pending-compaction-bytes";
+    private static final String TOTAL_SST_FILES_SIZE = "total-sst-files-size";
+    private static final String LIVE_SST_FILES_SIZE = "live-sst-files-size";
+    private static final String NUMBER_OF_LIVE_VERSIONS = "num-live-versions";
+    private static final String CAPACITY_OF_BLOCK_CACHE = "block-cache-capacity";
+    private static final String USAGE_OF_BLOCK_CACHE = "block-cache-usage";
+    private static final String PINNED_USAGE_OF_BLOCK_CACHE = "block-cache-pinned-usage";
+    private static final String ESTIMATED_NUMBER_OF_KEYS = "estimate-num-keys";
+    private static final String ESTIMATED_MEMORY_OF_TABLE_READERS = "estimate-table-readers-mem";
+    private static final String NUMBER_OF_BACKGROUND_ERRORS = "background-errors";
 
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
@@ -164,6 +185,7 @@ public class RocksDBMetricsIntegrationTest {
         streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         return streamsConfiguration;
     }
 
@@ -202,7 +224,7 @@ public class RocksDBMetricsIntegrationTest {
         kafkaStreams.close();
     }
 
-    private void produceRecords() throws Exception {
+    private void produceRecords() {
         final MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
         final Properties prop = TestUtils.producerConfig(
             CLUSTER.bootstrapServers(),
@@ -257,6 +279,27 @@ public class RocksDBMetricsIntegrationTest {
         checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
         checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
         checkMetricByName(listMetricStore, NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_DELETES_ACTIVE_MEMTABLE, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_IMMUTABLE_MEMTABLES, 1);
+        checkMetricByName(listMetricStore, CURRENT_SIZE_OF_ACTIVE_MEMTABLE, 1);
+        checkMetricByName(listMetricStore, CURRENT_SIZE_OF_ALL_MEMTABLES, 1);
+        checkMetricByName(listMetricStore, SIZE_OF_ALL_MEMTABLES, 1);
+        checkMetricByName(listMetricStore, MEMTABLE_FLUSH_PENDING, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_RUNNING_FLUSHES, 1);
+        checkMetricByName(listMetricStore, COMPACTION_PENDING, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_RUNNING_COMPACTIONS, 1);
+        checkMetricByName(listMetricStore, ESTIMATED_BYTES_OF_PENDING_COMPACTION, 1);
+        checkMetricByName(listMetricStore, TOTAL_SST_FILES_SIZE, 1);
+        checkMetricByName(listMetricStore, LIVE_SST_FILES_SIZE, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_LIVE_VERSIONS, 1);
+        checkMetricByName(listMetricStore, CAPACITY_OF_BLOCK_CACHE, 1);
+        checkMetricByName(listMetricStore, USAGE_OF_BLOCK_CACHE, 1);
+        checkMetricByName(listMetricStore, PINNED_USAGE_OF_BLOCK_CACHE, 1);
+        checkMetricByName(listMetricStore, ESTIMATED_NUMBER_OF_KEYS, 1);
+        checkMetricByName(listMetricStore, ESTIMATED_MEMORY_OF_TABLE_READERS, 1);
+        checkMetricByName(listMetricStore, NUMBER_OF_BACKGROUND_ERRORS, 1);
     }
 
     private void checkMetricByName(final List<Metric> listMetric,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 8126ba6..3473467 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -106,6 +106,11 @@ public class StreamsMetricsImplTest {
     private final static String STORE_ID_TAG = "-state-id";
     private final static String STORE_NAME1 = "store1";
     private final static String STORE_NAME2 = "store2";
+    private final static Map<String, String> STORE_LEVEL_TAG_MAP = mkMap(
+        mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
+        mkEntry(TASK_ID_TAG, TASK_ID1),
+        mkEntry(SCOPE_NAME + STORE_ID_TAG, STORE_NAME1)
+    );
     private final static String RECORD_CACHE_ID_TAG = "record-cache-id";
     private final static String ENTITY_NAME = "test-entity";
     private final static String OPERATION_NAME = "test-operation";
@@ -125,11 +130,6 @@ public class StreamsMetricsImplTest {
     private final String group = "group";
     private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
     private final Map<String, String> clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID));
-    private final Map<String, String> storeLevelTagMap = mkMap(
-        mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
-        mkEntry(TASK_ID_TAG, TASK_ID1),
-        mkEntry(SCOPE_NAME + STORE_ID_TAG, STORE_NAME1)
-    );
     private final MetricName metricName1 =
         new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags);
     private final MetricName metricName2 =
@@ -427,9 +427,9 @@ public class StreamsMetricsImplTest {
     public void shouldAddNewStoreLevelMutableMetric() {
         final Metrics metrics = mock(Metrics.class);
         final MetricName metricName =
-            new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
+            new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
-        expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
+        expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
             .andReturn(metricName);
         expect(metrics.metric(metricName)).andReturn(null);
         metrics.addMetric(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER));
@@ -453,8 +453,8 @@ public class StreamsMetricsImplTest {
     public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
         final Metrics metrics = mock(Metrics.class);
         final MetricName metricName =
-            new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
-        expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
+            new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+        expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
             .andReturn(metricName);
         expect(metrics.metric(metricName)).andReturn(mock(KafkaMetric.class));
         replay(metrics);
@@ -478,12 +478,12 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = niceMock(Metrics.class);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
         final MetricName metricName1 =
-            new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
+            new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricName metricName2 =
-            new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap);
-        expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
+            new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP);
+        expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
             .andReturn(metricName1);
-        expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap))
+        expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP))
             .andReturn(metricName2);
         final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
         resetToDefault(metrics);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index ff408c8..ca28181 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -58,6 +58,7 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -628,6 +629,9 @@ public class RocksDBStoreTest {
         EasyMock.replay(context);
 
         rocksDBStore.init(context, rocksDBStore);
+        final byte[] key = "hello".getBytes();
+        final byte[] value = "world".getBytes();
+        rocksDBStore.put(Bytes.wrap(key), value);
 
         final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
             "num-entries-active-mem-table",
@@ -635,16 +639,62 @@ public class RocksDBStoreTest {
             "description is not verified",
             streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
         ));
-
         assertThat(numberOfEntriesActiveMemTable, notNullValue());
-        assertThat(numberOfEntriesActiveMemTable.metricValue(), is(BigInteger.valueOf(0)));
+        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));
+    }
 
-        final byte[] key = "hello".getBytes();
-        final byte[] value = "world".getBytes();
-        rocksDBStore.put(Bytes.wrap(key), value);
+    @Test
+    public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
+        final TaskId taskId = new TaskId(0, 0);
 
-        assertThat(numberOfEntriesActiveMemTable, notNullValue());
-        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
+
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
+        EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.appConfigs()).andStubReturn(new StreamsConfig(props).originals());
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.replay(context);
+
+        rocksDBStore.init(context, rocksDBStore);
+
+        final List<String> propertyNames = Arrays.asList(
+            "num-entries-active-mem-table",
+            "num-deletes-active-mem-table",
+            "num-entries-imm-mem-tables",
+            "num-deletes-imm-mem-tables",
+            "num-immutable-mem-table",
+            "cur-size-active-mem-table",
+            "cur-size-all-mem-tables",
+            "size-all-mem-tables",
+            "mem-table-flush-pending",
+            "num-running-flushes",
+            "compaction-pending",
+            "num-running-compactions",
+            "estimate-pending-compaction-bytes",
+            "total-sst-files-size",
+            "live-sst-files-size",
+            "num-live-versions",
+            "block-cache-capacity",
+            "block-cache-usage",
+            "block-cache-pinned-usage",
+            "estimate-num-keys",
+            "estimate-table-readers-mem",
+            "background-errors"
+        );
+        for (final String propertyname : propertyNames) {
+            final Metric metric = metrics.metric(new MetricName(
+                propertyname,
+                StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+                "description is not verified",
+                streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
+            ));
+            assertThat("Metric " + propertyname + " not found!", metric, notNullValue());
+            metric.metricValue();
+        }
     }
 
     public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 14ce729..2695b86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -38,7 +38,27 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STORE_ID_TAG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.COMPACTION_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_BYTES_OF_PENDING_COMPACTION;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_MEMORY_OF_TABLE_READERS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_NUMBER_OF_KEYS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.LIVE_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.MEMTABLE_FLUSH_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES;
 import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_COMPACTIONS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_FLUSHES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_BACKGROUND_ERRORS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.TOTAL_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.USAGE_OF_BLOCK_CACHE;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.mock;
 import static org.hamcrest.CoreMatchers.is;
@@ -62,20 +82,172 @@ public class RocksDBMetricsRecorderGaugesTest {
     private final Statistics statisticsToAdd2 = mock(Statistics.class);
 
     @Test
+    public void shouldGetNumberOfImmutableMemTables() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES);
+    }
+
+    @Test
+    public void shouldGetCurrentSizeofActiveMemTable() throws Exception {
+        runAndVerifySumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE);
+    }
+
+    @Test
+    public void shouldGetCurrentSizeofAllMemTables() throws Exception {
+        runAndVerifySumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES);
+    }
+
+    @Test
+    public void shouldGetSizeofAllMemTables() throws Exception {
+        runAndVerifySumOfProperties(SIZE_OF_ALL_MEMTABLES);
+    }
+
+    @Test
     public void shouldGetNumberOfEntriesActiveMemTable() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE);
+    }
+
+    @Test
+    public void shouldGetNumberOfDeletesActiveMemTable() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE);
+    }
+
+    @Test
+    public void shouldGetNumberOfEntriesImmutableMemTables() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES);
+    }
+
+    @Test
+    public void shouldGetNumberOfDeletesImmutableMemTables() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES);
+    }
+
+    @Test
+    public void shouldGetMemTableFlushPending() throws Exception {
+        runAndVerifySumOfProperties(MEMTABLE_FLUSH_PENDING);
+    }
+
+    @Test
+    public void shouldGetNumberOfRunningFlushes() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_RUNNING_FLUSHES);
+    }
+
+    @Test
+    public void shouldGetCompactionPending() throws Exception {
+        runAndVerifySumOfProperties(COMPACTION_PENDING);
+    }
+
+    @Test
+    public void shouldGetNumberOfRunningCompactions() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS);
+    }
+
+    @Test
+    public void shouldGetEstimatedBytesOfPendingCompactions() throws Exception {
+        runAndVerifySumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION);
+    }
+
+    @Test
+    public void shouldGetTotalSstFilesSize() throws Exception {
+        runAndVerifySumOfProperties(TOTAL_SST_FILES_SIZE);
+    }
+
+    @Test
+    public void shouldGetLiveSstFilesSize() throws Exception {
+        runAndVerifySumOfProperties(LIVE_SST_FILES_SIZE);
+    }
+
+    @Test
+    public void shouldGetEstimatedNumberOfKeys() throws Exception {
+        runAndVerifySumOfProperties(ESTIMATED_NUMBER_OF_KEYS);
+    }
+
+    @Test
+    public void shouldGetEstimatedMemoryOfTableReaders() throws Exception {
+        runAndVerifySumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS);
+    }
+
+    @Test
+    public void shouldGetNumberOfBackgroundErrors() throws Exception {
+        runAndVerifySumOfProperties(NUMBER_OF_BACKGROUND_ERRORS);
+    }
+
+    @Test
+    public void shouldGetCapacityOfBlockCacheWithMultipleCaches() throws Exception {
+        runAndVerifyBlockCacheMetricsWithMultipleCaches(CAPACITY_OF_BLOCK_CACHE);
+    }
+
+    @Test
+    public void shouldGetCapacityOfBlockCacheWithSingleCache() throws Exception {
+        runAndVerifyBlockCacheMetricsWithSingleCache(CAPACITY_OF_BLOCK_CACHE);
+    }
+
+    @Test
+    public void shouldGetUsageOfBlockCacheWithMultipleCaches() throws Exception {
+        runAndVerifyBlockCacheMetricsWithMultipleCaches(USAGE_OF_BLOCK_CACHE);
+    }
+
+    @Test
+    public void shouldGetUsageOfBlockCacheWithSingleCache() throws Exception {
+        runAndVerifyBlockCacheMetricsWithSingleCache(USAGE_OF_BLOCK_CACHE);
+    }
+
+    @Test
+    public void shouldGetPinnedUsageOfBlockCacheWithMultipleCaches() throws Exception {
+        runAndVerifyBlockCacheMetricsWithMultipleCaches(PINNED_USAGE_OF_BLOCK_CACHE);
+    }
+
+    @Test
+    public void shouldGetPinnedUsageOfBlockCacheWithSingleCache() throws Exception {
+        runAndVerifyBlockCacheMetricsWithSingleCache(PINNED_USAGE_OF_BLOCK_CACHE);
+    }
+
+    private void runAndVerifySumOfProperties(final String propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
         final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
-        expect(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))
-                .andStubReturn(5L);
-        expect(dbToAdd2.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))
-                .andStubReturn(3L);
-        replay(dbToAdd1, dbToAdd2);
 
         recorder.init(streamsMetrics, TASK_ID);
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
 
+        final long recordedValue1 = 5L;
+        final long recordedValue2 = 3L;
+        expect(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+            .andStubReturn(recordedValue1);
+        expect(dbToAdd2.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+            .andStubReturn(recordedValue2);
+        replay(dbToAdd1, dbToAdd2);
+
+        verifyMetrics(streamsMetrics, propertyName, recordedValue1 + recordedValue2);
+    }
+
+    private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception {
+        runAndVerifySumOfProperties(propertyName);
+    }
+
+    private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception {
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
+        final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
+
+        recorder.init(streamsMetrics, TASK_ID);
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+        recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd1, statisticsToAdd2);
+
+        final long recordedValue = 5L;
+        expect(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+            .andStubReturn(recordedValue);
+        expect(dbToAdd2.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+            .andStubReturn(recordedValue);
+        replay(dbToAdd1, dbToAdd2);
+
+        verifyMetrics(streamsMetrics, propertyName, recordedValue);
+    }
+
+    private void verifyMetrics(final StreamsMetricsImpl streamsMetrics,
+                               final String propertyName,
+                               final long expectedValue) {
+
         final Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
         final Map<String, String> tagMap = mkMap(
             mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
@@ -83,13 +255,13 @@ public class RocksDBMetricsRecorderGaugesTest {
             mkEntry(METRICS_SCOPE + "-" + STORE_ID_TAG, STORE_NAME)
         );
         final KafkaMetric metric = (KafkaMetric) metrics.get(new MetricName(
-            NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
+            propertyName,
             STATE_STORE_LEVEL_GROUP,
             "description is ignored",
             tagMap
         ));
 
         assertThat(metric, notNullValue());
-        assertThat(metric.metricValue(), is(BigInteger.valueOf(8)));
+        assertThat(metric.metricValue(), is(BigInteger.valueOf(expectedValue)));
     }
 }
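
The gauge tests above all reduce to one aggregation rule: a property-based metric for a store is the sum of the corresponding RocksDB property over every segment database registered with the recorder (block-cache metrics collapse to a single value when the segments share one cache). A minimal sketch of that summation, using only the public RocksDB Java API; the class and variable names are illustrative, not part of the patch:

    import java.util.List;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class PropertySumSketch {
        // Same "rocksdb." prefix as ROCKSDB_PROPERTIES_PREFIX in the tests above.
        private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";

        // Sums one property, e.g. "num-entries-active-mem-table", across all
        // segment databases of a store, mirroring runAndVerifySumOfProperties().
        public static long sumProperty(final List<RocksDB> segmentDbs,
                                       final String propertyName) throws RocksDBException {
            long sum = 0L;
            for (final RocksDB db : segmentDbs) {
                sum += db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName);
            }
            return sum;
        }
    }
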
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index 653cf97..dc08f84 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -59,13 +59,16 @@ public class RocksDBMetricsRecorderTest {
     private final static String STORE_NAME = "store-name";
     private final static String SEGMENT_STORE_NAME_1 = "segment-store-name-1";
     private final static String SEGMENT_STORE_NAME_2 = "segment-store-name-2";
+    private final static String SEGMENT_STORE_NAME_3 = "segment-store-name-3";
 
     private final RocksDB dbToAdd1 = mock(RocksDB.class);
     private final RocksDB dbToAdd2 = mock(RocksDB.class);
+    private final RocksDB dbToAdd3 = mock(RocksDB.class);
     private final Cache cacheToAdd1 = mock(Cache.class);
     private final Cache cacheToAdd2 = mock(Cache.class);
     private final Statistics statisticsToAdd1 = mock(Statistics.class);
     private final Statistics statisticsToAdd2 = mock(Statistics.class);
+    private final Statistics statisticsToAdd3 = mock(Statistics.class);
 
     private final Sensor bytesWrittenToDatabaseSensor = createMock(Sensor.class);
     private final Sensor bytesReadFromDatabaseSensor = createMock(Sensor.class);
@@ -157,43 +160,127 @@ public class RocksDBMetricsRecorderTest {
         );
         assertThat(
             exception.getMessage(),
-            is("Value providers for store \"" + SEGMENT_STORE_NAME_1 + "\" of task " + TASK_ID1 +
-                    " has been already added. This is a bug in Kafka Streams. " +
-                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+            is("Value providers for store " + SEGMENT_STORE_NAME_1 + " of task " + TASK_ID1 +
+                " has been already added. This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
         );
     }
 
     @Test
-    public void shouldThrowIfStatisticsForOnlyOneSegmentOutOfMultipleIsNull() {
+    public void shouldThrowIfStatisticsToAddIsNotNullButExistingStatisticsAreNull() {
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
+
+        final Throwable exception = assertThrows(
+            IllegalStateException.class,
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2)
+        );
+        assertThat(
+            exception.getMessage(),
+            is("Statistics for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                " is not null although the statistics of another segment in this metrics recorder is null. " +
+                "This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+        );
+    }
+
+    @Test
+    public void shouldThrowIfStatisticsToAddIsNullButExistingStatisticsAreNotNull() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
 
         final Throwable exception = assertThrows(
             IllegalStateException.class,
-            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd1, null)
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, null)
         );
         assertThat(
             exception.getMessage(),
-            is("Statistics for store \"" + SEGMENT_STORE_NAME_2 + "\" of task " + TASK_ID1 +
-                    " is null although the statistics of another store in this metrics recorder is not null. " +
-                    "This is a bug in Kafka Streams. " +
-                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+            is("Statistics for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                " is null although the statistics of another segment in this metrics recorder is not null. " +
+                "This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
         );
     }
 
     @Test
-    public void shouldThrowIfStatisticsForOnlyOneSegmentOutOfMultipleIsNotNull() {
-        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
+    public void shouldThrowIfCacheToAddIsNotNullButExistingCacheIsNull() {
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, null, statisticsToAdd1);
+
+        final Throwable exception = assertThrows(
+            IllegalStateException.class,
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd1, statisticsToAdd1)
+        );
+        assertThat(
+            exception.getMessage(),
+            is("Cache for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                " is not null although the cache of another segment in this metrics recorder is null. " +
+                "This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+        );
+    }
+
+    @Test
+    public void shouldThrowIfCacheToAddIsNullButExistingCacheIsNotNull() {
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+
+        final Throwable exception = assertThrows(
+            IllegalStateException.class,
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, null, statisticsToAdd2)
+        );
+        assertThat(
+            exception.getMessage(),
+            is("Cache for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                " is null although the cache of another segment in this metrics recorder is not null. " +
+                "This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+        );
+    }
+
+    @Test
+    public void shouldThrowIfCacheToAddIsNotSameAsAllExistingCaches() {
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+        recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd1, statisticsToAdd2);
+
+        final Throwable exception = assertThrows(
+            IllegalStateException.class,
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_3, dbToAdd3, cacheToAdd2, statisticsToAdd3)
+        );
+        assertThat(
+            exception.getMessage(),
+            is("Caches for store " + STORE_NAME + " of task " + TASK_ID1 +
+                " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+        );
+    }
+
+    @Test
+    public void shouldThrowIfCacheToAddIsSameAsOnlyOneOfMultipleCaches() {
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+        recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
+
+        final Throwable exception = assertThrows(
+            IllegalStateException.class,
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_3, dbToAdd3, cacheToAdd1, statisticsToAdd3)
+        );
+        assertThat(
+            exception.getMessage(),
+            is("Caches for store " + STORE_NAME + " of task " + TASK_ID1 +
+                " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+        );
+    }
+
+    @Test
+    public void shouldThrowIfDbToAddWasAlreadyAddedForOtherSegment() {
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
 
         final Throwable exception = assertThrows(
             IllegalStateException.class,
-            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd1, statisticsToAdd1)
+            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
         );
         assertThat(
             exception.getMessage(),
-            is("Statistics for store \"" + SEGMENT_STORE_NAME_2 + "\" of task " + TASK_ID1 +
-                    " is not null although the statistics of another store in this metrics recorder is null. " +
-                    "This is a bug in Kafka Streams. " +
-                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+            is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
         );
     }
 
@@ -471,7 +558,28 @@ public class RocksDBMetricsRecorderTest {
             .andReturn(numberOfOpenFilesSensor);
         expect(RocksDBMetrics.numberOfFileErrorsSensor(eq(streamsMetrics), eq(metricsContext)))
             .andReturn(numberOfFileErrorsSensor);
+        RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
         RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
         replay(RocksDBMetrics.class);
     }
 
@@ -503,7 +611,28 @@ public class RocksDBMetricsRecorderTest {
             .andStubReturn(numberOfOpenFilesSensor);
         expect(RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
             .andStubReturn(numberOfFileErrorsSensor);
+        RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
         RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+        RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
         replay(RocksDBMetrics.class);
     }
 }
\ No newline at end of file
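
The new cache tests pin down an invariant: within one metrics recorder, the segments must either all share a single block cache or all use distinct caches; any mix of the two is reported as a bug. A hedged sketch of such a validation, as a hypothetical helper rather than the recorder's actual implementation:

    import java.util.Collection;
    import org.rocksdb.Cache;

    public class CacheInvariantSketch {
        // Rejects a cache that would leave the store with a mix of shared and
        // distinct caches, matching the error message asserted in the tests above.
        public static void validateNewCache(final Collection<Cache> existingCaches,
                                            final Cache cacheToAdd) {
            final boolean sharedWithAll =
                existingCaches.stream().allMatch(cache -> cache == cacheToAdd);
            final boolean distinctFromAll =
                existingCaches.stream().noneMatch(cache -> cache == cacheToAdd);
            if (!existingCaches.isEmpty() && !sharedWithAll && !distinctFromAll) {
                throw new IllegalStateException(
                    "Caches are either not all distinct or do not all refer to the same cache");
            }
        }
    }
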
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
index f6dc503..0d4ae6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
@@ -52,6 +52,7 @@ public class RocksDBMetricsTest {
     private static final String STORE_NAME = "store";
     private static final RocksDBMetricContext ROCKSDB_METRIC_CONTEXT =
         new RocksDBMetricContext(TASK_ID, STORE_TYPE, STORE_NAME);
+    private static final Gauge<BigInteger> VALUE_PROVIDER = (config, now) -> BigInteger.valueOf(10);
 
     private final Metrics metrics = new Metrics();
     private final Sensor sensor = metrics.sensor("dummy");
@@ -221,22 +222,263 @@ public class RocksDBMetricsTest {
     }
 
     @Test
-    public void shouldAddNumImmutableMemTableMetric() {
+    public void shouldAddNumEntriesActiveMemTableMetric() {
         final String name = "num-entries-active-mem-table";
-        final String description = "Current total number of entries in the active memtable";
-        final Gauge<BigInteger> valueProvider = (config, now) -> BigInteger.valueOf(10);
+        final String description = "Total number of entries in the active memtable";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddNumDeletesActiveMemTableMetric() {
+        final String name = "num-deletes-active-mem-table";
+        final String description = "Total number of delete entries in the active memtable";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumDeletesActiveMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddNumEntriesImmutableMemTablesMetric() {
+        final String name = "num-entries-imm-mem-tables";
+        final String description = "Total number of entries in the unflushed immutable memtables";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumEntriesImmMemTablesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddNumDeletesImmutableMemTablesMetric() {
+        final String name = "num-deletes-imm-mem-tables";
+        final String description = "Total number of delete entries in the unflushed immutable memtables";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumDeletesImmMemTablesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddNumImmutableMemTablesMetric() {
+        final String name = "num-immutable-mem-table";
+        final String description = "Number of immutable memtables that have not yet been flushed";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumImmutableMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddCurSizeActiveMemTableMetric() {
+        final String name = "cur-size-active-mem-table";
+        final String description = "Approximate size of active memtable in bytes";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addCurSizeActiveMemTable(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddCurSizeAllMemTablesMetric() {
+        final String name = "cur-size-all-mem-tables";
+        final String description = "Approximate size of active and unflushed immutable memtable in bytes";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addCurSizeAllMemTables(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddSizeAllMemTablesMetric() {
+        final String name = "size-all-mem-tables";
+        final String description = "Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addSizeAllMemTables(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddMemTableFlushPendingMetric() {
+        final String name = "mem-table-flush-pending";
+        final String description = "Reports 1 if a memtable flush is pending, otherwise it reports 0";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addMemTableFlushPending(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddNumRunningFlushesMetric() {
+        final String name = "num-running-flushes";
+        final String description = "Number of currently running flushes";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumRunningFlushesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddCompactionPendingMetric() {
+        final String name = "compaction-pending";
+        final String description = "Reports 1 if at least one compaction is pending, otherwise it reports 0";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addCompactionPendingMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddNumRunningCompactionsMetric() {
+        final String name = "num-running-compactions";
+        final String description = "Number of currently running compactions";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumRunningCompactionsMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddEstimatePendingCompactionBytesMetric() {
+        final String name = "estimate-pending-compaction-bytes";
+        final String description =
+            "Estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addEstimatePendingCompactionBytesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddTotalSstFilesSizeMetric() {
+        final String name = "total-sst-files-size";
+        final String description = "Total size in bytes of all SST files";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addTotalSstFilesSizeMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddLiveSstFilesSizeMetric() {
+        final String name = "live-sst-files-size";
+        final String description = "Total size in bytes of all SST files that belong to the latest LSM tree";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addLiveSstFilesSizeMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddNumLiveVersionMetric() {
+        final String name = "num-live-versions";
+        final String description = "Number of live versions";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addNumLiveVersionMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddBlockCacheCapacityMetric() {
+        final String name = "block-cache-capacity";
+        final String description = "Capacity of the block cache in bytes";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addBlockCacheCapacityMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddBlockCacheUsageMetric() {
+        final String name = "block-cache-usage";
+        final String description = "Memory size of the entries residing in block cache in bytes";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addBlockCacheUsageMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddBlockCachePinnedUsageMetric() {
+        final String name = "block-cache-pinned-usage";
+        final String description = "Memory size for the entries being pinned in the block cache in bytes";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addBlockCachePinnedUsageMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddEstimateNumKeysMetric() {
+        final String name = "estimate-num-keys";
+        final String description =
+            "Estimated number of total keys in the active and unflushed immutable memtables and storage";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addEstimateNumKeysMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddEstimateTableReadersMemMetric() {
+        final String name = "estimate-table-readers-mem";
+        final String description =
+            "Estimated memory in bytes used for reading SST tables, excluding memory used in block cache";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addEstimateTableReadersMemMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    @Test
+    public void shouldAddBackgroundErrorsMetric() {
+        final String name = "background-errors";
+        final String description = "Total number of background errors";
+        runAndVerifyMutableMetric(
+            name,
+            description,
+            () -> RocksDBMetrics.addBackgroundErrorsMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+        );
+    }
+
+    private void runAndVerifyMutableMetric(final String name, final String description, final Runnable metricAdder) {
         streamsMetrics.addStoreLevelMutableMetric(
             eq(TASK_ID),
-                eq(STORE_TYPE),
-                eq(STORE_NAME),
-                eq(name),
-                eq(description),
-                eq(RecordingLevel.INFO),
-                eq(valueProvider)
+            eq(STORE_TYPE),
+            eq(STORE_NAME),
+            eq(name),
+            eq(description),
+            eq(RecordingLevel.INFO),
+            eq(VALUE_PROVIDER)
         );
         replay(streamsMetrics);
 
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, valueProvider);
+        metricAdder.run();
 
         verify(streamsMetrics);
     }
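
Each gauge exercised above lands in the stream-state-metrics group and is therefore visible over JMX like any other Kafka Streams metric. A small sketch of reading one of them from inside the same JVM; the thread-id, task-id, and store name below are placeholders for values from a running topology, and the tag layout is the one asserted in RocksDBMetricsRecorderGaugesTest:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class ReadRocksDBGauge {
        public static void main(final String[] args) throws Exception {
            final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Placeholder tag values; substitute those of your own application.
            final ObjectName gauge = new ObjectName(
                "kafka.streams:type=stream-state-metrics,"
                    + "thread-id=example-thread,task-id=0_0,rocksdb-state-id=example-store");
            System.out.println(server.getAttribute(gauge, "num-entries-active-mem-table"));
        }
    }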