You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/07/14 02:23:17 UTC

[flink] branch master updated: [FLINK-24786][state] Introduce and expose RocksDB statistics as metrics

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 46e0014d3fe [FLINK-24786][state] Introduce and expose RocksDB statistics as metrics
46e0014d3fe is described below

commit 46e0014d3fe9be9af5f1fd75dc03dcc967a7fb4a
Author: Yun Tang <my...@live.com>
AuthorDate: Wed May 11 23:30:37 2022 +0800

    [FLINK-24786][state] Introduce and expose RocksDB statistics as metrics
---
 docs/content.zh/docs/deployment/config.md          |   4 +-
 docs/content/docs/deployment/config.md             |   4 +-
 .../rocksdb_native_metric_configuration.html       |  50 +++++++-
 .../state/EmbeddedRocksDBStateBackend.java         |  21 ++--
 .../state/RocksDBKeyedStateBackendBuilder.java     |   4 +-
 .../state/RocksDBNativeMetricMonitor.java          | 106 ++++++++++++----
 .../state/RocksDBNativeMetricOptions.java          | 140 +++++++++++++++++++--
 .../streaming/state/RocksDBResourceContainer.java  |  23 +++-
 .../streaming/state/restore/RocksDBHandle.java     |   3 +-
 .../state/RocksDBNativeMetricMonitorTest.java      | 101 +++++++++++----
 .../contrib/streaming/state/RocksDBResource.java   |  25 +++-
 .../state/RocksDBStateBackendConfigTest.java       |  11 +-
 12 files changed, 417 insertions(+), 75 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 912ac8bb1d8..4c6a29da631 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -264,7 +264,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >}
 ### RocksDB Native Metrics
 
 Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend.
-The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs. 
+The metrics here are scoped to the operators, are reported as unsigned longs, and fall into two categories:
+1. RocksDB property-based metrics, which are broken down by column family, e.g. the number of currently running compactions of one specific column family.
+2. RocksDB statistics-based metrics, which are collected at the database level, e.g. the total block cache hit count within the DB.
 
 {{< hint warning >}}
 Enabling RocksDB's native metrics may cause degraded performance and should be set carefully. 
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index a38e72d0e18..5fc8cb1ac29 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -266,7 +266,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >}
 ### RocksDB Native Metrics
 
 Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend.
-The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs. 
+The metrics here are scoped to the operators, are reported as unsigned longs, and fall into two categories:
+1. RocksDB property-based metrics, which are broken down by column family, e.g. the number of currently running compactions of one specific column family.
+2. RocksDB statistics-based metrics, which are collected at the database level, e.g. the total block cache hit count within the DB.
 
 {{< hint warning >}}
 Enabling RocksDB's native metrics may cause degraded performance and should be set carefully. 
diff --git a/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html
index a7922f2547a..2edd8387d08 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html
@@ -26,6 +26,18 @@
             <td>Boolean</td>
             <td>Monitor block cache capacity.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.block-cache-hit</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT).</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.block-cache-miss</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS).</td>
+        </tr>
         <tr>
             <td><h5>state.backend.rocksdb.metrics.block-cache-pinned-usage</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -38,11 +50,23 @@
             <td>Boolean</td>
             <td>Monitor the memory size for the entries residing in block cache.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.bytes-read</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.bytes-written</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.rocksdb.metrics.column-family-as-variable</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Whether to expose the column family as a variable.</td>
+            <td>Whether to expose the column family as a variable for RocksDB property based metrics.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.metrics.compaction-pending</h5></td>
@@ -50,6 +74,18 @@
             <td>Boolean</td>
             <td>Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.compaction-read-bytes</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the bytes read during compaction in RocksDB.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.compaction-write-bytes</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the bytes written during compaction in RocksDB.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.rocksdb.metrics.cur-size-active-mem-table</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -92,6 +128,12 @@
             <td>Boolean</td>
             <td>Track whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.iter-bytes-read</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.rocksdb.metrics.live-sst-files-size</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -164,6 +206,12 @@
             <td>Boolean</td>
             <td>Monitor the approximate size of the active, unflushed immutable, and pinned immutable memtables in bytes.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.stall-micros</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.rocksdb.metrics.total-sst-files-size</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 07829949a25..8ce78342d3c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -147,8 +147,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
     /** This determines the type of priority queue state. */
     @Nullable private PriorityQueueStateType priorityQueueStateType;
 
-    /** The default rocksdb metrics options. */
-    private final RocksDBNativeMetricOptions defaultMetricOptions;
+    /** The default RocksDB native metrics options (property-based and statistics-based). */
+    private final RocksDBNativeMetricOptions nativeMetricOptions;
 
     // -- runtime values, set on TaskManager when initializing / using the backend
 
@@ -199,7 +199,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
     public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing) {
         this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
         this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
-        this.defaultMetricOptions = new RocksDBNativeMetricOptions();
+        this.nativeMetricOptions = new RocksDBNativeMetricOptions();
         this.memoryConfiguration = new RocksDBMemoryConfiguration();
         this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
         this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
@@ -263,7 +263,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
         }
 
         // configure metric options
-        this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);
+        this.nativeMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);
 
         // configure RocksDB predefined options
         this.predefinedOptions =
@@ -465,7 +465,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
             LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());
         }
         final RocksDBResourceContainer resourceContainer =
-                createOptionsAndResourceContainer(sharedResources);
+                createOptionsAndResourceContainer(
+                        sharedResources, nativeMetricOptions.isStatisticsEnabled());
 
         ExecutionConfig executionConfig = env.getExecutionConfig();
         StreamCompressionDecorator keyGroupCompressionDecorator =
@@ -496,7 +497,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                         .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
                         .setNumberOfTransferingThreads(getNumberOfTransferThreads())
                         .setNativeMetricOptions(
-                                resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
+                                resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
                         .setWriteBatchSize(getWriteBatchSize())
                         .setOverlapFractionThreshold(getOverlapFractionThreshold());
         return builder.build();
@@ -863,18 +864,20 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
 
     @VisibleForTesting
     RocksDBResourceContainer createOptionsAndResourceContainer() {
-        return createOptionsAndResourceContainer(null);
+        return createOptionsAndResourceContainer(null, false);
     }
 
     @VisibleForTesting
     private RocksDBResourceContainer createOptionsAndResourceContainer(
-            @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
+            @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
+            boolean enableStatistics) {
 
         return new RocksDBResourceContainer(
                 configurableOptions != null ? configurableOptions : new Configuration(),
                 predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT,
                 rocksDbOptionsFactory,
-                sharedResources);
+                sharedResources,
+                enableStatistics);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 8706a90c6b0..ca1adc27303 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -114,7 +114,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
     /** True if incremental checkpointing is enabled. */
     private boolean enableIncrementalCheckpointing;
 
+    /** RocksDB property-based and statistics-based native metrics options. */
     private RocksDBNativeMetricOptions nativeMetricOptions;
+
     private int numberOfTransferingThreads;
     private long writeBatchSize =
             RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
@@ -309,7 +311,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 nativeMetricMonitor =
                         nativeMetricOptions.isEnabled()
                                 ? new RocksDBNativeMetricMonitor(
-                                        nativeMetricOptions, metricGroup, db)
+                                        nativeMetricOptions, metricGroup, db, null)
                                 : null;
             } else {
                 prepareDirectories();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
index 83c44c8915d..0ea5a1aec55 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
@@ -26,10 +26,13 @@ import org.apache.flink.metrics.View;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.Statistics;
+import org.rocksdb.TickerType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.Closeable;
@@ -54,15 +57,32 @@ public class RocksDBNativeMetricMonitor implements Closeable {
     @GuardedBy("lock")
     private RocksDB rocksDB;
 
+    @Nullable
+    @GuardedBy("lock")
+    private Statistics statistics;
+
     public RocksDBNativeMetricMonitor(
             @Nonnull RocksDBNativeMetricOptions options,
             @Nonnull MetricGroup metricGroup,
-            @Nonnull RocksDB rocksDB) {
+            @Nonnull RocksDB rocksDB,
+            @Nullable Statistics statistics) {
         this.options = options;
         this.metricGroup = metricGroup;
         this.rocksDB = rocksDB;
-
+        this.statistics = statistics;
         this.lock = new Object();
+        registerStatistics();
+    }
+
+    /** Register gauges to pull native metrics for the database. */
+    private void registerStatistics() {
+        if (statistics != null) {
+            for (TickerType tickerType : options.getMonitorTickerTypes()) {
+                metricGroup.gauge(
+                        String.format("rocksdb.%s", tickerType.name().toLowerCase()),
+                        new RocksDBNativeStatisticsMetricView(tickerType));
+            }
+        }
     }
 
     /**
@@ -80,27 +100,38 @@ public class RocksDBNativeMetricMonitor implements Closeable {
                         : metricGroup.addGroup(columnFamilyName);
 
         for (String property : options.getProperties()) {
-            RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(handle, property);
+            RocksDBNativePropertyMetricView gauge =
+                    new RocksDBNativePropertyMetricView(handle, property);
             group.gauge(property, gauge);
         }
     }
 
     /** Updates the value of metricView if the reference is still valid. */
-    private void setProperty(
-            ColumnFamilyHandle handle, String property, RocksDBNativeMetricView metricView) {
+    private void setProperty(RocksDBNativePropertyMetricView metricView) {
         if (metricView.isClosed()) {
             return;
         }
         try {
             synchronized (lock) {
                 if (rocksDB != null) {
-                    long value = rocksDB.getLongProperty(handle, property);
+                    long value = rocksDB.getLongProperty(metricView.handle, metricView.property);
                     metricView.setValue(value);
                 }
             }
         } catch (RocksDBException e) {
             metricView.close();
-            LOG.warn("Failed to read native metric {} from RocksDB.", property, e);
+            LOG.warn("Failed to read native metric {} from RocksDB.", metricView.property, e);
+        }
+    }
+
+    private void setStatistics(RocksDBNativeStatisticsMetricView metricView) {
+        if (metricView.isClosed()) {
+            return;
+        }
+        if (statistics != null) {
+            synchronized (lock) {
+                metricView.setValue(statistics.getTickerCount(metricView.tickerType));
+            }
         }
     }
 
@@ -108,31 +139,46 @@ public class RocksDBNativeMetricMonitor implements Closeable {
     public void close() {
         synchronized (lock) {
             rocksDB = null;
+            statistics = null;
+        }
+    }
+
+    abstract static class RocksDBNativeView implements View {
+        private boolean closed;
+
+        RocksDBNativeView() {
+            this.closed = false;
+        }
+
+        void close() {
+            closed = true;
+        }
+
+        boolean isClosed() {
+            return closed;
         }
     }
 
     /**
-     * A gauge which periodically pulls a RocksDB native metric for the specified column family /
-     * metric pair.
+     * A gauge which periodically pulls a RocksDB property-based native metric for the specified
+     * column family / metric pair.
      *
      * <p><strong>Note</strong>: As the returned property is of type {@code uint64_t} on C++ side
      * the returning value can be negative. Because java does not support unsigned long types, this
      * gauge wraps the result in a {@link BigInteger}.
      */
-    class RocksDBNativeMetricView implements Gauge<BigInteger>, View {
+    class RocksDBNativePropertyMetricView extends RocksDBNativeView implements Gauge<BigInteger> {
         private final String property;
 
         private final ColumnFamilyHandle handle;
 
         private BigInteger bigInteger;
 
-        private boolean closed;
-
-        private RocksDBNativeMetricView(ColumnFamilyHandle handle, @Nonnull String property) {
+        private RocksDBNativePropertyMetricView(
+                ColumnFamilyHandle handle, @Nonnull String property) {
             this.handle = handle;
             this.property = property;
             this.bigInteger = BigInteger.ZERO;
-            this.closed = false;
         }
 
         public void setValue(long value) {
@@ -149,22 +195,40 @@ public class RocksDBNativeMetricMonitor implements Closeable {
             }
         }
 
-        public void close() {
-            closed = true;
+        @Override
+        public BigInteger getValue() {
+            return bigInteger;
         }
 
-        public boolean isClosed() {
-            return closed;
+        @Override
+        public void update() {
+            setProperty(this);
+        }
+    }
+
+    /**
+     * A gauge which periodically pulls a RocksDB statistics-based native metric for the database.
+     */
+    class RocksDBNativeStatisticsMetricView extends RocksDBNativeView implements Gauge<Long> {
+        private final TickerType tickerType;
+        private long value;
+
+        private RocksDBNativeStatisticsMetricView(TickerType tickerType) {
+            this.tickerType = tickerType;
         }
 
         @Override
-        public BigInteger getValue() {
-            return bigInteger;
+        public Long getValue() {
+            return value;
+        }
+
+        void setValue(long value) {
+            this.value = value;
         }
 
         @Override
         public void update() {
-            setProperty(handle, property, this);
+            setStatistics(this);
         }
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
index 1cec947e5c5..f6fc5c759ae 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
@@ -18,19 +18,29 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 
+import org.rocksdb.TickerType;
+
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 /**
- * Enable which RocksDB metrics to forward to Flink's metrics reporter. All metrics report at the
- * column family level and return unsigned long values.
+ * Enable which RocksDB metrics to forward to Flink's metrics reporter.
+ *
+ * <p>Property based metrics are reported at the column family level and return unsigned long
+ * values.
+ *
+ * <p>Statistics based metrics are reported at the database level and can return ticker or
+ * histogram kind results.
  *
  * <p>Properties and doc comments are taken from RocksDB documentation. See <a
  * href="https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429">
@@ -39,6 +49,10 @@ import java.util.Set;
 public class RocksDBNativeMetricOptions implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    // --------------------------------------------------------------------------------------------
+    //  RocksDB property based metrics, report at column family level
+    // --------------------------------------------------------------------------------------------
+
     public static final String METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY =
             "state.backend.rocksdb.metrics" + ".column-family-as-variable";
 
@@ -227,11 +241,77 @@ public class RocksDBNativeMetricOptions implements Serializable {
             ConfigOptions.key(METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY)
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription("Whether to expose the column family as a variable.");
+                    .withDescription(
+                            "Whether to expose the column family as a variable for RocksDB property based metrics.");
+
+    // --------------------------------------------------------------------------------------------
+    //  RocksDB statistics based metrics, report at database level
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_HIT =
+            ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-hit")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT).");
+
+    public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_MISS =
+            ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-miss")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS).");
+
+    public static final ConfigOption<Boolean> MONITOR_BYTES_READ =
+            ConfigOptions.key("state.backend.rocksdb.metrics.bytes-read")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_ITER_BYTES_READ =
+            ConfigOptions.key("state.backend.rocksdb.metrics.iter-bytes-read")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_BYTES_WRITTEN =
+            ConfigOptions.key("state.backend.rocksdb.metrics.bytes-written")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_COMPACTION_READ_BYTES =
+            ConfigOptions.key("state.backend.rocksdb.metrics.compaction-read-bytes")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the bytes read during compaction in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_COMPACTION_WRITE_BYTES =
+            ConfigOptions.key("state.backend.rocksdb.metrics.compaction-write-bytes")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the bytes written during compaction in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_STALL_MICROS =
+            ConfigOptions.key("state.backend.rocksdb.metrics.stall-micros")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.");
 
     /** Creates a {@link RocksDBNativeMetricOptions} based on an external configuration. */
     public static RocksDBNativeMetricOptions fromConfig(ReadableConfig config) {
         RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+        configurePropertyMetrics(options, config);
+        configureStatisticsMetrics(options, config);
+        return options;
+    }
+
+    private static void configurePropertyMetrics(
+            RocksDBNativeMetricOptions options, ReadableConfig config) {
         if (config.get(MONITOR_NUM_IMMUTABLE_MEM_TABLES)) {
             options.enableNumImmutableMemTable();
         }
@@ -337,15 +417,51 @@ public class RocksDBNativeMetricOptions implements Serializable {
         }
 
         options.setColumnFamilyAsVariable(config.get(COLUMN_FAMILY_AS_VARIABLE));
+    }
 
-        return options;
+    private static void configureStatisticsMetrics(
+            RocksDBNativeMetricOptions options, ReadableConfig config) {
+        for (Map.Entry<ConfigOption<Boolean>, TickerType> entry : tickerTypeMapping.entrySet()) {
+            if (config.get(entry.getKey())) {
+                options.monitorTickerTypes.add(entry.getValue());
+            }
+        }
     }
 
+    private static final Map<ConfigOption<Boolean>, TickerType> tickerTypeMapping =
+            new HashMap<ConfigOption<Boolean>, TickerType>() {
+                private static final long serialVersionUID = 1L;
+
+                {
+                    put(MONITOR_BLOCK_CACHE_HIT, TickerType.BLOCK_CACHE_HIT);
+                    put(MONITOR_BLOCK_CACHE_MISS, TickerType.BLOCK_CACHE_MISS);
+                    put(MONITOR_BYTES_READ, TickerType.BYTES_READ);
+                    put(MONITOR_ITER_BYTES_READ, TickerType.ITER_BYTES_READ);
+                    put(MONITOR_BYTES_WRITTEN, TickerType.BYTES_WRITTEN);
+                    put(MONITOR_COMPACTION_READ_BYTES, TickerType.COMPACT_READ_BYTES);
+                    put(MONITOR_COMPACTION_WRITE_BYTES, TickerType.COMPACT_WRITE_BYTES);
+                    put(MONITOR_STALL_MICROS, TickerType.STALL_MICROS);
+                }
+            };
+
     private final Set<String> properties;
+    private final Set<TickerType> monitorTickerTypes;
     private boolean columnFamilyAsVariable = COLUMN_FAMILY_AS_VARIABLE.defaultValue();
 
     public RocksDBNativeMetricOptions() {
         this.properties = new HashSet<>();
+        this.monitorTickerTypes = new HashSet<>();
+    }
+
+    @VisibleForTesting
+    public void enableNativeStatistics(ConfigOption<Boolean> nativeStatisticsOption) {
+        TickerType tickerType = tickerTypeMapping.get(nativeStatisticsOption);
+        if (tickerType != null) {
+            monitorTickerTypes.add(tickerType);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unknown configurable native statistics option " + nativeStatisticsOption);
+        }
     }
 
     /** Returns number of immutable memtables that have not yet been flushed. */
@@ -501,18 +617,28 @@ public class RocksDBNativeMetricOptions implements Serializable {
         this.columnFamilyAsVariable = columnFamilyAsVariable;
     }
 
-    /** @return the enabled RocksDB metrics */
+    /** @return the enabled RocksDB property-based metrics */
     public Collection<String> getProperties() {
         return Collections.unmodifiableCollection(properties);
     }
 
+    /** @return the enabled RocksDB statistics metrics. */
+    public Collection<TickerType> getMonitorTickerTypes() {
+        return Collections.unmodifiableCollection(monitorTickerTypes);
+    }
+
     /**
-     * {{@link RocksDBNativeMetricMonitor}} is enabled is any property is set.
+     * {@link RocksDBNativeMetricMonitor} is enabled if any property or ticker type is set.
      *
      * @return true if {{RocksDBNativeMetricMonitor}} should be enabled, false otherwise.
      */
     public boolean isEnabled() {
-        return !properties.isEmpty();
+        return !properties.isEmpty() || isStatisticsEnabled();
+    }
+
+    /** @return true if RocksDB statistics metrics are enabled, false otherwise. */
+    public boolean isStatisticsEnabled() {
+        return !monitorTickerTypes.isEmpty();
     }
 
     /**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 50752a817ac..ed652980d04 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
@@ -34,6 +35,7 @@ import org.rocksdb.Filter;
 import org.rocksdb.IndexType;
 import org.rocksdb.PlainTableConfig;
 import org.rocksdb.ReadOptions;
+import org.rocksdb.Statistics;
 import org.rocksdb.TableFormatConfig;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -73,35 +75,42 @@ public final class RocksDBResourceContainer implements AutoCloseable {
      */
     @Nullable private final OpaqueMemoryResource<RocksDBSharedResources> sharedResources;
 
+    private final boolean enableStatistics;
+
     /** The handles to be closed when the container is closed. */
     private final ArrayList<AutoCloseable> handlesToClose;
 
+    @VisibleForTesting
     public RocksDBResourceContainer() {
-        this(new Configuration(), PredefinedOptions.DEFAULT, null, null);
+        this(new Configuration(), PredefinedOptions.DEFAULT, null, null, false);
     }
 
+    @VisibleForTesting
     public RocksDBResourceContainer(
             PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory) {
-        this(new Configuration(), predefinedOptions, optionsFactory, null);
+        this(new Configuration(), predefinedOptions, optionsFactory, null, false);
     }
 
+    @VisibleForTesting
     public RocksDBResourceContainer(
             PredefinedOptions predefinedOptions,
             @Nullable RocksDBOptionsFactory optionsFactory,
             @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
-        this(new Configuration(), predefinedOptions, optionsFactory, sharedResources);
+        this(new Configuration(), predefinedOptions, optionsFactory, sharedResources, false);
     }
 
     public RocksDBResourceContainer(
             ReadableConfig configuration,
             PredefinedOptions predefinedOptions,
             @Nullable RocksDBOptionsFactory optionsFactory,
-            @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
+            @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
+            boolean enableStatistics) {
 
         this.configuration = configuration;
         this.predefinedOptions = checkNotNull(predefinedOptions);
         this.optionsFactory = optionsFactory;
         this.sharedResources = sharedResources;
+        this.enableStatistics = enableStatistics;
         this.handlesToClose = new ArrayList<>();
     }
 
@@ -127,6 +136,12 @@ public final class RocksDBResourceContainer implements AutoCloseable {
             opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager());
         }
 
+        if (enableStatistics) {
+            Statistics statistics = new Statistics();
+            opt.setStatistics(statistics);
+            handlesToClose.add(statistics);
+        }
+
         return opt;
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index 9e9a12d7760..6c1d4625fb6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -143,7 +143,8 @@ class RocksDBHandle implements AutoCloseable {
         // init native metrics monitor if configured
         nativeMetricMonitor =
                 nativeMetricOptions.isEnabled()
-                        ? new RocksDBNativeMetricMonitor(nativeMetricOptions, metricGroup, db)
+                        ? new RocksDBNativeMetricMonitor(
+                                nativeMetricOptions, metricGroup, db, dbOptions.statistics())
                         : null;
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
index f2b95870e19..a03de345e01 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
@@ -31,9 +31,11 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
 
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.List;
 
 /** validate native metric monitor. */
 public class RocksDBNativeMetricMonitorTest {
@@ -42,14 +44,14 @@ public class RocksDBNativeMetricMonitorTest {
 
     private static final String COLUMN_FAMILY_NAME = "column-family";
 
-    @Rule public RocksDBResource rocksDBResource = new RocksDBResource();
+    @Rule public RocksDBResource rocksDBResource = new RocksDBResource(true);
 
     @Test
     public void testMetricMonitorLifecycle() throws Throwable {
         // We use a local variable here to manually control the life-cycle.
         // This allows us to verify that metrics do not try to access
         // RocksDB after the monitor was closed.
-        RocksDBResource localRocksDBResource = new RocksDBResource();
+        RocksDBResource localRocksDBResource = new RocksDBResource(true);
         localRocksDBResource.before();
 
         SimpleMetricRegistry registry = new SimpleMetricRegistry();
@@ -64,24 +66,38 @@ public class RocksDBNativeMetricMonitorTest {
         // value since empty memtables
         // have overhead.
         options.enableSizeAllMemTables();
+        options.enableNativeStatistics(RocksDBNativeMetricOptions.MONITOR_BYTES_WRITTEN);
 
         RocksDBNativeMetricMonitor monitor =
-                new RocksDBNativeMetricMonitor(options, group, localRocksDBResource.getRocksDB());
+                new RocksDBNativeMetricMonitor(
+                        options,
+                        group,
+                        localRocksDBResource.getRocksDB(),
+                        localRocksDBResource.getDbOptions().statistics());
 
         ColumnFamilyHandle handle = localRocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
         monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
 
         Assert.assertEquals(
-                "Failed to register metrics for column family", 1, registry.metrics.size());
-
-        RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+                "Failed to register metrics for column family", 1, registry.propertyMetrics.size());
 
-        view.update();
+        // Write a record so that the bytes-written ticker is non-zero.
+        localRocksDBResource.getRocksDB().put(new byte[4], new byte[10]);
 
-        Assert.assertNotEquals(
-                "Failed to pull metric from RocksDB", BigInteger.ZERO, view.getValue());
+        for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view :
+                registry.propertyMetrics) {
+            view.update();
+            Assert.assertNotEquals(
+                    "Failed to pull metric from RocksDB", BigInteger.ZERO, view.getValue());
+            view.setValue(0L);
+        }
 
-        view.setValue(0L);
+        for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view :
+                registry.statisticsMetrics) {
+            view.update();
+            Assert.assertNotEquals(0L, (long) view.getValue());
+            view.setValue(0L);
+        }
 
         // After the monitor is closed no metric should be accessing RocksDB anymore.
         // If they do, then this test will likely fail with a segmentation fault.
@@ -89,10 +105,18 @@ public class RocksDBNativeMetricMonitorTest {
 
         localRocksDBResource.after();
 
-        view.update();
+        for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view :
+                registry.propertyMetrics) {
+            view.update();
+            Assert.assertEquals(
+                    "Failed to release RocksDB reference", BigInteger.ZERO, view.getValue());
+        }
 
-        Assert.assertEquals(
-                "Failed to release RocksDB reference", BigInteger.ZERO, view.getValue());
+        for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view :
+                registry.statisticsMetrics) {
+            view.update();
+            Assert.assertEquals(0L, (long) view.getValue());
+        }
     }
 
     @Test
@@ -111,11 +135,16 @@ public class RocksDBNativeMetricMonitorTest {
         options.enableSizeAllMemTables();
 
         RocksDBNativeMetricMonitor monitor =
-                new RocksDBNativeMetricMonitor(options, group, localRocksDBResource.getRocksDB());
+                new RocksDBNativeMetricMonitor(
+                        options,
+                        group,
+                        localRocksDBResource.getRocksDB(),
+                        localRocksDBResource.getDbOptions().statistics());
 
         ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
         monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
-        RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+        RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view =
+                registry.propertyMetrics.get(0);
 
         view.setValue(-1);
         BigInteger result = view.getValue();
@@ -127,7 +156,7 @@ public class RocksDBNativeMetricMonitorTest {
     }
 
     @Test
-    public void testClosedGaugesDontRead() {
+    public void testClosedGaugesDontRead() throws RocksDBException {
         SimpleMetricRegistry registry = new SimpleMetricRegistry();
         GenericMetricGroup group =
                 new GenericMetricGroup(
@@ -137,23 +166,42 @@ public class RocksDBNativeMetricMonitorTest {
 
         RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
         options.enableSizeAllMemTables();
+        options.enableNativeStatistics(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT);
 
         RocksDBNativeMetricMonitor monitor =
-                new RocksDBNativeMetricMonitor(options, group, rocksDBResource.getRocksDB());
+                new RocksDBNativeMetricMonitor(
+                        options,
+                        group,
+                        rocksDBResource.getRocksDB(),
+                        rocksDBResource.getDbOptions().statistics());
 
         ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
         monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
 
-        RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+        rocksDBResource.getRocksDB().put(new byte[4], new byte[10]);
 
-        view.close();
-        view.update();
+        for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view :
+                registry.propertyMetrics) {
+            view.close();
+            view.update();
+            Assert.assertEquals(
+                    "Closed gauge still queried RocksDB", BigInteger.ZERO, view.getValue());
+        }
 
-        Assert.assertEquals("Closed gauge still queried RocksDB", BigInteger.ZERO, view.getValue());
+        for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view :
+                registry.statisticsMetrics) {
+            view.close();
+            view.update();
+            Assert.assertEquals("Closed gauge still queried RocksDB", 0L, (long) view.getValue());
+        }
     }
 
     static class SimpleMetricRegistry implements MetricRegistry {
-        ArrayList<RocksDBNativeMetricMonitor.RocksDBNativeMetricView> metrics = new ArrayList<>();
+        List<RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView> propertyMetrics =
+                new ArrayList<>();
+
+        List<RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView> statisticsMetrics =
+                new ArrayList<>();
 
         @Override
         public char getDelimiter() {
@@ -167,8 +215,13 @@ public class RocksDBNativeMetricMonitorTest {
 
         @Override
         public void register(Metric metric, String metricName, AbstractMetricGroup group) {
-            if (metric instanceof RocksDBNativeMetricMonitor.RocksDBNativeMetricView) {
-                metrics.add((RocksDBNativeMetricMonitor.RocksDBNativeMetricView) metric);
+            if (metric instanceof RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView) {
+                propertyMetrics.add(
+                        (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView) metric);
+            } else if (metric
+                    instanceof RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView) {
+                statisticsMetrics.add(
+                        (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView) metric);
             }
         }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 66b6221c179..238489cb8e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -30,6 +30,7 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.InfoLogLevel;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
+import org.rocksdb.Statistics;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +51,8 @@ public class RocksDBResource extends ExternalResource {
     /** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */
     private final RocksDBOptionsFactory optionsFactory;
 
+    private final boolean enableStatistics;
+
     /** Temporary folder that provides the working directory for the RocksDB instance. */
     private TemporaryFolder temporaryFolder;
 
@@ -78,8 +81,14 @@ public class RocksDBResource extends ExternalResource {
     private ArrayList<AutoCloseable> handlesToClose = new ArrayList<>();
 
     public RocksDBResource() {
+        this(false);
+    }
+
+    public RocksDBResource(boolean enableStatistics) {
         this(
                 new RocksDBOptionsFactory() {
+                    private static final long serialVersionUID = 1L;
+
                     @Override
                     public DBOptions createDBOptions(
                             DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
@@ -111,11 +120,14 @@ public class RocksDBResource extends ExternalResource {
 
                         return new ColumnFamilyOptions().optimizeForPointLookup(40960);
                     }
-                });
+                },
+                enableStatistics);
     }
 
-    public RocksDBResource(@Nonnull RocksDBOptionsFactory optionsFactory) {
+    public RocksDBResource(
+            @Nonnull RocksDBOptionsFactory optionsFactory, boolean enableStatistics) {
         this.optionsFactory = optionsFactory;
+        this.enableStatistics = enableStatistics;
     }
 
     public ColumnFamilyHandle getDefaultColumnFamily() {
@@ -134,6 +146,10 @@ public class RocksDBResource extends ExternalResource {
         return readOptions;
     }
 
+    public DBOptions getDbOptions() {
+        return dbOptions;
+    }
+
     public RocksDBWriteBatchWrapper getBatchWrapper() {
         return batchWrapper;
     }
@@ -165,6 +181,11 @@ public class RocksDBResource extends ExternalResource {
                                         .setStatsDumpPeriodSec(0),
                                 handlesToClose)
                         .setCreateIfMissing(true);
+        if (enableStatistics) {
+            Statistics statistics = new Statistics();
+            dbOptions.setStatistics(statistics);
+            handlesToClose.add(statistics);
+        }
         this.columnFamilyOptions =
                 optionsFactory.createColumnOptions(new ColumnFamilyOptions(), handlesToClose);
         this.writeOptions = new WriteOptions();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8f8dfc4093b..e16d1504812 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -531,7 +531,7 @@ public class RocksDBStateBackendConfigTest {
 
             try (RocksDBResourceContainer optionsContainer =
                     new RocksDBResourceContainer(
-                            configuration, PredefinedOptions.DEFAULT, null, null)) {
+                            configuration, PredefinedOptions.DEFAULT, null, null, false)) {
 
                 DBOptions dbOptions = optionsContainer.getDbOptions();
                 assertEquals(-1, dbOptions.maxOpenFiles());
@@ -610,7 +610,11 @@ public class RocksDBStateBackendConfigTest {
         configuration.set(RocksDBConfigurableOptions.COMPACTION_STYLE, CompactionStyle.UNIVERSAL);
         try (final RocksDBResourceContainer optionsContainer =
                 new RocksDBResourceContainer(
-                        configuration, PredefinedOptions.SPINNING_DISK_OPTIMIZED, null, null)) {
+                        configuration,
+                        PredefinedOptions.SPINNING_DISK_OPTIMIZED,
+                        null,
+                        null,
+                        false)) {
 
             final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions();
             assertNotNull(columnFamilyOptions);
@@ -622,7 +626,8 @@ public class RocksDBStateBackendConfigTest {
                         new Configuration(),
                         PredefinedOptions.SPINNING_DISK_OPTIMIZED,
                         null,
-                        null)) {
+                        null,
+                        false)) {
 
             final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions();
             assertNotNull(columnFamilyOptions);