Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/13 20:34:09 UTC

[GitHub] [kafka] cadonna opened a new pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

cadonna opened a new pull request #9177:
URL: https://github.com/apache/kafka/pull/9177


   This commit adds the first RocksDB metric that exposes the RocksDB property num-entries-active-mem-table. More specifically, it introduces
   
   - code in StreamsMetricsImpl that is shared by all such metrics,
   - unit tests for the shared code,
   - code that adds the metric,
   - unit tests and integration tests for the metric.
   
   This commit contains only one metric to keep the PR at a reasonable size. The other RocksDB metrics described in KIP-607 will be added in follow-up PRs.
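
   For readers unfamiliar with how such a metric is wired up, here is a minimal, hypothetical sketch (not the PR's exact code) of a gauge that reads a RocksDB property. It assumes an open `org.rocksdb.RocksDB` handle and Kafka's `Gauge` interface; the class and method names are illustrative only.

   ```java
   import java.math.BigInteger;

   import org.apache.kafka.common.metrics.Gauge;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;

   public final class RocksDBPropertyGaugeExample {

       // Builds a gauge that reads the RocksDB property on every metrics poll.
       public static Gauge<BigInteger> numEntriesActiveMemTable(final RocksDB db) {
           return (config, now) -> {
               try {
                   // RocksDB reports the property as an unsigned 64-bit value.
                   final long raw = db.getLongProperty("rocksdb.num-entries-active-mem-table");
                   return new BigInteger(Long.toUnsignedString(raw));
               } catch (final RocksDBException e) {
                   throw new RuntimeException("Could not read RocksDB property", e);
               }
           };
       }
   }
   ```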


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-678074711


   test this please





[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675522188


   Retest this please





[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-678269275


   test this please





[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-678075215


   test this please





[GitHub] [kafka] guozhangwang commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675651901


   test this please





[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-673697329


   Call for review: @guozhangwang @vvcephei 





[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-674954410


   JDK 8 and Scala 2.12 had the following test failures:
   
   ```
       org.apache.kafka.connect.runtime.WorkerSourceTaskWithTopicCreationTest.testCommitFailure
       kafka.api.PlaintextAdminIntegrationTest.testConsumerGroups
   ``` 





[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r478504532



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -396,34 +398,65 @@ private String cacheSensorPrefix(final String threadId, final String taskId, fin
             + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
     }
 
-    public final Sensor storeLevelSensor(final String threadId,
-                                         final String taskId,
+    public final Sensor storeLevelSensor(final String taskId,
                                          final String storeName,
                                          final String sensorName,
-                                         final Sensor.RecordingLevel recordingLevel,
+                                         final RecordingLevel recordingLevel,
                                          final Sensor... parents) {
-        final String key = storeSensorPrefix(threadId, taskId, storeName);
-        synchronized (storeLevelSensors) {
-            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
-            final Sensor sensor = metrics.getSensor(fullSensorName);
-            if (sensor == null) {
+        final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
+        final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+        return Optional.ofNullable(metrics.getSensor(fullSensorName))
+            .orElseGet(() -> {
                 storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
                 return metrics.sensor(fullSensorName, recordingLevel, parents);
-            } else {
-                return sensor;
-            }
+            });

Review comment:
       After our offline discussion, I added some clarifications in the code.







[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-679884365


   test this please





[GitHub] [kafka] cadonna edited a comment on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675909416


   If we assume that the same thread adds and removes the state store level metrics (which is currently the case), we just need thread-safe maps `storeLevelSensors` and `storeLevelMetrics` and can forgo `synchronized`. This can be done by using `ConcurrentMap` instead of `Map`. However, with the current API, we do not make that assumption, because we pass in the thread ID. I would be really happy to get rid of `synchronized` and make the changes to the API for state store level metrics. Afterwards, we should consider adapting the other levels as well. WDYT?
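
   A minimal, hypothetical sketch of the `ConcurrentMap`-based bookkeeping described above, keyed by the current thread's name (class and field names are illustrative, not the actual `StreamsMetricsImpl` members). Note that the per-key collection must itself be thread-safe if another thread may ever read it, which is picked up again in the review comments below.

   ```java
   import java.util.Deque;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentLinkedDeque;
   import java.util.concurrent.ConcurrentMap;

   public final class StoreLevelSensorRegistry {

       private final ConcurrentMap<String, Deque<String>> sensorNamesByStore = new ConcurrentHashMap<>();

       public void register(final String taskId, final String storeName, final String sensorName) {
           final String key = Thread.currentThread().getName() + "." + taskId + "." + storeName;
           // computeIfAbsent is atomic on a ConcurrentHashMap; ConcurrentLinkedDeque keeps
           // the per-store list safe even if another thread ever reads it.
           sensorNamesByStore.computeIfAbsent(key, ignored -> new ConcurrentLinkedDeque<>()).push(sensorName);
       }

       public Deque<String> removeAll(final String taskId, final String storeName) {
           final String key = Thread.currentThread().getName() + "." + taskId + "." + storeName;
           final Deque<String> removed = sensorNamesByStore.remove(key);
           return removed == null ? new ConcurrentLinkedDeque<>() : removed;
       }
   }
   ```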





[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472250612



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       It just seems oddly granular to synchronize them individually, since we always remove all of both collections together. If it doesn't matter, then do we need to synchronize at all?







[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472276615



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       Yes, we need to synchronize. At least, we have to ensure that lines 411 and 438 are thread-safe. Then, if we do not want duplicates in `storeLevelSensors`, we should hold a lock from line 409 to line 411. Between lines 434 and 439, we need to ensure that the removal of all store level metrics has completed; otherwise, we could find a store level metric that prevents the addition of a new metric, but that earlier-found metric would then be removed during the remainder of the removal process. The same holds for the store level sensors.
   
   It is true that we always remove all of both collections together, but we do not add metric names and sensor names to both collections together.







[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-678074413


   Retest this, please





[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472100951



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -56,6 +62,9 @@ public void maybeCloseStatistics() {
         }
     }
 
+    private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
+    private static final ByteBuffer CONVERSION_BUFFER = ByteBuffer.allocate(Long.BYTES);

Review comment:
       Good point! I missed that the gauge can be called by multiple metrics reporters concurrently.





[GitHub] [kafka] guozhangwang commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472429141



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
##########
@@ -33,7 +35,7 @@
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;
 
-public class RocksDBMetrics {
+public class  RocksDBMetrics {

Review comment:
       nit: extra space.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       This may be a bit paranoid, but when adding them, the order seems to be `initSensors` first and then `initGauges`, while when removing we call `removeAllStoreLevelMetrics` first and then the other. I know that today no threads should be trying to init / removeAll concurrently, but just to be safe, maybe we can make the call ordering sensors first and then gauges?







[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-682208742


   Since the PR builds aren't running, I'm merging with trunk on my machine and running the tests.
   





[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-682235389


   Hey @cadonna , after checking out your branch and merging with trunk, I discovered some minor conflicts with 22bcd9fac3c988c15862d0b6c01930814b676253, so I pushed a quick fix before squashing and merging. I hope this is ok.
   
   Thanks again for the PR!





[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675909416


   If we assume that the same thread adds and removes the state store level metrics (which is currently the case), we just need thread-safe maps `storeLevelSensors` and `storeLevelMetrics` and can forgo `synchronized`. This can be done by using `ConcurrentMap` instead of `Map`. However, with the current API, we do not make that assumption, because we pass in the thread ID. I would be really happy to get rid of `synchronized` and make the changes to the API for state store level metrics. Afterwards, we should consider adapting the other levels as well.





[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r477424578



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -396,34 +398,65 @@ private String cacheSensorPrefix(final String threadId, final String taskId, fin
             + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
     }
 
-    public final Sensor storeLevelSensor(final String threadId,
-                                         final String taskId,
+    public final Sensor storeLevelSensor(final String taskId,
                                          final String storeName,
                                          final String sensorName,
-                                         final Sensor.RecordingLevel recordingLevel,
+                                         final RecordingLevel recordingLevel,
                                          final Sensor... parents) {
-        final String key = storeSensorPrefix(threadId, taskId, storeName);
-        synchronized (storeLevelSensors) {
-            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
-            final Sensor sensor = metrics.getSensor(fullSensorName);
-            if (sensor == null) {
+        final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
+        final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+        return Optional.ofNullable(metrics.getSensor(fullSensorName))
+            .orElseGet(() -> {
                 storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
                 return metrics.sensor(fullSensorName, recordingLevel, parents);
-            } else {
-                return sensor;
-            }
+            });

Review comment:
       I'm still mildly concerned about walking back the synchronization here, but I can't think of a realistic scenario in which we'd get a concurrency bug. Then again, the whole point of defaulting to less granular concurrency controls is that it's hard to imagine all the possible scenarios.
   
   In this case, it really doesn't seem like there's a good reason to go for super granular concurrency control. Did we spend a lot of time blocked registering sensors before?
   
   Actually, one condition comes to mind: LinkedList is not thread-safe, and accessing the ConcurrentHashMap value is only either a CAS or a volatile read, so it doesn't create a memory barrier the way `synchronized` does. Therefore, different threads will only be looking at their own locally cached list for each value in the map, although they'll all agree on the set of keys in the map.
   
   If you want to push the current implementation style, then you should use a ConcurrentLinkedDeque instead of LinkedList, but I'd really prefer to see the `synchronized` blocks come back unless/until there's a compelling performance reason to drop them.
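
   For comparison, a hypothetical sketch of the coarser-grained variant preferred here, where one `synchronized` block guards both the sensor lookup and the bookkeeping list, so a plain `HashMap`/`LinkedList` is safe and a full memory barrier is established. Names are illustrative; `Metrics#getSensor` and `Metrics#sensor` are used as in the diff above.

   ```java
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.LinkedList;
   import java.util.Map;

   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.Sensor.RecordingLevel;

   public final class SynchronizedStoreLevelSensors {

       private final Metrics metrics;
       private final Map<String, Deque<String>> sensorNamesByStore = new HashMap<>();

       public SynchronizedStoreLevelSensors(final Metrics metrics) {
           this.metrics = metrics;
       }

       public Sensor storeLevelSensor(final String key,
                                      final String sensorName,
                                      final RecordingLevel recordingLevel,
                                      final Sensor... parents) {
           final String fullSensorName = key + "." + sensorName;
           // One monitor covers both the sensor lookup and the bookkeeping list,
           // mirroring the original synchronized implementation.
           synchronized (sensorNamesByStore) {
               final Sensor existing = metrics.getSensor(fullSensorName);
               if (existing != null) {
                   return existing;
               }
               sensorNamesByStore.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
               return metrics.sensor(fullSensorName, recordingLevel, parents);
           }
       }
   }
   ```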







[GitHub] [kafka] vvcephei edited a comment on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei edited a comment on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675522188


   Looks like Jenkins shut down during the run last time or something.
   
   Retest this please





[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r471582580



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {

Review comment:
       Should we check `if (metrics.metric(metricName) == null)` again after synchronizing?
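
   A hypothetical sketch of that re-check, reusing the variables from the quoted snippet (check, then re-check under the lock), so a concurrent removal between the first check and `addMetric` is not missed:

   ```java
   if (metrics.metric(metricName) == null) {              // cheap pre-check without the lock
       synchronized (storeLevelMetrics) {
           if (metrics.metric(metricName) == null) {      // re-check once the monitor is held
               metrics.addMetric(metricName, metricConfig, valueProvider);
               storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
           }
       }
   }
   ```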

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       Should we make this all one method, and also synchronize both storeLevel collections on a single monitor?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -609,6 +611,37 @@ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
         assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
     }
 
+    @Test
+    public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
+
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
+        EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.appConfigs())
+                .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.replay(context);
+
+        rocksDBStore.init(context, rocksDBStore);
+        final byte[] key = "hello".getBytes();
+        final byte[] value = "world".getBytes();
+        rocksDBStore.put(Bytes.wrap(key), value);
+
+        final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
+            "num-entries-active-mem-table",
+            StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+            "description is not verified",
+            streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME)
+        ));
+        assertThat(numberOfEntriesActiveMemTable, notNullValue());
+        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));

Review comment:
       would it not be exactly `1`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -56,6 +62,9 @@ public void maybeCloseStatistics() {
         }
     }
 
+    private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
+    private static final ByteBuffer CONVERSION_BUFFER = ByteBuffer.allocate(Long.BYTES);

Review comment:
       It seems a little risky to use this in a multithreaded context. Why not just create a new short-lived buffer each time for the conversion?
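
   A hypothetical sketch of the short-lived-buffer alternative: each call allocates its own `ByteBuffer`, so there is no shared mutable state, and the signum of 1 makes `BigInteger` interpret the eight bytes as an unsigned value. Class and method names are illustrative.

   ```java
   import java.math.BigInteger;
   import java.nio.ByteBuffer;

   public final class UnsignedLongConversion {

       // Converts a RocksDB property value (an unsigned 64-bit integer) to BigInteger.
       public static BigInteger toUnsignedBigInteger(final long propertyValue) {
           final byte[] bigEndianBytes = ByteBuffer.allocate(Long.BYTES).putLong(propertyValue).array();
           return new BigInteger(1, bigEndianBytes);
       }
   }
   ```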







[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472200360



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       Do you have performance concerns due to the two monitors? Or what is the main reason for using a single monitor here? By using a single monitor here and in `addStoreLevelMutableMetric()` and `storeLevelSensor()`, we still do not ensure that no metrics are added to the metrics map during the removal of all metrics, because each time `Sensor#add()` is called a metric is added without synchronizing on the monitor of `storeLevelSensors`. Single operations on the metrics map are synchronized (through `ConcurrentMap`), but multiple operations are not.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -609,6 +611,37 @@ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
         assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
     }
 
+    @Test
+    public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
+
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
+        EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.appConfigs())
+                .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.replay(context);
+
+        rocksDBStore.init(context, rocksDBStore);
+        final byte[] key = "hello".getBytes();
+        final byte[] value = "world".getBytes();
+        rocksDBStore.put(Bytes.wrap(key), value);
+
+        final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
+            "num-entries-active-mem-table",
+            StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+            "description is not verified",
+            streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME)
+        ));
+        assertThat(numberOfEntriesActiveMemTable, notNullValue());
+        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));

Review comment:
       Yes, but in this test I merely test whether the metric is updated. The correctness of the computation is verified in `RocksDBMetricsRecorderGaugesTest`. I will improve this test to verify that the metric is zero before the put and greater than zero after the put.
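
   A hypothetical sketch of the improved assertion, reusing the variables from the quoted test: look the metric up right after `init`, assert it is zero, then put and assert it is greater than zero.

   ```java
   assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), is(BigInteger.ZERO));
   rocksDBStore.put(Bytes.wrap(key), value);
   assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.ZERO));
   ```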

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {

Review comment:
       I will do that to ensure that `removeAllStoreLevelMetrics()` completes before we do the check.







[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-678075020


   ok to test





[GitHub] [kafka] vvcephei merged pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9177:
URL: https://github.com/apache/kafka/pull/9177


   





[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-683631452


   @vvcephei Thank you very much for taking care of the conflicts and merging the PR!





[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r478709525



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -396,34 +398,65 @@ private String cacheSensorPrefix(final String threadId, final String taskId, fin
             + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
     }
 
-    public final Sensor storeLevelSensor(final String threadId,
-                                         final String taskId,
+    public final Sensor storeLevelSensor(final String taskId,
                                          final String storeName,
                                          final String sensorName,
-                                         final Sensor.RecordingLevel recordingLevel,
+                                         final RecordingLevel recordingLevel,
                                          final Sensor... parents) {
-        final String key = storeSensorPrefix(threadId, taskId, storeName);
-        synchronized (storeLevelSensors) {
-            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
-            final Sensor sensor = metrics.getSensor(fullSensorName);
-            if (sensor == null) {
+        final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
+        final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+        return Optional.ofNullable(metrics.getSensor(fullSensorName))
+            .orElseGet(() -> {
                 storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
                 return metrics.sensor(fullSensorName, recordingLevel, parents);
-            } else {
-                return sensor;
-            }
+            });

Review comment:
       Thanks!







[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-676001391


   I removed `synchronized` around adding and removing store level sensors and metrics. Let me know what you think. If you do not like it, we can revert the last commit.





[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675521788


   Thanks for the update, @cadonna . Just one reply above.





[GitHub] [kafka] guozhangwang commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-677836579


   LGTM. Leaving to @vvcephei for final review and merge.
   
   I think it is okay not to bookkeep the thread ID and to always rely on currentThread(), assuming only one thread would be accessing the store level sensors.

