You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/02 22:36:53 UTC
[kafka] branch trunk updated: KAFKA-9924: Add remaining
property-based RocksDB metrics as described in KIP-607 (#9232)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c04000c KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607 (#9232)
c04000c is described below
commit c04000cab1e98c206c5410ef68d00df1d9129182
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Thu Sep 3 00:32:17 2020 +0200
KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607 (#9232)
This commit adds the remaining property-based RocksDB metrics as described in KIP-607, except for num-entries-active-mem-table, which was added in PR #9177.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../state/internals/metrics/RocksDBMetrics.java | 329 ++++++++++++++++++++-
.../internals/metrics/RocksDBMetricsRecorder.java | 225 ++++++++++++--
.../integration/RocksDBMetricsIntegrationTest.java | 45 ++-
.../internals/metrics/StreamsMetricsImplTest.java | 26 +-
.../streams/state/internals/RocksDBStoreTest.java | 64 +++-
.../metrics/RocksDBMetricsRecorderGaugesTest.java | 188 +++++++++++-
.../metrics/RocksDBMetricsRecorderTest.java | 161 +++++++++-
.../internals/metrics/RocksDBMetricsTest.java | 262 +++++++++++++++-
8 files changed, 1225 insertions(+), 75 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
index f0c25a8..998304e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
@@ -59,6 +59,27 @@ public class RocksDBMetrics {
private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors";
static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
+ static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLE = "num-deletes-active-mem-table";
+ static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES = "num-entries-imm-mem-tables";
+ static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES = "num-deletes-imm-mem-tables";
+ static final String NUMBER_OF_IMMUTABLE_MEMTABLES = "num-immutable-mem-table";
+ static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE = "cur-size-active-mem-table";
+ static final String CURRENT_SIZE_OF_ALL_MEMTABLES = "cur-size-all-mem-tables";
+ static final String SIZE_OF_ALL_MEMTABLES = "size-all-mem-tables";
+ static final String MEMTABLE_FLUSH_PENDING = "mem-table-flush-pending";
+ static final String NUMBER_OF_RUNNING_FLUSHES = "num-running-flushes";
+ static final String COMPACTION_PENDING = "compaction-pending";
+ static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
+ static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION = "estimate-pending-compaction-bytes";
+ static final String TOTAL_SST_FILES_SIZE = "total-sst-files-size";
+ static final String LIVE_SST_FILES_SIZE = "live-sst-files-size";
+ static final String NUMBER_OF_LIVE_VERSIONS = "num-live-versions";
+ static final String CAPACITY_OF_BLOCK_CACHE = "block-cache-capacity";
+ static final String USAGE_OF_BLOCK_CACHE = "block-cache-usage";
+ static final String PINNED_USAGE_OF_BLOCK_CACHE = "block-cache-pinned-usage";
+ static final String ESTIMATED_NUMBER_OF_KEYS = "estimate-num-keys";
+ static final String ESTIMATED_MEMORY_OF_TABLE_READERS = "estimate-table-readers-mem";
+ static final String NUMBER_OF_BACKGROUND_ERRORS = "background-errors";
private static final String BYTES_WRITTEN_TO_DB_RATE_DESCRIPTION =
"Average number of bytes written per second to the RocksDB state store";
@@ -98,7 +119,43 @@ public class RocksDBMetrics {
private static final String NUMBER_OF_OPEN_FILES_DESCRIPTION = "Number of currently open files";
private static final String NUMBER_OF_FILE_ERRORS_DESCRIPTION = "Total number of file errors occurred";
private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION =
- "Current total number of entries in the active memtable";
+ "Total number of entries in the active memtable";
+ private static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLES_DESCRIPTION =
+ "Total number of delete entries in the active memtable";
+ private static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES_DESCRIPTION =
+ "Total number of entries in the unflushed immutable memtables";
+ private static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES_DESCRIPTION =
+ "Total number of delete entries in the unflushed immutable memtables";
+ private static final String NUMBER_OF_IMMUTABLE_MEMTABLES_DESCRIPTION =
+ "Number of immutable memtables that have not yet been flushed";
+ private static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE_DESCRIPTION =
+ "Approximate size of active memtable in bytes";
+ private static final String CURRENT_SIZE_OF_ALL_MEMTABLES_DESCRIPTION =
+ "Approximate size of active and unflushed immutable memtable in bytes";
+ private static final String SIZE_OF_ALL_MEMTABLES_DESCRIPTION =
+ "Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes";
+ private static final String MEMTABLE_FLUSH_PENDING_DESCRIPTION =
+ "Reports 1 if a memtable flush is pending, otherwise it reports 0";
+ private static final String NUMBER_OF_RUNNING_FLUSHES_DESCRIPTION = "Number of currently running flushes";
+ private static final String COMPACTION_PENDING_DESCRIPTION =
+ "Reports 1 if at least one compaction is pending, otherwise it reports 0";
+ private static final String NUMBER_OF_RUNNING_COMPACTIONS_DESCRIPTION = "Number of currently running compactions";
+ private static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION_DESCRIPTION =
+ "Estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size";
+ private static final String TOTAL_SST_FILE_SIZE_DESCRIPTION = "Total size in bytes of all SST files";
+ private static final String LIVE_SST_FILES_SIZE_DESCRIPTION =
+ "Total size in bytes of all SST files that belong to the latest LSM tree";
+ private static final String NUMBER_OF_LIVE_VERSIONS_DESCRIPTION = "Number of live versions";
+ private static final String CAPACITY_OF_BLOCK_CACHE_DESCRIPTION = "Capacity of the block cache in bytes";
+ private static final String USAGE_OF_BLOCK_CACHE_DESCRIPTION =
+ "Memory size of the entries residing in block cache in bytes";
+ private static final String PINNED_USAGE_OF_BLOCK_CACHE_DESCRIPTION =
+ "Memory size for the entries being pinned in the block cache in bytes";
+ private static final String ESTIMATED_NUMBER_OF_KEYS_DESCRIPTION =
+ "Estimated number of total keys in the active and unflushed immutable memtables and storage";
+ private static final String ESTIMATED_MEMORY_OF_TABLE_READERS_DESCRIPTION =
+ "Estimated memory in bytes used for reading SST tables, excluding memory used in block cache";
+ private static final String TOTAL_NUMBER_OF_BACKGROUND_ERRORS_DESCRIPTION = "Total number of background errors";
public static class RocksDBMetricContext {
private final String taskName;
@@ -457,12 +514,278 @@ public class RocksDBMetrics {
public static void addNumEntriesActiveMemTableMetric(final StreamsMetricsImpl streamsMetrics,
final RocksDBMetricContext metricContext,
final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
+ NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION
+ );
+ }
+
+ public static void addNumEntriesImmMemTablesMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES,
+ NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES_DESCRIPTION
+ );
+ }
+
+ public static void addNumDeletesImmMemTablesMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES,
+ NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES_DESCRIPTION
+ );
+ }
+
+ public static void addNumDeletesActiveMemTableMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_DELETES_ACTIVE_MEMTABLE,
+ NUMBER_OF_DELETES_ACTIVE_MEMTABLES_DESCRIPTION
+ );
+ }
+
+ public static void addNumImmutableMemTableMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_IMMUTABLE_MEMTABLES,
+ NUMBER_OF_IMMUTABLE_MEMTABLES_DESCRIPTION
+ );
+ }
+
+ public static void addCurSizeActiveMemTable(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ CURRENT_SIZE_OF_ACTIVE_MEMTABLE,
+ CURRENT_SIZE_OF_ACTIVE_MEMTABLE_DESCRIPTION
+ );
+ }
+
+ public static void addCurSizeAllMemTables(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ CURRENT_SIZE_OF_ALL_MEMTABLES,
+ CURRENT_SIZE_OF_ALL_MEMTABLES_DESCRIPTION
+ );
+ }
+
+ public static void addSizeAllMemTables(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ SIZE_OF_ALL_MEMTABLES,
+ SIZE_OF_ALL_MEMTABLES_DESCRIPTION
+ );
+ }
+
+ public static void addMemTableFlushPending(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ MEMTABLE_FLUSH_PENDING,
+ MEMTABLE_FLUSH_PENDING_DESCRIPTION
+ );
+ }
+
+ public static void addNumRunningFlushesMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_RUNNING_FLUSHES,
+ NUMBER_OF_RUNNING_FLUSHES_DESCRIPTION
+ );
+ }
+
+ public static void addCompactionPendingMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ COMPACTION_PENDING,
+ COMPACTION_PENDING_DESCRIPTION
+ );
+ }
+
+ public static void addNumRunningCompactionsMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_RUNNING_COMPACTIONS,
+ NUMBER_OF_RUNNING_COMPACTIONS_DESCRIPTION
+ );
+ }
+
+ public static void addEstimatePendingCompactionBytesMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ ESTIMATED_BYTES_OF_PENDING_COMPACTION,
+ ESTIMATED_BYTES_OF_PENDING_COMPACTION_DESCRIPTION
+ );
+ }
+
+ public static void addTotalSstFilesSizeMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ TOTAL_SST_FILES_SIZE,
+ TOTAL_SST_FILE_SIZE_DESCRIPTION
+ );
+ }
+
+ public static void addLiveSstFilesSizeMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ LIVE_SST_FILES_SIZE,
+ LIVE_SST_FILES_SIZE_DESCRIPTION
+ );
+ }
+
+ public static void addNumLiveVersionMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_LIVE_VERSIONS,
+ NUMBER_OF_LIVE_VERSIONS_DESCRIPTION
+ );
+ }
+
+ public static void addBlockCacheCapacityMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ CAPACITY_OF_BLOCK_CACHE,
+ CAPACITY_OF_BLOCK_CACHE_DESCRIPTION
+ );
+ }
+
+ public static void addBlockCacheUsageMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ USAGE_OF_BLOCK_CACHE,
+ USAGE_OF_BLOCK_CACHE_DESCRIPTION
+ );
+ }
+
+ public static void addBlockCachePinnedUsageMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ PINNED_USAGE_OF_BLOCK_CACHE,
+ PINNED_USAGE_OF_BLOCK_CACHE_DESCRIPTION
+ );
+ }
+
+ public static void addEstimateNumKeysMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ ESTIMATED_NUMBER_OF_KEYS,
+ ESTIMATED_NUMBER_OF_KEYS_DESCRIPTION
+ );
+ }
+
+ public static void addEstimateTableReadersMemMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ ESTIMATED_MEMORY_OF_TABLE_READERS,
+ ESTIMATED_MEMORY_OF_TABLE_READERS_DESCRIPTION
+ );
+ }
+
+ public static void addBackgroundErrorsMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider) {
+ addMutableMetric(
+ streamsMetrics,
+ metricContext,
+ valueProvider,
+ NUMBER_OF_BACKGROUND_ERRORS,
+ TOTAL_NUMBER_OF_BACKGROUND_ERRORS_DESCRIPTION
+ );
+ }
+
+ private static void addMutableMetric(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext,
+ final Gauge<BigInteger> valueProvider,
+ final String name,
+ final String description) {
streamsMetrics.addStoreLevelMutableMetric(
metricContext.taskName(),
metricContext.metricsScope(),
metricContext.storeName(),
- NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
- NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE_DESCRIPTION,
+ name,
+ description,
RecordingLevel.INFO,
valueProvider
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
index 8fb0a31..85412d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals.metrics;
+import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -32,11 +33,33 @@ import org.slf4j.Logger;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.COMPACTION_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_BYTES_OF_PENDING_COMPACTION;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_MEMORY_OF_TABLE_READERS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_NUMBER_OF_KEYS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.LIVE_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.MEMTABLE_FLUSH_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES;
import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_LIVE_VERSIONS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_COMPACTIONS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_FLUSHES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_BACKGROUND_ERRORS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.TOTAL_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.USAGE_OF_BLOCK_CACHE;
public class RocksDBMetricsRecorder {
@@ -64,6 +87,7 @@ public class RocksDBMetricsRecorder {
private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
+
private final Logger logger;
private Sensor bytesWrittenToDatabaseSensor;
@@ -84,6 +108,7 @@ public class RocksDBMetricsRecorder {
private final String storeName;
private TaskId taskId;
private StreamsMetricsImpl streamsMetrics;
+ private boolean singleCache = true;
public RocksDBMetricsRecorder(final String metricsScope,
final String storeName) {
@@ -133,26 +158,51 @@ public class RocksDBMetricsRecorder {
logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId);
streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this);
} else if (storeToValueProviders.containsKey(segmentName)) {
- throw new IllegalStateException("Value providers for store \"" + segmentName + "\" of task " + taskId +
+ throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId +
" has been already added. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
- verifyStatistics(segmentName, statistics);
+ verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics);
logger.debug("Adding value providers for store {} of task {}", segmentName, taskId);
storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics));
}
- private void verifyStatistics(final String segmentName, final Statistics statistics) {
- if (!storeToValueProviders.isEmpty() && (
- statistics == null &&
- storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics != null)
- ||
- statistics != null &&
- storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
+ private void verifyDbAndCacheAndStatistics(final String segmentName,
+ final RocksDB db,
+ final Cache cache,
+ final Statistics statistics) {
+ for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+ verifyConsistencyOfValueProvidersAcrossSegments(segmentName, statistics, valueProviders.statistics, "statistics");
+ verifyConsistencyOfValueProvidersAcrossSegments(segmentName, cache, valueProviders.cache, "cache");
+ if (db == valueProviders.db) {
+ throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+ " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+ }
+ if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {
+ singleCache = false;
+ } else if (singleCache && cache != valueProviders.cache || !singleCache && cache == valueProviders.cache) {
+ throw new IllegalStateException("Caches for store " + storeName + " of task " + taskId +
+ " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+ }
+ }
+ }
+
+ private void verifyConsistencyOfValueProvidersAcrossSegments(final String segmentName,
+ final Object newValueProvider,
+ final Object oldValueProvider,
+ final String valueProviderName) {
+ if (newValueProvider == null && oldValueProvider != null ||
+ newValueProvider != null && oldValueProvider == null) {
- throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
- " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
- "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
+ final char capitalizedFirstChar = valueProviderName.toUpperCase(Locale.US).charAt(0);
+ final StringBuilder capitalizedValueProviderName = new StringBuilder(valueProviderName);
+ capitalizedValueProviderName.setCharAt(0, capitalizedFirstChar);
+ throw new IllegalStateException(capitalizedValueProviderName +
+ " for segment " + segmentName + " of task " + taskId +
+ " is" + (newValueProvider == null ? " " : " not ") + "null although the " + valueProviderName +
+ " of another segment in this metrics recorder is" + (newValueProvider != null ? " " : " not ") + "null. " +
"This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
@@ -174,22 +224,163 @@ public class RocksDBMetricsRecorder {
numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
}
- private void initGauges(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext) {
- RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, metricContext, (metricsConfig, now) -> {
+ private void initGauges(final StreamsMetricsImpl streamsMetrics,
+ final RocksDBMetricContext metricContext) {
+ RocksDBMetrics.addNumImmutableMemTableMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES)
+ );
+ RocksDBMetrics.addCurSizeActiveMemTable(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE)
+ );
+ RocksDBMetrics.addCurSizeAllMemTables(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES)
+ );
+ RocksDBMetrics.addSizeAllMemTables(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(SIZE_OF_ALL_MEMTABLES)
+ );
+ RocksDBMetrics.addNumEntriesActiveMemTableMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE)
+ );
+ RocksDBMetrics.addNumDeletesActiveMemTableMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE)
+ );
+ RocksDBMetrics.addNumEntriesImmMemTablesMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES)
+ );
+ RocksDBMetrics.addNumDeletesImmMemTablesMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES)
+ );
+ RocksDBMetrics.addMemTableFlushPending(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(MEMTABLE_FLUSH_PENDING)
+ );
+ RocksDBMetrics.addNumRunningFlushesMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_FLUSHES)
+ );
+ RocksDBMetrics.addCompactionPendingMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(COMPACTION_PENDING)
+ );
+ RocksDBMetrics.addNumRunningCompactionsMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS)
+ );
+ RocksDBMetrics.addEstimatePendingCompactionBytesMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION)
+ );
+ RocksDBMetrics.addTotalSstFilesSizeMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(TOTAL_SST_FILES_SIZE)
+ );
+ RocksDBMetrics.addLiveSstFilesSizeMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(LIVE_SST_FILES_SIZE)
+ );
+ RocksDBMetrics.addNumLiveVersionMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_LIVE_VERSIONS)
+ );
+ RocksDBMetrics.addEstimateNumKeysMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(ESTIMATED_NUMBER_OF_KEYS)
+ );
+ RocksDBMetrics.addEstimateTableReadersMemMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS)
+ );
+ RocksDBMetrics.addBackgroundErrorsMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeSumOfProperties(NUMBER_OF_BACKGROUND_ERRORS)
+ );
+ RocksDBMetrics.addBlockCacheCapacityMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeBlockCacheMetrics(CAPACITY_OF_BLOCK_CACHE)
+ );
+ RocksDBMetrics.addBlockCacheUsageMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeBlockCacheMetrics(USAGE_OF_BLOCK_CACHE)
+ );
+ RocksDBMetrics.addBlockCachePinnedUsageMetric(
+ streamsMetrics,
+ metricContext,
+ gaugeToComputeBlockCacheMetrics(PINNED_USAGE_OF_BLOCK_CACHE)
+ );
+ }
+
+ private Gauge<BigInteger> gaugeToComputeSumOfProperties(final String propertyName) {
+ return (metricsConfig, now) -> {
BigInteger result = BigInteger.valueOf(0);
for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
try {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = result.add(new BigInteger(1, longToBytes(
- valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))));
+ valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+ )));
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
+ }
+ }
+ return result;
+ };
+ }
+ private Gauge<BigInteger> gaugeToComputeBlockCacheMetrics(final String propertyName) {
+ return (metricsConfig, now) -> {
+ BigInteger result = BigInteger.valueOf(0);
+ for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
+ try {
+ if (singleCache) {
+ // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
+ // BigInteger and construct the object from the byte representation of the value
+ result = new BigInteger(1, longToBytes(
+ valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+ ));
+ break;
+ } else {
+ // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
+ // BigInteger and construct the object from the byte representation of the value
+ result = result.add(new BigInteger(1, longToBytes(
+ valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+ )));
+ }
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error adding RocksDB metric " + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, e);
+ throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
}
}
return result;
- });
+ };
}
private static byte[] longToBytes(final long data) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 678a286..5917faa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -101,6 +101,27 @@ public class RocksDBMetricsIntegrationTest {
private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
+ private static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLE = "num-deletes-active-mem-table";
+ private static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES = "num-entries-imm-mem-tables";
+ private static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES = "num-deletes-imm-mem-tables";
+ private static final String NUMBER_OF_IMMUTABLE_MEMTABLES = "num-immutable-mem-table";
+ private static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE = "cur-size-active-mem-table";
+ private static final String CURRENT_SIZE_OF_ALL_MEMTABLES = "cur-size-all-mem-tables";
+ private static final String SIZE_OF_ALL_MEMTABLES = "size-all-mem-tables";
+ private static final String MEMTABLE_FLUSH_PENDING = "mem-table-flush-pending";
+ private static final String NUMBER_OF_RUNNING_FLUSHES = "num-running-flushes";
+ private static final String COMPACTION_PENDING = "compaction-pending";
+ private static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
+ private static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION = "estimate-pending-compaction-bytes";
+ private static final String TOTAL_SST_FILES_SIZE = "total-sst-files-size";
+ private static final String LIVE_SST_FILES_SIZE = "live-sst-files-size";
+ private static final String NUMBER_OF_LIVE_VERSIONS = "num-live-versions";
+ private static final String CAPACITY_OF_BLOCK_CACHE = "block-cache-capacity";
+ private static final String USAGE_OF_BLOCK_CACHE = "block-cache-usage";
+ private static final String PINNED_USAGE_OF_BLOCK_CACHE = "block-cache-pinned-usage";
+ private static final String ESTIMATED_NUMBER_OF_KEYS = "estimate-num-keys";
+ private static final String ESTIMATED_MEMORY_OF_TABLE_READERS = "estimate-table-readers-mem";
+ private static final String NUMBER_OF_BACKGROUND_ERRORS = "background-errors";
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
@@ -164,6 +185,7 @@ public class RocksDBMetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return streamsConfiguration;
}
@@ -202,7 +224,7 @@ public class RocksDBMetricsIntegrationTest {
kafkaStreams.close();
}
- private void produceRecords() throws Exception {
+ private void produceRecords() {
final MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
final Properties prop = TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
@@ -257,6 +279,27 @@ public class RocksDBMetricsIntegrationTest {
checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
checkMetricByName(listMetricStore, NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_DELETES_ACTIVE_MEMTABLE, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_IMMUTABLE_MEMTABLES, 1);
+ checkMetricByName(listMetricStore, CURRENT_SIZE_OF_ACTIVE_MEMTABLE, 1);
+ checkMetricByName(listMetricStore, CURRENT_SIZE_OF_ALL_MEMTABLES, 1);
+ checkMetricByName(listMetricStore, SIZE_OF_ALL_MEMTABLES, 1);
+ checkMetricByName(listMetricStore, MEMTABLE_FLUSH_PENDING, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_RUNNING_FLUSHES, 1);
+ checkMetricByName(listMetricStore, COMPACTION_PENDING, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_RUNNING_COMPACTIONS, 1);
+ checkMetricByName(listMetricStore, ESTIMATED_BYTES_OF_PENDING_COMPACTION, 1);
+ checkMetricByName(listMetricStore, TOTAL_SST_FILES_SIZE, 1);
+ checkMetricByName(listMetricStore, LIVE_SST_FILES_SIZE, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_LIVE_VERSIONS, 1);
+ checkMetricByName(listMetricStore, CAPACITY_OF_BLOCK_CACHE, 1);
+ checkMetricByName(listMetricStore, USAGE_OF_BLOCK_CACHE, 1);
+ checkMetricByName(listMetricStore, PINNED_USAGE_OF_BLOCK_CACHE, 1);
+ checkMetricByName(listMetricStore, ESTIMATED_NUMBER_OF_KEYS, 1);
+ checkMetricByName(listMetricStore, ESTIMATED_MEMORY_OF_TABLE_READERS, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_BACKGROUND_ERRORS, 1);
}
private void checkMetricByName(final List<Metric> listMetric,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 8126ba6..3473467 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -106,6 +106,11 @@ public class StreamsMetricsImplTest {
private final static String STORE_ID_TAG = "-state-id";
private final static String STORE_NAME1 = "store1";
private final static String STORE_NAME2 = "store2";
+ private final static Map<String, String> STORE_LEVEL_TAG_MAP = mkMap(
+ mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
+ mkEntry(TASK_ID_TAG, TASK_ID1),
+ mkEntry(SCOPE_NAME + STORE_ID_TAG, STORE_NAME1)
+ );
private final static String RECORD_CACHE_ID_TAG = "record-cache-id";
private final static String ENTITY_NAME = "test-entity";
private final static String OPERATION_NAME = "test-operation";
@@ -125,11 +130,6 @@ public class StreamsMetricsImplTest {
private final String group = "group";
private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
private final Map<String, String> clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID));
- private final Map<String, String> storeLevelTagMap = mkMap(
- mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
- mkEntry(TASK_ID_TAG, TASK_ID1),
- mkEntry(SCOPE_NAME + STORE_ID_TAG, STORE_NAME1)
- );
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags);
private final MetricName metricName2 =
@@ -427,9 +427,9 @@ public class StreamsMetricsImplTest {
public void shouldAddNewStoreLevelMutableMetric() {
final Metrics metrics = mock(Metrics.class);
final MetricName metricName =
- new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
+ new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
- expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
+ expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.andReturn(metricName);
expect(metrics.metric(metricName)).andReturn(null);
metrics.addMetric(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER));
@@ -453,8 +453,8 @@ public class StreamsMetricsImplTest {
public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
final Metrics metrics = mock(Metrics.class);
final MetricName metricName =
- new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
- expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
+ new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
+ expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.andReturn(metricName);
expect(metrics.metric(metricName)).andReturn(mock(KafkaMetric.class));
replay(metrics);
@@ -478,12 +478,12 @@ public class StreamsMetricsImplTest {
final Metrics metrics = niceMock(Metrics.class);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final MetricName metricName1 =
- new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
+ new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricName metricName2 =
- new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap);
- expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
+ new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP);
+ expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.andReturn(metricName1);
- expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap))
+ expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP))
.andReturn(metricName2);
final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
resetToDefault(metrics);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index ff408c8..ca28181 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -58,6 +58,7 @@ import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -628,6 +629,9 @@ public class RocksDBStoreTest {
EasyMock.replay(context);
rocksDBStore.init(context, rocksDBStore);
+ final byte[] key = "hello".getBytes();
+ final byte[] value = "world".getBytes();
+ rocksDBStore.put(Bytes.wrap(key), value);
final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
"num-entries-active-mem-table",
@@ -635,16 +639,62 @@ public class RocksDBStoreTest {
"description is not verified",
streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
));
-
assertThat(numberOfEntriesActiveMemTable, notNullValue());
- assertThat(numberOfEntriesActiveMemTable.metricValue(), is(BigInteger.valueOf(0)));
+ assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));
+ }
- final byte[] key = "hello".getBytes();
- final byte[] value = "world".getBytes();
- rocksDBStore.put(Bytes.wrap(key), value);
+ @Test
+ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
+ final TaskId taskId = new TaskId(0, 0);
- assertThat(numberOfEntriesActiveMemTable, notNullValue());
- assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));
+ final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
+
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ context = EasyMock.niceMock(InternalMockProcessorContext.class);
+ EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+ EasyMock.expect(context.taskId()).andStubReturn(taskId);
+ EasyMock.expect(context.appConfigs()).andStubReturn(new StreamsConfig(props).originals());
+ EasyMock.expect(context.stateDir()).andStubReturn(dir);
+ EasyMock.replay(context);
+
+ rocksDBStore.init(context, rocksDBStore);
+
+ final List<String> propertyNames = Arrays.asList(
+ "num-entries-active-mem-table",
+ "num-deletes-active-mem-table",
+ "num-entries-imm-mem-tables",
+ "num-deletes-imm-mem-tables",
+ "num-immutable-mem-table",
+ "cur-size-active-mem-table",
+ "cur-size-all-mem-tables",
+ "size-all-mem-tables",
+ "mem-table-flush-pending",
+ "num-running-flushes",
+ "compaction-pending",
+ "num-running-compactions",
+ "estimate-pending-compaction-bytes",
+ "total-sst-files-size",
+ "live-sst-files-size",
+ "num-live-versions",
+ "block-cache-capacity",
+ "block-cache-usage",
+ "block-cache-pinned-usage",
+ "estimate-num-keys",
+ "estimate-table-readers-mem",
+ "background-errors"
+ );
+ for (final String propertyname : propertyNames) {
+ final Metric metric = metrics.metric(new MetricName(
+ propertyname,
+ StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+ "description is not verified",
+ streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
+ ));
+ assertThat("Metric " + propertyname + " not found!", metric, notNullValue());
+ metric.metricValue();
+ }
}
public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 14ce729..2695b86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -38,7 +38,27 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STORE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.COMPACTION_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.CURRENT_SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_BYTES_OF_PENDING_COMPACTION;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_MEMORY_OF_TABLE_READERS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.ESTIMATED_NUMBER_OF_KEYS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.LIVE_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.MEMTABLE_FLUSH_PENDING;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES;
import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_IMMUTABLE_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_COMPACTIONS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_RUNNING_FLUSHES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.SIZE_OF_ALL_MEMTABLES;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.NUMBER_OF_BACKGROUND_ERRORS;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.TOTAL_SST_FILES_SIZE;
+import static org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.USAGE_OF_BLOCK_CACHE;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is;
@@ -62,20 +82,172 @@ public class RocksDBMetricsRecorderGaugesTest {
private final Statistics statisticsToAdd2 = mock(Statistics.class);
@Test
+ public void shouldGetNumberOfImmutableMemTables() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES);
+ }
+
+ @Test
+ public void shouldGetCurrentSizeofActiveMemTable() throws Exception {
+ runAndVerifySumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE);
+ }
+
+ @Test
+ public void shouldGetCurrentSizeofAllMemTables() throws Exception {
+ runAndVerifySumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES);
+ }
+
+ @Test
+ public void shouldGetSizeofAllMemTables() throws Exception {
+ runAndVerifySumOfProperties(SIZE_OF_ALL_MEMTABLES);
+ }
+
+ @Test
public void shouldGetNumberOfEntriesActiveMemTable() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE);
+ }
+
+ @Test
+ public void shouldGetNumberOfDeletesActiveMemTable() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE);
+ }
+
+ @Test
+ public void shouldGetNumberOfEntriesImmutableMemTables() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES);
+ }
+
+ @Test
+ public void shouldGetNumberOfDeletesImmutableMemTables() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES);
+ }
+
+ @Test
+ public void shouldGetMemTableFlushPending() throws Exception {
+ runAndVerifySumOfProperties(MEMTABLE_FLUSH_PENDING);
+ }
+
+ @Test
+ public void shouldGetNumberOfRunningFlushes() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_RUNNING_FLUSHES);
+ }
+
+ @Test
+ public void shouldGetCompactionPending() throws Exception {
+ runAndVerifySumOfProperties(COMPACTION_PENDING);
+ }
+
+ @Test
+ public void shouldGetNumberOfRunningCompactions() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS);
+ }
+
+ @Test
+ public void shouldGetEstimatedBytesOfPendingCompactions() throws Exception {
+ runAndVerifySumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION);
+ }
+
+ @Test
+ public void shouldGetTotalSstFilesSize() throws Exception {
+ runAndVerifySumOfProperties(TOTAL_SST_FILES_SIZE);
+ }
+
+ @Test
+ public void shouldGetLiveSstFilesSize() throws Exception {
+ runAndVerifySumOfProperties(LIVE_SST_FILES_SIZE);
+ }
+
+ @Test
+ public void shouldGetEstimatedNumberOfKeys() throws Exception {
+ runAndVerifySumOfProperties(ESTIMATED_NUMBER_OF_KEYS);
+ }
+
+ @Test
+ public void shouldGetEstimatedMemoryOfTableReaders() throws Exception {
+ runAndVerifySumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS);
+ }
+
+ @Test
+ public void shouldGetNumberOfBackgroundErrors() throws Exception {
+ runAndVerifySumOfProperties(NUMBER_OF_BACKGROUND_ERRORS);
+ }
+
+ @Test
+ public void shouldGetCapacityOfBlockCacheWithMultipleCaches() throws Exception {
+ runAndVerifyBlockCacheMetricsWithMultipleCaches(CAPACITY_OF_BLOCK_CACHE);
+ }
+
+ @Test
+ public void shouldGetCapacityOfBlockCacheWithSingleCache() throws Exception {
+ runAndVerifyBlockCacheMetricsWithSingleCache(CAPACITY_OF_BLOCK_CACHE);
+ }
+
+ @Test
+ public void shouldGetUsageOfBlockCacheWithMultipleCaches() throws Exception {
+ runAndVerifyBlockCacheMetricsWithMultipleCaches(USAGE_OF_BLOCK_CACHE);
+ }
+
+ @Test
+ public void shouldGetUsageOfBlockCacheWithSingleCache() throws Exception {
+ runAndVerifyBlockCacheMetricsWithSingleCache(USAGE_OF_BLOCK_CACHE);
+ }
+
+ @Test
+ public void shouldGetPinnedUsageOfBlockCacheWithMultipleCaches() throws Exception {
+ runAndVerifyBlockCacheMetricsWithMultipleCaches(PINNED_USAGE_OF_BLOCK_CACHE);
+ }
+
+ @Test
+ public void shouldGetPinnedUsageOfBlockCacheWithSingleCache() throws Exception {
+ runAndVerifyBlockCacheMetricsWithSingleCache(PINNED_USAGE_OF_BLOCK_CACHE);
+ }
+
+ private void runAndVerifySumOfProperties(final String propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
- expect(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))
- .andStubReturn(5L);
- expect(dbToAdd2.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))
- .andStubReturn(3L);
- replay(dbToAdd1, dbToAdd2);
recorder.init(streamsMetrics, TASK_ID);
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
+ final long recordedValue1 = 5L;
+ final long recordedValue2 = 3L;
+ expect(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+ .andStubReturn(recordedValue1);
+ expect(dbToAdd2.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+ .andStubReturn(recordedValue2);
+ replay(dbToAdd1, dbToAdd2);
+
+ verifyMetrics(streamsMetrics, propertyName, recordedValue1 + recordedValue2);
+ }
+
+ private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception {
+ runAndVerifySumOfProperties(propertyName);
+ }
+
+ private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception {
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
+ final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
+
+ recorder.init(streamsMetrics, TASK_ID);
+ recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+ recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd1, statisticsToAdd2);
+
+ final long recordedValue = 5L;
+ expect(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+ .andStubReturn(recordedValue);
+ expect(dbToAdd2.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName))
+ .andStubReturn(recordedValue);
+ replay(dbToAdd1, dbToAdd2);
+
+ verifyMetrics(streamsMetrics, propertyName, recordedValue);
+ }
+
+ private void verifyMetrics(final StreamsMetricsImpl streamsMetrics,
+ final String propertyName,
+ final long expectedValue) {
+
final Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
final Map<String, String> tagMap = mkMap(
mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
@@ -83,13 +255,13 @@ public class RocksDBMetricsRecorderGaugesTest {
mkEntry(METRICS_SCOPE + "-" + STORE_ID_TAG, STORE_NAME)
);
final KafkaMetric metric = (KafkaMetric) metrics.get(new MetricName(
- NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE,
+ propertyName,
STATE_STORE_LEVEL_GROUP,
"description is ignored",
tagMap
));
assertThat(metric, notNullValue());
- assertThat(metric.metricValue(), is(BigInteger.valueOf(8)));
+ assertThat(metric.metricValue(), is(BigInteger.valueOf(expectedValue)));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index 653cf97..dc08f84 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -59,13 +59,16 @@ public class RocksDBMetricsRecorderTest {
private final static String STORE_NAME = "store-name";
private final static String SEGMENT_STORE_NAME_1 = "segment-store-name-1";
private final static String SEGMENT_STORE_NAME_2 = "segment-store-name-2";
+ private final static String SEGMENT_STORE_NAME_3 = "segment-store-name-3";
private final RocksDB dbToAdd1 = mock(RocksDB.class);
private final RocksDB dbToAdd2 = mock(RocksDB.class);
+ private final RocksDB dbToAdd3 = mock(RocksDB.class);
private final Cache cacheToAdd1 = mock(Cache.class);
private final Cache cacheToAdd2 = mock(Cache.class);
private final Statistics statisticsToAdd1 = mock(Statistics.class);
private final Statistics statisticsToAdd2 = mock(Statistics.class);
+ private final Statistics statisticsToAdd3 = mock(Statistics.class);
private final Sensor bytesWrittenToDatabaseSensor = createMock(Sensor.class);
private final Sensor bytesReadFromDatabaseSensor = createMock(Sensor.class);
@@ -157,43 +160,127 @@ public class RocksDBMetricsRecorderTest {
);
assertThat(
exception.getMessage(),
- is("Value providers for store \"" + SEGMENT_STORE_NAME_1 + "\" of task " + TASK_ID1 +
- " has been already added. This is a bug in Kafka Streams. " +
- "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ is("Value providers for store " + SEGMENT_STORE_NAME_1 + " of task " + TASK_ID1 +
+ " has been already added. This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
);
}
@Test
- public void shouldThrowIfStatisticsForOnlyOneSegmentOutOfMultipleIsNull() {
+ public void shouldThrowIfStatisticsToAddIsNotNullButExsitingStatisticsAreNull() {
+ recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
+
+ final Throwable exception = assertThrows(
+ IllegalStateException.class,
+ () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2)
+ );
+ assertThat(
+ exception.getMessage(),
+ is("Statistics for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+ " is not null although the statistics of another segment in this metrics recorder is null. " +
+ "This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ );
+ }
+
+ @Test
+ public void shouldThrowIfStatisticsToAddIsNullButExsitingStatisticsAreNotNull() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
final Throwable exception = assertThrows(
IllegalStateException.class,
- () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd1, null)
+ () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, null)
);
assertThat(
exception.getMessage(),
- is("Statistics for store \"" + SEGMENT_STORE_NAME_2 + "\" of task " + TASK_ID1 +
- " is null although the statistics of another store in this metrics recorder is not null. " +
- "This is a bug in Kafka Streams. " +
- "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ is("Statistics for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+ " is null although the statistics of another segment in this metrics recorder is not null. " +
+ "This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
);
}
@Test
- public void shouldThrowIfStatisticsForOnlyOneSegmentOutOfMultipleIsNotNull() {
- recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
+ public void shouldThrowIfCacheToAddIsNullButExsitingCacheIsNotNull() {
+ recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, null, statisticsToAdd1);
+
+ final Throwable exception = assertThrows(
+ IllegalStateException.class,
+ () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd1, statisticsToAdd1)
+ );
+ assertThat(
+ exception.getMessage(),
+ is("Cache for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+ " is not null although the cache of another segment in this metrics recorder is null. " +
+ "This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ );
+ }
+
+ @Test
+ public void shouldThrowIfCacheToAddIsNotNullButExistingCacheIsNull() {
+ recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+
+ final Throwable exception = assertThrows(
+ IllegalStateException.class,
+ () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, null, statisticsToAdd2)
+ );
+ assertThat(
+ exception.getMessage(),
+ is("Cache for segment " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+ " is null although the cache of another segment in this metrics recorder is not null. " +
+ "This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ );
+ }
+
+ @Test
+ public void shouldThrowIfCacheToAddIsNotSameAsAllExistingCaches() {
+ recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+ recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd1, statisticsToAdd2);
+
+ final Throwable exception = assertThrows(
+ IllegalStateException.class,
+ () -> recorder.addValueProviders(SEGMENT_STORE_NAME_3, dbToAdd3, cacheToAdd2, statisticsToAdd3)
+ );
+ assertThat(
+ exception.getMessage(),
+ is("Caches for store " + STORE_NAME + " of task " + TASK_ID1 +
+ " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ );
+ }
+
+ @Test
+ public void shouldThrowIfCacheToAddIsSameAsOnlyOneOfMultipleCaches() {
+ recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+ recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
+
+ final Throwable exception = assertThrows(
+ IllegalStateException.class,
+ () -> recorder.addValueProviders(SEGMENT_STORE_NAME_3, dbToAdd3, cacheToAdd1, statisticsToAdd3)
+ );
+ assertThat(
+ exception.getMessage(),
+ is("Caches for store " + STORE_NAME + " of task " + TASK_ID1 +
+ " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ );
+ }
+
+ @Test
+ public void shouldThrowIfDbToAddWasAlreadyAddedForOtherSegment() {
+ recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
final Throwable exception = assertThrows(
IllegalStateException.class,
- () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd1, statisticsToAdd1)
+ () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
);
assertThat(
exception.getMessage(),
- is("Statistics for store \"" + SEGMENT_STORE_NAME_2 + "\" of task " + TASK_ID1 +
- " is not null although the statistics of another store in this metrics recorder is null. " +
- "This is a bug in Kafka Streams. " +
- "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+ is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+ " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+ "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
);
}
@@ -471,7 +558,28 @@ public class RocksDBMetricsRecorderTest {
.andReturn(numberOfOpenFilesSensor);
expect(RocksDBMetrics.numberOfFileErrorsSensor(eq(streamsMetrics), eq(metricsContext)))
.andReturn(numberOfFileErrorsSensor);
+ RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
replay(RocksDBMetrics.class);
}
@@ -503,7 +611,28 @@ public class RocksDBMetricsRecorderTest {
.andStubReturn(numberOfOpenFilesSensor);
expect(RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
.andStubReturn(numberOfFileErrorsSensor);
+ RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
+ RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
replay(RocksDBMetrics.class);
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
index f6dc503..0d4ae6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
@@ -52,6 +52,7 @@ public class RocksDBMetricsTest {
private static final String STORE_NAME = "store";
private static final RocksDBMetricContext ROCKSDB_METRIC_CONTEXT =
new RocksDBMetricContext(TASK_ID, STORE_TYPE, STORE_NAME);
+ private static final Gauge<BigInteger> VALUE_PROVIDER = (config, now) -> BigInteger.valueOf(10);
private final Metrics metrics = new Metrics();
private final Sensor sensor = metrics.sensor("dummy");
@@ -221,22 +222,263 @@ public class RocksDBMetricsTest {
}
@Test
- public void shouldAddNumImmutableMemTableMetric() {
+ public void shouldAddNumEntriesActiveMemTableMetric() {
final String name = "num-entries-active-mem-table";
- final String description = "Current total number of entries in the active memtable";
- final Gauge<BigInteger> valueProvider = (config, now) -> BigInteger.valueOf(10);
+ final String description = "Total number of entries in the active memtable";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddNumberDeletesActiveTableMetric() {
+ final String name = "num-deletes-active-mem-table";
+ final String description = "Total number of delete entries in the active memtable";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumDeletesActiveMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddNumEntriesImmutableMemTablesMetric() {
+ final String name = "num-entries-imm-mem-tables";
+ final String description = "Total number of entries in the unflushed immutable memtables";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumEntriesImmMemTablesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddNumDeletesImmutableMemTablesMetric() {
+ final String name = "num-deletes-imm-mem-tables";
+ final String description = "Total number of delete entries in the unflushed immutable memtables";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumDeletesImmMemTablesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddNumImmutableMemTablesMetric() {
+ final String name = "num-immutable-mem-table";
+ final String description = "Number of immutable memtables that have not yet been flushed";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumImmutableMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddCurSizeActiveMemTableMetric() {
+ final String name = "cur-size-active-mem-table";
+ final String description = "Approximate size of active memtable in bytes";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addCurSizeActiveMemTable(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddCurSizeAllMemTablesMetric() {
+ final String name = "cur-size-all-mem-tables";
+ final String description = "Approximate size of active and unflushed immutable memtable in bytes";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addCurSizeAllMemTables(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddSizeAllMemTablesMetric() {
+ final String name = "size-all-mem-tables";
+ final String description = "Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addSizeAllMemTables(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddMemTableFlushPendingMetric() {
+ final String name = "mem-table-flush-pending";
+ final String description = "Reports 1 if a memtable flush is pending, otherwise it reports 0";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addMemTableFlushPending(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddNumRunningFlushesMetric() {
+ final String name = "num-running-flushes";
+ final String description = "Number of currently running flushes";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumRunningFlushesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddCompactionPendingMetric() {
+ final String name = "compaction-pending";
+ final String description = "Reports 1 if at least one compaction is pending, otherwise it reports 0";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addCompactionPendingMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddNumRunningCompactionsMetric() {
+ final String name = "num-running-compactions";
+ final String description = "Number of currently running compactions";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumRunningCompactionsMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddEstimatePendingCompactionBytesMetric() {
+ final String name = "estimate-pending-compaction-bytes";
+ final String description =
+ "Estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addEstimatePendingCompactionBytesMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddTotalSstFilesSizeMetric() {
+ final String name = "total-sst-files-size";
+ final String description = "Total size in bytes of all SST files";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addTotalSstFilesSizeMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddLiveSstFilesSizeMetric() {
+ final String name = "live-sst-files-size";
+ final String description = "Total size in bytes of all SST files that belong to the latest LSM tree";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addLiveSstFilesSizeMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddNumLiveVersionMetric() {
+ final String name = "num-live-versions";
+ final String description = "Number of live versions";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addNumLiveVersionMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddBlockCacheCapacityMetric() {
+ final String name = "block-cache-capacity";
+ final String description = "Capacity of the block cache in bytes";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addBlockCacheCapacityMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddBlockCacheUsageMetric() {
+ final String name = "block-cache-usage";
+ final String description = "Memory size of the entries residing in block cache in bytes";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addBlockCacheUsageMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddBlockCachePinnedUsageMetric() {
+ final String name = "block-cache-pinned-usage";
+ final String description = "Memory size for the entries being pinned in the block cache in bytes";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addBlockCachePinnedUsageMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddEstimateNumKeysMetric() {
+ final String name = "estimate-num-keys";
+ final String description =
+ "Estimated number of total keys in the active and unflushed immutable memtables and storage";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addEstimateNumKeysMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddEstimateTableReadersMemMetric() {
+ final String name = "estimate-table-readers-mem";
+ final String description =
+ "Estimated memory in bytes used for reading SST tables, excluding memory used in block cache";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addEstimateTableReadersMemMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ @Test
+ public void shouldAddBackgroundErrorsMetric() {
+ final String name = "background-errors";
+ final String description = "Total number of background errors";
+ runAndVerifyMutableMetric(
+ name,
+ description,
+ () -> RocksDBMetrics.addBackgroundErrorsMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, VALUE_PROVIDER)
+ );
+ }
+
+ private void runAndVerifyMutableMetric(final String name, final String description, final Runnable metricAdder) {
streamsMetrics.addStoreLevelMutableMetric(
eq(TASK_ID),
- eq(STORE_TYPE),
- eq(STORE_NAME),
- eq(name),
- eq(description),
- eq(RecordingLevel.INFO),
- eq(valueProvider)
+ eq(STORE_TYPE),
+ eq(STORE_NAME),
+ eq(name),
+ eq(description),
+ eq(RecordingLevel.INFO),
+ eq(VALUE_PROVIDER)
);
replay(streamsMetrics);
- RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, ROCKSDB_METRIC_CONTEXT, valueProvider);
+ metricAdder.run();
verify(streamsMetrics);
}