You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/17 16:28:13 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r471582580



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {

Review comment:
       Should we check `if (metrics.metric(metricName) == null)` again after synchronizing?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       Should we merge these into a single method, and also synchronize both storeLevel collections on a single monitor?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -609,6 +611,37 @@ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
         assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
     }
 
+    @Test
+    public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
+
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
+        EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.appConfigs())
+                .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.replay(context);
+
+        rocksDBStore.init(context, rocksDBStore);
+        final byte[] key = "hello".getBytes();
+        final byte[] value = "world".getBytes();
+        rocksDBStore.put(Bytes.wrap(key), value);
+
+        final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
+            "num-entries-active-mem-table",
+            StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+            "description is not verified",
+            streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME)
+        ));
+        assertThat(numberOfEntriesActiveMemTable, notNullValue());
+        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));

Review comment:
       Would it not be exactly `1`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -56,6 +62,9 @@ public void maybeCloseStatistics() {
         }
     }
 
+    private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
+    private static final ByteBuffer CONVERSION_BUFFER = ByteBuffer.allocate(Long.BYTES);

Review comment:
       It seems a little risky to use this in a multithreaded context. Why not just create a new short-lived buffer each time for the conversion?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org