Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/31 11:14:54 UTC

[GitHub] [kafka] cadonna opened a new pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

cadonna opened a new pull request #9232:
URL: https://github.com/apache/kafka/pull/9232


   This commit adds the remaining property-based RocksDB metrics as described in KIP-607, except for num-entries-active-mem-table, which was added in PR #9177.
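   For illustration, here is a minimal, self-contained sketch of how one of these property-based metrics can be read through the RocksJava API (the database path and options are placeholders; `rocksdb.num-immutable-mem-table` is one of the properties covered by KIP-607):
   
       import org.rocksdb.Options;
       import org.rocksdb.RocksDB;
       import org.rocksdb.RocksDBException;
   
       public class PropertyMetricDemo {
           public static void main(final String[] args) throws RocksDBException {
               RocksDB.loadLibrary();
               try (final Options options = new Options().setCreateIfMissing(true);
                    final RocksDB db = RocksDB.open(options, "/tmp/property-metric-demo")) {
                   // Aggregates the property over all column families of this instance.
                   final long value =
                       db.getAggregatedLongProperty("rocksdb.num-immutable-mem-table");
                   System.out.println("num-immutable-mem-table: " + value);
               }
           }
       }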
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480914782



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
                 statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {

Review comment:
       If RocksDB uses the memory-bounded configuration, all RocksDB instances use the same cache, i.e., they all hold a reference to the same cache object.
   At this point in the code, the value providers for the given segment with the cache `cache` have not yet been added to the value providers used in this recorder. To determine whether different caches are used for different segments (i.e., `singleCache = false`), it is not enough to check that only one single cache has been added so far; we also need to check whether the already added cache (i.e., `valueProviders.cache`) differs from the cache to add (`cache`).
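   To illustrate why reference equality is the right test, here is a toy sketch (not the production code) using the RocksJava `LRUCache` class; the capacities are arbitrary:
   
       import org.rocksdb.Cache;
       import org.rocksdb.LRUCache;
       import org.rocksdb.RocksDB;
   
       public class CacheIdentityDemo {
           public static void main(final String[] args) {
               RocksDB.loadLibrary();
               // Memory-bounded config: every segment is handed the same Cache
               // object, so reference equality holds across segments.
               try (final Cache shared = new LRUCache(16 * 1024 * 1024L)) {
                   final Cache segment1Cache = shared;
                   final Cache segment2Cache = shared;
                   System.out.println(segment1Cache == segment2Cache); // true
               }
               // Default config: each segment creates its own cache, so the
               // references differ even if the capacities are equal.
               try (final Cache cache1 = new LRUCache(8 * 1024 * 1024L);
                    final Cache cache2 = new LRUCache(8 * 1024 * 1024L)) {
                   System.out.println(cache1 == cache2); // false -> singleCache = false
               }
           }
       }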







[GitHub] [kafka] cadonna commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480927013



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -174,22 +241,163 @@ private void initSensors(final StreamsMetricsImpl streamsMetrics, final RocksDBM
         numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
     }
 
-    private void initGauges(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext) {
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, metricContext, (metricsConfig, now) -> {
+    private void initGauges(final StreamsMetricsImpl streamsMetrics,
+                            final RocksDBMetricContext metricContext) {
+        RocksDBMetrics.addNumImmutableMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addCurSizeActiveMemTable(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addCurSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addNumEntriesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumDeletesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumEntriesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addNumDeletesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addMemTableFlushPending(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(MEMTABLE_FLUSH_PENDING)
+        );
+        RocksDBMetrics.addNumRunningFlushesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_FLUSHES)
+        );
+        RocksDBMetrics.addCompactionPendingMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(COMPACTION_PENDING)
+        );
+        RocksDBMetrics.addNumRunningCompactionsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS)
+        );
+        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION)
+        );
+        RocksDBMetrics.addTotalSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(TOTAL_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addLiveSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(LIVE_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addNumLiveVersionMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_LIVE_VERSIONS)
+        );
+        RocksDBMetrics.addEstimateNumKeysMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_NUMBER_OF_KEYS)
+        );
+        RocksDBMetrics.addEstimateTableReadersMemMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS)
+        );
+        RocksDBMetrics.addBackgroundErrorsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_BACKGROUND_ERRORS)
+        );
+        RocksDBMetrics.addBlockCacheCapacityMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(CAPACITY_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCacheUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(USAGE_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCachePinnedUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(PINNED_USAGE_OF_BLOCK_CACHE)
+        );
+    }
+
+    private Gauge<BigInteger> gaugeToComputeSumOfProperties(final String propertyName) {
+        return (metricsConfig, now) -> {
             BigInteger result = BigInteger.valueOf(0);
             for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
                 try {
                     // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
                     // BigInteger and construct the object from the byte representation of the value
                     result = result.add(new BigInteger(1, longToBytes(
-                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))));
+                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                    )));
+                } catch (final RocksDBException e) {

Review comment:
       Actually, I just followed the pattern found in `RocksDBStore`. 
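   For reference, a minimal sketch of that pattern as it might look here: the checked `RocksDBException` is wrapped in Kafka Streams' unchecked `ProcessorStateException` (the method and message text are illustrative, not the exact code):
   
       import org.apache.kafka.streams.errors.ProcessorStateException;
       import org.rocksdb.RocksDB;
       import org.rocksdb.RocksDBException;
   
       final class PropertyReader {
           // Converts the checked RocksDBException into the unchecked
           // ProcessorStateException, following the pattern in RocksDBStore.
           static long readProperty(final RocksDB db, final String propertyName) {
               try {
                   return db.getAggregatedLongProperty(propertyName);
               } catch (final RocksDBException e) {
                   throw new ProcessorStateException(
                       "Error reading RocksDB property " + propertyName, e);
               }
           }
       }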







[GitHub] [kafka] guozhangwang commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480477639



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
                 statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {

Review comment:
       Hmm, why do we need the second condition to determine `singleCache = false` here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
                 statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {
+                singleCache = false;
+            } else if (singleCache && cache != valueProviders.cache || !singleCache && cache == valueProviders.cache) {
+                throw new IllegalStateException("Caches for store " + storeName + " of task " + taskId +
+                    " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+        }
+    }
+
+    private void verifyIfSomeAreNull(final String segmentName,

Review comment:
       nit: `verifyConsistentSegmentValueProviders`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -174,22 +241,163 @@ private void initSensors(final StreamsMetricsImpl streamsMetrics, final RocksDBM
         numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
     }
 
-    private void initGauges(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext) {
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, metricContext, (metricsConfig, now) -> {
+    private void initGauges(final StreamsMetricsImpl streamsMetrics,
+                            final RocksDBMetricContext metricContext) {
+        RocksDBMetrics.addNumImmutableMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addCurSizeActiveMemTable(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addCurSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addNumEntriesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumDeletesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumEntriesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addNumDeletesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addMemTableFlushPending(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(MEMTABLE_FLUSH_PENDING)
+        );
+        RocksDBMetrics.addNumRunningFlushesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_FLUSHES)
+        );
+        RocksDBMetrics.addCompactionPendingMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(COMPACTION_PENDING)
+        );
+        RocksDBMetrics.addNumRunningCompactionsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS)
+        );
+        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION)
+        );
+        RocksDBMetrics.addTotalSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(TOTAL_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addLiveSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(LIVE_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addNumLiveVersionMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_LIVE_VERSIONS)
+        );
+        RocksDBMetrics.addEstimateNumKeysMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_NUMBER_OF_KEYS)
+        );
+        RocksDBMetrics.addEstimateTableReadersMemMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS)
+        );
+        RocksDBMetrics.addBackgroundErrorsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_BACKGROUND_ERRORS)
+        );
+        RocksDBMetrics.addBlockCacheCapacityMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(CAPACITY_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCacheUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(USAGE_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCachePinnedUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(PINNED_USAGE_OF_BLOCK_CACHE)
+        );
+    }
+
+    private Gauge<BigInteger> gaugeToComputeSumOfProperties(final String propertyName) {
+        return (metricsConfig, now) -> {
             BigInteger result = BigInteger.valueOf(0);
             for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
                 try {
                     // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
                     // BigInteger and construct the object from the byte representation of the value
                     result = result.add(new BigInteger(1, longToBytes(
-                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))));
+                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                    )));
+                } catch (final RocksDBException e) {

Review comment:
       Is this a piggy-backed fix to wrap RocksDBException here?
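   As context for the hunk above: RocksDB reports these properties as C++ unsigned 64-bit values, which Java's signed `long` cannot represent directly, hence the `BigInteger` construction. A minimal sketch of the conversion, with a `longToBytes` helper spelled out (the real code uses its own helper of that name):
   
       import java.math.BigInteger;
       import java.nio.ByteBuffer;
   
       final class UnsignedLongDemo {
           // Big-endian byte representation of the long's bit pattern.
           static byte[] longToBytes(final long value) {
               return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
           }
   
           public static void main(final String[] args) {
               // -1L has the bit pattern of the maximal unsigned 64-bit value.
               final long raw = -1L;
               // Signum 1 makes BigInteger interpret the bytes as non-negative.
               final BigInteger unsigned = new BigInteger(1, longToBytes(raw));
               System.out.println(unsigned); // 18446744073709551615
           }
       }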







[GitHub] [kafka] guozhangwang merged pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #9232:
URL: https://github.com/apache/kafka/pull/9232


   





[GitHub] [kafka] cadonna commented on pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#issuecomment-683729711


   Call for review: @guozhangwang @vvcephei 





[GitHub] [kafka] cadonna commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480924297



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
                 statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {
+                singleCache = false;
+            } else if (singleCache && cache != valueProviders.cache || !singleCache && cache == valueProviders.cache) {
+                throw new IllegalStateException("Caches for store " + storeName + " of task " + taskId +
+                    " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+        }
+    }
+
+    private void verifyIfSomeAreNull(final String segmentName,

Review comment:
       I will rename it to `verifyConsistencyOfValueProvidersAcrossSegments()`. WDYT?






