You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/07/14 02:23:17 UTC
[flink] branch master updated: [FLINK-24786][state] Introduce and expose RocksDB statistics as metrics
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 46e0014d3fe [FLINK-24786][state] Introduce and expose RocksDB statistics as metrics
46e0014d3fe is described below
commit 46e0014d3fe9be9af5f1fd75dc03dcc967a7fb4a
Author: Yun Tang <my...@live.com>
AuthorDate: Wed May 11 23:30:37 2022 +0800
[FLINK-24786][state] Introduce and expose RocksDB statistics as metrics
---
docs/content.zh/docs/deployment/config.md | 4 +-
docs/content/docs/deployment/config.md | 4 +-
.../rocksdb_native_metric_configuration.html | 50 +++++++-
.../state/EmbeddedRocksDBStateBackend.java | 21 ++--
.../state/RocksDBKeyedStateBackendBuilder.java | 4 +-
.../state/RocksDBNativeMetricMonitor.java | 106 ++++++++++++----
.../state/RocksDBNativeMetricOptions.java | 140 +++++++++++++++++++--
.../streaming/state/RocksDBResourceContainer.java | 23 +++-
.../streaming/state/restore/RocksDBHandle.java | 3 +-
.../state/RocksDBNativeMetricMonitorTest.java | 101 +++++++++++----
.../contrib/streaming/state/RocksDBResource.java | 25 +++-
.../state/RocksDBStateBackendConfigTest.java | 11 +-
12 files changed, 417 insertions(+), 75 deletions(-)
diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 912ac8bb1d8..4c6a29da631 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -264,7 +264,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >}
### RocksDB Native Metrics
Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend.
-The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs.
+The metrics here are scoped to the operators, are reported as unsigned longs, and come in two kinds:
+1. RocksDB property-based metrics, which are broken down by column family, e.g. the number of currently running compactions of one specific column family.
+2. RocksDB statistics-based metrics, which are reported at the database level, e.g. the total block cache hit count within the DB.
{{< hint warning >}}
Enabling RocksDB's native metrics may cause degraded performance and should be set carefully.
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index a38e72d0e18..5fc8cb1ac29 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -266,7 +266,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >}
### RocksDB Native Metrics
Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend.
-The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs.
+The metrics here are scoped to the operators, are reported as unsigned longs, and come in two kinds:
+1. RocksDB property-based metrics, which are broken down by column family, e.g. the number of currently running compactions of one specific column family.
+2. RocksDB statistics-based metrics, which are reported at the database level, e.g. the total block cache hit count within the DB.
{{< hint warning >}}
Enabling RocksDB's native metrics may cause degraded performance and should be set carefully.
diff --git a/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html
index a7922f2547a..2edd8387d08 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html
@@ -26,6 +26,18 @@
<td>Boolean</td>
<td>Monitor block cache capacity.</td>
</tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.block-cache-hit</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT).</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.block-cache-miss</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS).</td>
+ </tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.block-cache-pinned-usage</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -38,11 +50,23 @@
<td>Boolean</td>
<td>Monitor the memory size for the entries residing in block cache.</td>
</tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.bytes-read</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.bytes-written</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB.</td>
+ </tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.column-family-as-variable</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Whether to expose the column family as a variable.</td>
+ <td>Whether to expose the column family as a variable for RocksDB property based metrics.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.compaction-pending</h5></td>
@@ -50,6 +74,18 @@
<td>Boolean</td>
<td>Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise.</td>
</tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.compaction-read-bytes</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the bytes read during compaction in RocksDB.</td>
+ </tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.compaction-write-bytes</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the bytes written during compaction in RocksDB.</td>
+ </tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.cur-size-active-mem-table</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -92,6 +128,12 @@
<td>Boolean</td>
<td>Track whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise.</td>
</tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.iter-bytes-read</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB.</td>
+ </tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.live-sst-files-size</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -164,6 +206,12 @@
<td>Boolean</td>
<td>Monitor the approximate size of the active, unflushed immutable, and pinned immutable memtables in bytes.</td>
</tr>
+ <tr>
+ <td><h5>state.backend.rocksdb.metrics.stall-micros</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.</td>
+ </tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.total-sst-files-size</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 07829949a25..8ce78342d3c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -147,8 +147,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
/** This determines the type of priority queue state. */
@Nullable private PriorityQueueStateType priorityQueueStateType;
- /** The default rocksdb metrics options. */
- private final RocksDBNativeMetricOptions defaultMetricOptions;
+ /** The default rocksdb property-based metrics options. */
+ private final RocksDBNativeMetricOptions nativeMetricOptions;
// -- runtime values, set on TaskManager when initializing / using the backend
@@ -199,7 +199,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing) {
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
- this.defaultMetricOptions = new RocksDBNativeMetricOptions();
+ this.nativeMetricOptions = new RocksDBNativeMetricOptions();
this.memoryConfiguration = new RocksDBMemoryConfiguration();
this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
@@ -263,7 +263,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
}
// configure metric options
- this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);
+ this.nativeMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);
// configure RocksDB predefined options
this.predefinedOptions =
@@ -465,7 +465,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());
}
final RocksDBResourceContainer resourceContainer =
- createOptionsAndResourceContainer(sharedResources);
+ createOptionsAndResourceContainer(
+ sharedResources, nativeMetricOptions.isStatisticsEnabled());
ExecutionConfig executionConfig = env.getExecutionConfig();
StreamCompressionDecorator keyGroupCompressionDecorator =
@@ -496,7 +497,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
.setNativeMetricOptions(
- resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
+ resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
.setWriteBatchSize(getWriteBatchSize())
.setOverlapFractionThreshold(getOverlapFractionThreshold());
return builder.build();
@@ -863,18 +864,20 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
@VisibleForTesting
RocksDBResourceContainer createOptionsAndResourceContainer() {
- return createOptionsAndResourceContainer(null);
+ return createOptionsAndResourceContainer(null, false);
}
@VisibleForTesting
private RocksDBResourceContainer createOptionsAndResourceContainer(
- @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
+ @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
+ boolean enableStatistics) {
return new RocksDBResourceContainer(
configurableOptions != null ? configurableOptions : new Configuration(),
predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT,
rocksDbOptionsFactory,
- sharedResources);
+ sharedResources,
+ enableStatistics);
}
@Override
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 8706a90c6b0..ca1adc27303 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -114,7 +114,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
/** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;
+ /** RocksDB property-based and statistics-based native metrics options. */
private RocksDBNativeMetricOptions nativeMetricOptions;
+
private int numberOfTransferingThreads;
private long writeBatchSize =
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
@@ -309,7 +311,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
nativeMetricMonitor =
nativeMetricOptions.isEnabled()
? new RocksDBNativeMetricMonitor(
- nativeMetricOptions, metricGroup, db)
+ nativeMetricOptions, metricGroup, db, null)
: null;
} else {
prepareDirectories();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
index 83c44c8915d..0ea5a1aec55 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
@@ -26,10 +26,13 @@ import org.apache.flink.metrics.View;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.Statistics;
+import org.rocksdb.TickerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
@@ -54,15 +57,32 @@ public class RocksDBNativeMetricMonitor implements Closeable {
@GuardedBy("lock")
private RocksDB rocksDB;
+ @Nullable
+ @GuardedBy("lock")
+ private Statistics statistics;
+
public RocksDBNativeMetricMonitor(
@Nonnull RocksDBNativeMetricOptions options,
@Nonnull MetricGroup metricGroup,
- @Nonnull RocksDB rocksDB) {
+ @Nonnull RocksDB rocksDB,
+ @Nullable Statistics statistics) {
this.options = options;
this.metricGroup = metricGroup;
this.rocksDB = rocksDB;
-
+ this.statistics = statistics;
this.lock = new Object();
+ registerStatistics();
+ }
+
+ /** Register gauges to pull native metrics for the database. */
+ private void registerStatistics() {
+ if (statistics != null) {
+ for (TickerType tickerType : options.getMonitorTickerTypes()) {
+ metricGroup.gauge(
+ String.format("rocksdb.%s", tickerType.name().toLowerCase()),
+ new RocksDBNativeStatisticsMetricView(tickerType));
+ }
+ }
}
/**
@@ -80,27 +100,38 @@ public class RocksDBNativeMetricMonitor implements Closeable {
: metricGroup.addGroup(columnFamilyName);
for (String property : options.getProperties()) {
- RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(handle, property);
+ RocksDBNativePropertyMetricView gauge =
+ new RocksDBNativePropertyMetricView(handle, property);
group.gauge(property, gauge);
}
}
/** Updates the value of metricView if the reference is still valid. */
- private void setProperty(
- ColumnFamilyHandle handle, String property, RocksDBNativeMetricView metricView) {
+ private void setProperty(RocksDBNativePropertyMetricView metricView) {
if (metricView.isClosed()) {
return;
}
try {
synchronized (lock) {
if (rocksDB != null) {
- long value = rocksDB.getLongProperty(handle, property);
+ long value = rocksDB.getLongProperty(metricView.handle, metricView.property);
metricView.setValue(value);
}
}
} catch (RocksDBException e) {
metricView.close();
- LOG.warn("Failed to read native metric {} from RocksDB.", property, e);
+ LOG.warn("Failed to read native metric {} from RocksDB.", metricView.property, e);
+ }
+ }
+
+ private void setStatistics(RocksDBNativeStatisticsMetricView metricView) {
+ if (metricView.isClosed()) {
+ return;
+ }
+ if (statistics != null) {
+ synchronized (lock) {
+ metricView.setValue(statistics.getTickerCount(metricView.tickerType));
+ }
}
}
@@ -108,31 +139,46 @@ public class RocksDBNativeMetricMonitor implements Closeable {
public void close() {
synchronized (lock) {
rocksDB = null;
+ statistics = null;
+ }
+ }
+
+ abstract static class RocksDBNativeView implements View {
+ private boolean closed;
+
+ RocksDBNativeView() {
+ this.closed = false;
+ }
+
+ void close() {
+ closed = true;
+ }
+
+ boolean isClosed() {
+ return closed;
}
}
/**
- * A gauge which periodically pulls a RocksDB native metric for the specified column family /
- * metric pair.
+ * A gauge which periodically pulls a RocksDB property-based native metric for the specified
+ * column family / metric pair.
*
* <p><strong>Note</strong>: As the returned property is of type {@code uint64_t} on C++ side
* the returning value can be negative. Because java does not support unsigned long types, this
* gauge wraps the result in a {@link BigInteger}.
*/
- class RocksDBNativeMetricView implements Gauge<BigInteger>, View {
+ class RocksDBNativePropertyMetricView extends RocksDBNativeView implements Gauge<BigInteger> {
private final String property;
private final ColumnFamilyHandle handle;
private BigInteger bigInteger;
- private boolean closed;
-
- private RocksDBNativeMetricView(ColumnFamilyHandle handle, @Nonnull String property) {
+ private RocksDBNativePropertyMetricView(
+ ColumnFamilyHandle handle, @Nonnull String property) {
this.handle = handle;
this.property = property;
this.bigInteger = BigInteger.ZERO;
- this.closed = false;
}
public void setValue(long value) {
@@ -149,22 +195,40 @@ public class RocksDBNativeMetricMonitor implements Closeable {
}
}
- public void close() {
- closed = true;
+ @Override
+ public BigInteger getValue() {
+ return bigInteger;
}
- public boolean isClosed() {
- return closed;
+ @Override
+ public void update() {
+ setProperty(this);
+ }
+ }
+
+ /**
+ * A gauge which periodically pulls a RocksDB statistics-based native metric for the database.
+ */
+ class RocksDBNativeStatisticsMetricView extends RocksDBNativeView implements Gauge<Long> {
+ private final TickerType tickerType;
+ private long value;
+
+ private RocksDBNativeStatisticsMetricView(TickerType tickerType) {
+ this.tickerType = tickerType;
}
@Override
- public BigInteger getValue() {
- return bigInteger;
+ public Long getValue() {
+ return value;
+ }
+
+ void setValue(long value) {
+ this.value = value;
}
@Override
public void update() {
- setProperty(handle, property, this);
+ setStatistics(this);
}
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
index 1cec947e5c5..f6fc5c759ae 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
@@ -18,19 +18,29 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.rocksdb.TickerType;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
/**
- * Enable which RocksDB metrics to forward to Flink's metrics reporter. All metrics report at the
- * column family level and return unsigned long values.
+ * Enable which RocksDB metrics to forward to Flink's metrics reporter.
+ *
+ * <p>Property-based metrics are reported at the column family level and return unsigned long
+ * values.
+ *
+ * <p>Statistics-based metrics are reported at the database level and can return ticker or
+ * histogram results.
*
* <p>Properties and doc comments are taken from RocksDB documentation. See <a
* href="https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429">
@@ -39,6 +49,10 @@ import java.util.Set;
public class RocksDBNativeMetricOptions implements Serializable {
private static final long serialVersionUID = 1L;
+ // --------------------------------------------------------------------------------------------
+ // RocksDB property based metrics, report at column family level
+ // --------------------------------------------------------------------------------------------
+
public static final String METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY =
"state.backend.rocksdb.metrics" + ".column-family-as-variable";
@@ -227,11 +241,77 @@ public class RocksDBNativeMetricOptions implements Serializable {
ConfigOptions.key(METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY)
.booleanType()
.defaultValue(false)
- .withDescription("Whether to expose the column family as a variable.");
+ .withDescription(
+ "Whether to expose the column family as a variable for RocksDB property based metrics.");
+
+ // --------------------------------------------------------------------------------------------
+ // RocksDB statistics based metrics, report at database level
+ // --------------------------------------------------------------------------------------------
+
+ public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_HIT =
+ ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-hit")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT).");
+
+ public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_MISS =
+ ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-miss")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS).");
+
+ public static final ConfigOption<Boolean> MONITOR_BYTES_READ =
+ ConfigOptions.key("state.backend.rocksdb.metrics.bytes-read")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB.");
+
+ public static final ConfigOption<Boolean> MONITOR_ITER_BYTES_READ =
+ ConfigOptions.key("state.backend.rocksdb.metrics.iter-bytes-read")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB.");
+
+ public static final ConfigOption<Boolean> MONITOR_BYTES_WRITTEN =
+ ConfigOptions.key("state.backend.rocksdb.metrics.bytes-written")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB.");
+
+ public static final ConfigOption<Boolean> MONITOR_COMPACTION_READ_BYTES =
+ ConfigOptions.key("state.backend.rocksdb.metrics.compaction-read-bytes")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Monitor the bytes read during compaction in RocksDB.");
+
+ public static final ConfigOption<Boolean> MONITOR_COMPACTION_WRITE_BYTES =
+ ConfigOptions.key("state.backend.rocksdb.metrics.compaction-write-bytes")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Monitor the bytes written during compaction in RocksDB.");
+
+ public static final ConfigOption<Boolean> MONITOR_STALL_MICROS =
+ ConfigOptions.key("state.backend.rocksdb.metrics.stall-micros")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.");
/** Creates a {@link RocksDBNativeMetricOptions} based on an external configuration. */
public static RocksDBNativeMetricOptions fromConfig(ReadableConfig config) {
RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+ configurePropertyMetrics(options, config);
+ configureStatisticsMetrics(options, config);
+ return options;
+ }
+
+ private static void configurePropertyMetrics(
+ RocksDBNativeMetricOptions options, ReadableConfig config) {
if (config.get(MONITOR_NUM_IMMUTABLE_MEM_TABLES)) {
options.enableNumImmutableMemTable();
}
@@ -337,15 +417,51 @@ public class RocksDBNativeMetricOptions implements Serializable {
}
options.setColumnFamilyAsVariable(config.get(COLUMN_FAMILY_AS_VARIABLE));
+ }
- return options;
+ private static void configureStatisticsMetrics(
+ RocksDBNativeMetricOptions options, ReadableConfig config) {
+ for (Map.Entry<ConfigOption<Boolean>, TickerType> entry : tickerTypeMapping.entrySet()) {
+ if (config.get(entry.getKey())) {
+ options.monitorTickerTypes.add(entry.getValue());
+ }
+ }
}
+ private static final Map<ConfigOption<Boolean>, TickerType> tickerTypeMapping =
+ new HashMap<ConfigOption<Boolean>, TickerType>() {
+ private static final long serialVersionUID = 1L;
+
+ {
+ put(MONITOR_BLOCK_CACHE_HIT, TickerType.BLOCK_CACHE_HIT);
+ put(MONITOR_BLOCK_CACHE_MISS, TickerType.BLOCK_CACHE_MISS);
+ put(MONITOR_BYTES_READ, TickerType.BYTES_READ);
+ put(MONITOR_ITER_BYTES_READ, TickerType.ITER_BYTES_READ);
+ put(MONITOR_BYTES_WRITTEN, TickerType.BYTES_WRITTEN);
+ put(MONITOR_COMPACTION_READ_BYTES, TickerType.COMPACT_READ_BYTES);
+ put(MONITOR_COMPACTION_WRITE_BYTES, TickerType.COMPACT_WRITE_BYTES);
+ put(MONITOR_STALL_MICROS, TickerType.STALL_MICROS);
+ }
+ };
+
private final Set<String> properties;
+ private final Set<TickerType> monitorTickerTypes;
private boolean columnFamilyAsVariable = COLUMN_FAMILY_AS_VARIABLE.defaultValue();
public RocksDBNativeMetricOptions() {
this.properties = new HashSet<>();
+ this.monitorTickerTypes = new HashSet<>();
+ }
+
+ @VisibleForTesting
+ public void enableNativeStatistics(ConfigOption<Boolean> nativeStatisticsOption) {
+ TickerType tickerType = tickerTypeMapping.get(nativeStatisticsOption);
+ if (tickerType != null) {
+ monitorTickerTypes.add(tickerType);
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown configurable native statistics option " + nativeStatisticsOption);
+ }
}
/** Returns number of immutable memtables that have not yet been flushed. */
@@ -501,18 +617,28 @@ public class RocksDBNativeMetricOptions implements Serializable {
this.columnFamilyAsVariable = columnFamilyAsVariable;
}
- /** @return the enabled RocksDB metrics */
+ /** @return the enabled RocksDB property-based metrics */
public Collection<String> getProperties() {
return Collections.unmodifiableCollection(properties);
}
+ /** @return the enabled RocksDB statistics metrics. */
+ public Collection<TickerType> getMonitorTickerTypes() {
+ return Collections.unmodifiableCollection(monitorTickerTypes);
+ }
+
/**
- * {{@link RocksDBNativeMetricMonitor}} is enabled is any property is set.
+ * {{@link RocksDBNativeMetricMonitor}} is enabled if any property or ticker type is set.
*
* @return true if {{RocksDBNativeMetricMonitor}} should be enabled, false otherwise.
*/
public boolean isEnabled() {
- return !properties.isEmpty();
+ return !properties.isEmpty() || isStatisticsEnabled();
+ }
+
+ /** @return true if RocksDB statistics metrics are enabled, false otherwise. */
+ public boolean isStatisticsEnabled() {
+ return !monitorTickerTypes.isEmpty();
}
/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 50752a817ac..ed652980d04 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -18,6 +18,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
@@ -34,6 +35,7 @@ import org.rocksdb.Filter;
import org.rocksdb.IndexType;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.ReadOptions;
+import org.rocksdb.Statistics;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
@@ -73,35 +75,42 @@ public final class RocksDBResourceContainer implements AutoCloseable {
*/
@Nullable private final OpaqueMemoryResource<RocksDBSharedResources> sharedResources;
+ private final boolean enableStatistics;
+
/** The handles to be closed when the container is closed. */
private final ArrayList<AutoCloseable> handlesToClose;
+ @VisibleForTesting
public RocksDBResourceContainer() {
- this(new Configuration(), PredefinedOptions.DEFAULT, null, null);
+ this(new Configuration(), PredefinedOptions.DEFAULT, null, null, false);
}
+ @VisibleForTesting
public RocksDBResourceContainer(
PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory) {
- this(new Configuration(), predefinedOptions, optionsFactory, null);
+ this(new Configuration(), predefinedOptions, optionsFactory, null, false);
}
+ @VisibleForTesting
public RocksDBResourceContainer(
PredefinedOptions predefinedOptions,
@Nullable RocksDBOptionsFactory optionsFactory,
@Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
- this(new Configuration(), predefinedOptions, optionsFactory, sharedResources);
+ this(new Configuration(), predefinedOptions, optionsFactory, sharedResources, false);
}
public RocksDBResourceContainer(
ReadableConfig configuration,
PredefinedOptions predefinedOptions,
@Nullable RocksDBOptionsFactory optionsFactory,
- @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
+ @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
+ boolean enableStatistics) {
this.configuration = configuration;
this.predefinedOptions = checkNotNull(predefinedOptions);
this.optionsFactory = optionsFactory;
this.sharedResources = sharedResources;
+ this.enableStatistics = enableStatistics;
this.handlesToClose = new ArrayList<>();
}
@@ -127,6 +136,12 @@ public final class RocksDBResourceContainer implements AutoCloseable {
opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager());
}
+ if (enableStatistics) {
+ Statistics statistics = new Statistics();
+ opt.setStatistics(statistics);
+ handlesToClose.add(statistics);
+ }
+
return opt;
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index 9e9a12d7760..6c1d4625fb6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -143,7 +143,8 @@ class RocksDBHandle implements AutoCloseable {
// init native metrics monitor if configured
nativeMetricMonitor =
nativeMetricOptions.isEnabled()
- ? new RocksDBNativeMetricMonitor(nativeMetricOptions, metricGroup, db)
+ ? new RocksDBNativeMetricMonitor(
+ nativeMetricOptions, metricGroup, db, dbOptions.statistics())
: null;
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
index f2b95870e19..a03de345e01 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
@@ -31,9 +31,11 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.List;
/** validate native metric monitor. */
public class RocksDBNativeMetricMonitorTest {
@@ -42,14 +44,14 @@ public class RocksDBNativeMetricMonitorTest {
private static final String COLUMN_FAMILY_NAME = "column-family";
- @Rule public RocksDBResource rocksDBResource = new RocksDBResource();
+ @Rule public RocksDBResource rocksDBResource = new RocksDBResource(true);
@Test
public void testMetricMonitorLifecycle() throws Throwable {
// We use a local variable here to manually control the life-cycle.
// This allows us to verify that metrics do not try to access
// RocksDB after the monitor was closed.
- RocksDBResource localRocksDBResource = new RocksDBResource();
+ RocksDBResource localRocksDBResource = new RocksDBResource(true);
localRocksDBResource.before();
SimpleMetricRegistry registry = new SimpleMetricRegistry();
@@ -64,24 +66,38 @@ public class RocksDBNativeMetricMonitorTest {
// value since empty memtables
// have overhead.
options.enableSizeAllMemTables();
+ options.enableNativeStatistics(RocksDBNativeMetricOptions.MONITOR_BYTES_WRITTEN);
RocksDBNativeMetricMonitor monitor =
- new RocksDBNativeMetricMonitor(options, group, localRocksDBResource.getRocksDB());
+ new RocksDBNativeMetricMonitor(
+ options,
+ group,
+ localRocksDBResource.getRocksDB(),
+ localRocksDBResource.getDbOptions().statistics());
ColumnFamilyHandle handle = localRocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
Assert.assertEquals(
- "Failed to register metrics for column family", 1, registry.metrics.size());
-
- RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+ "Failed to register metrics for column family", 1, registry.propertyMetrics.size());
- view.update();
+ // write something to ensure the bytes-written is not zero.
+ localRocksDBResource.getRocksDB().put(new byte[4], new byte[10]);
- Assert.assertNotEquals(
- "Failed to pull metric from RocksDB", BigInteger.ZERO, view.getValue());
+ for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view :
+ registry.propertyMetrics) {
+ view.update();
+ Assert.assertNotEquals(
+ "Failed to pull metric from RocksDB", BigInteger.ZERO, view.getValue());
+ view.setValue(0L);
+ }
- view.setValue(0L);
+ for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view :
+ registry.statisticsMetrics) {
+ view.update();
+ Assert.assertNotEquals(0L, (long) view.getValue());
+ view.setValue(0L);
+ }
// After the monitor is closed no metric should be accessing RocksDB anymore.
// If they do, then this test will likely fail with a segmentation fault.
@@ -89,10 +105,18 @@ public class RocksDBNativeMetricMonitorTest {
localRocksDBResource.after();
- view.update();
+ for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view :
+ registry.propertyMetrics) {
+ view.update();
+ Assert.assertEquals(
+ "Failed to release RocksDB reference", BigInteger.ZERO, view.getValue());
+ }
- Assert.assertEquals(
- "Failed to release RocksDB reference", BigInteger.ZERO, view.getValue());
+ for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view :
+ registry.statisticsMetrics) {
+ view.update();
+ Assert.assertEquals(0L, (long) view.getValue());
+ }
}
@Test
@@ -111,11 +135,16 @@ public class RocksDBNativeMetricMonitorTest {
options.enableSizeAllMemTables();
RocksDBNativeMetricMonitor monitor =
- new RocksDBNativeMetricMonitor(options, group, localRocksDBResource.getRocksDB());
+ new RocksDBNativeMetricMonitor(
+ options,
+ group,
+ localRocksDBResource.getRocksDB(),
+ localRocksDBResource.getDbOptions().statistics());
ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
- RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+ RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view =
+ registry.propertyMetrics.get(0);
view.setValue(-1);
BigInteger result = view.getValue();
@@ -127,7 +156,7 @@ public class RocksDBNativeMetricMonitorTest {
}
@Test
- public void testClosedGaugesDontRead() {
+ public void testClosedGaugesDontRead() throws RocksDBException {
SimpleMetricRegistry registry = new SimpleMetricRegistry();
GenericMetricGroup group =
new GenericMetricGroup(
@@ -137,23 +166,42 @@ public class RocksDBNativeMetricMonitorTest {
RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
options.enableSizeAllMemTables();
+ options.enableNativeStatistics(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT);
RocksDBNativeMetricMonitor monitor =
- new RocksDBNativeMetricMonitor(options, group, rocksDBResource.getRocksDB());
+ new RocksDBNativeMetricMonitor(
+ options,
+ group,
+ rocksDBResource.getRocksDB(),
+ rocksDBResource.getDbOptions().statistics());
ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
- RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+ rocksDBResource.getRocksDB().put(new byte[4], new byte[10]);
- view.close();
- view.update();
+ for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view :
+ registry.propertyMetrics) {
+ view.close();
+ view.update();
+ Assert.assertEquals(
+ "Closed gauge still queried RocksDB", BigInteger.ZERO, view.getValue());
+ }
- Assert.assertEquals("Closed gauge still queried RocksDB", BigInteger.ZERO, view.getValue());
+ for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view :
+ registry.statisticsMetrics) {
+ view.close();
+ view.update();
+ Assert.assertEquals("Closed gauge still queried RocksDB", 0L, (long) view.getValue());
+ }
}
static class SimpleMetricRegistry implements MetricRegistry {
- ArrayList<RocksDBNativeMetricMonitor.RocksDBNativeMetricView> metrics = new ArrayList<>();
+ List<RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView> propertyMetrics =
+ new ArrayList<>();
+
+ List<RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView> statisticsMetrics =
+ new ArrayList<>();
@Override
public char getDelimiter() {
@@ -167,8 +215,13 @@ public class RocksDBNativeMetricMonitorTest {
@Override
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
- if (metric instanceof RocksDBNativeMetricMonitor.RocksDBNativeMetricView) {
- metrics.add((RocksDBNativeMetricMonitor.RocksDBNativeMetricView) metric);
+ if (metric instanceof RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView) {
+ propertyMetrics.add(
+ (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView) metric);
+ } else if (metric
+ instanceof RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView) {
+ statisticsMetrics.add(
+ (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView) metric);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 66b6221c179..238489cb8e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -30,6 +30,7 @@ import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
+import org.rocksdb.Statistics;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,8 @@ public class RocksDBResource extends ExternalResource {
/** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */
private final RocksDBOptionsFactory optionsFactory;
+ private final boolean enableStatistics;
+
/** Temporary folder that provides the working directory for the RocksDB instance. */
private TemporaryFolder temporaryFolder;
@@ -78,8 +81,14 @@ public class RocksDBResource extends ExternalResource {
private ArrayList<AutoCloseable> handlesToClose = new ArrayList<>();
public RocksDBResource() {
+ this(false);
+ }
+
+ public RocksDBResource(boolean enableStatistics) {
this(
new RocksDBOptionsFactory() {
+ private static final long serialVersionUID = 1L;
+
@Override
public DBOptions createDBOptions(
DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
@@ -111,11 +120,14 @@ public class RocksDBResource extends ExternalResource {
return new ColumnFamilyOptions().optimizeForPointLookup(40960);
}
- });
+ },
+ enableStatistics);
}
- public RocksDBResource(@Nonnull RocksDBOptionsFactory optionsFactory) {
+ public RocksDBResource(
+ @Nonnull RocksDBOptionsFactory optionsFactory, boolean enableStatistics) {
this.optionsFactory = optionsFactory;
+ this.enableStatistics = enableStatistics;
}
public ColumnFamilyHandle getDefaultColumnFamily() {
@@ -134,6 +146,10 @@ public class RocksDBResource extends ExternalResource {
return readOptions;
}
+ public DBOptions getDbOptions() {
+ return dbOptions;
+ }
+
public RocksDBWriteBatchWrapper getBatchWrapper() {
return batchWrapper;
}
@@ -165,6 +181,11 @@ public class RocksDBResource extends ExternalResource {
.setStatsDumpPeriodSec(0),
handlesToClose)
.setCreateIfMissing(true);
+ if (enableStatistics) {
+ Statistics statistics = new Statistics();
+ dbOptions.setStatistics(statistics);
+ handlesToClose.add(statistics);
+ }
this.columnFamilyOptions =
optionsFactory.createColumnOptions(new ColumnFamilyOptions(), handlesToClose);
this.writeOptions = new WriteOptions();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8f8dfc4093b..e16d1504812 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -531,7 +531,7 @@ public class RocksDBStateBackendConfigTest {
try (RocksDBResourceContainer optionsContainer =
new RocksDBResourceContainer(
- configuration, PredefinedOptions.DEFAULT, null, null)) {
+ configuration, PredefinedOptions.DEFAULT, null, null, false)) {
DBOptions dbOptions = optionsContainer.getDbOptions();
assertEquals(-1, dbOptions.maxOpenFiles());
@@ -610,7 +610,11 @@ public class RocksDBStateBackendConfigTest {
configuration.set(RocksDBConfigurableOptions.COMPACTION_STYLE, CompactionStyle.UNIVERSAL);
try (final RocksDBResourceContainer optionsContainer =
new RocksDBResourceContainer(
- configuration, PredefinedOptions.SPINNING_DISK_OPTIMIZED, null, null)) {
+ configuration,
+ PredefinedOptions.SPINNING_DISK_OPTIMIZED,
+ null,
+ null,
+ false)) {
final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions();
assertNotNull(columnFamilyOptions);
@@ -622,7 +626,8 @@ public class RocksDBStateBackendConfigTest {
new Configuration(),
PredefinedOptions.SPINNING_DISK_OPTIMIZED,
null,
- null)) {
+ null,
+ false)) {
final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions();
assertNotNull(columnFamilyOptions);