Posted to commits@kafka.apache.org by mj...@apache.org on 2020/04/15 20:51:00 UTC

[kafka] branch 2.5 updated: KAFKA-9675: Fix bug that prevents RocksDB metrics to be updated (#8256)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 2808a05  KAFKA-9675: Fix bug that prevents RocksDB metrics to be updated (#8256)
2808a05 is described below

commit 2808a05b01ef1ca23870b5d7d4897dc02129b6bb
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Thu Mar 12 19:51:33 2020 +0100

    KAFKA-9675: Fix bug that prevents RocksDB metrics to be updated (#8256)
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../streams/state/internals/RocksDBStore.java      |   7 +-
 .../integration/RocksDBMetricsIntegrationTest.java | 130 ++++++++++++++++-----
 2 files changed, 108 insertions(+), 29 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 2b9b3f6..bf6a033 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -188,11 +188,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
             throw new ProcessorStateException(fatal);
         }
 
+        // Set up metrics before the database is opened; otherwise the metrics are not updated
+        // with the measurements from RocksDB
+        maybeSetUpMetricsRecorder(context, configs);
+
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
-
-        // Do this last because the prior operations could throw exceptions.
-        maybeSetUpMetricsRecorder(context, configs);
     }
 
     private void maybeSetUpMetricsRecorder(final ProcessorContext context, final Map<String, Object> configs) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index b0ac1fa..34cc428 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -42,7 +42,6 @@ import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -61,6 +60,10 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
 @Category({IntegrationTest.class})
 @RunWith(Parameterized.class)
 public class RocksDBMetricsIntegrationTest {
@@ -74,8 +77,10 @@ public class RocksDBMetricsIntegrationTest {
     private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
     private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
     private static final Duration WINDOW_SIZE = Duration.ofMillis(50);
+    private static final long TIMEOUT = 60000;
 
     // RocksDB metrics
+    private static final String METRICS_GROUP = "stream-state-metrics";
     private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
     private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
     private static final String BYTES_READ_RATE = "bytes-read-rate";
@@ -114,26 +119,36 @@ public class RocksDBMetricsIntegrationTest {
         CLUSTER.deleteTopicsAndWait(STREAM_INPUT, STREAM_OUTPUT);
     }
 
+    @FunctionalInterface
+    private interface MetricsVerifier {
+        void verify(final KafkaStreams kafkaStreams, final String metricScope) throws Exception;
+    }
+
     @Test
     public void shouldExposeRocksDBMetricsForNonSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
         final Properties streamsConfiguration = streamsConfig();
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
         final StreamsBuilder builder = builderForNonSegmentedStateStore();
+        final String metricsScope = "rocksdb-state-id";
 
-        cleanUpStateRunAndVerify(
+        cleanUpStateRunVerifyAndClose(
             builder,
             streamsConfiguration,
             IntegerDeserializer.class,
             StringDeserializer.class,
-            "rocksdb-state-id"
+            this::verifyThatRocksDBMetricsAreExposed,
+            metricsScope
         );
 
-        cleanUpStateRunAndVerify(
+        // simulated failure
+
+        cleanUpStateRunVerifyAndClose(
             builder,
             streamsConfiguration,
             IntegerDeserializer.class,
             StringDeserializer.class,
-            "rocksdb-state-id"
+            this::verifyThatRocksDBMetricsAreExposed,
+            metricsScope
         );
     }
 
@@ -142,21 +157,60 @@ public class RocksDBMetricsIntegrationTest {
         final Properties streamsConfiguration = streamsConfig();
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
         final StreamsBuilder builder = builderForSegmentedStateStore();
+        final String metricsScope = "rocksdb-window-state-id";
 
-        cleanUpStateRunAndVerify(
+        cleanUpStateRunVerifyAndClose(
             builder,
             streamsConfiguration,
             LongDeserializer.class,
             LongDeserializer.class,
-            "rocksdb-window-state-id"
+            this::verifyThatRocksDBMetricsAreExposed,
+            metricsScope
+        );
+
+        // simulated failure
+
+        cleanUpStateRunVerifyAndClose(
+            builder,
+            streamsConfiguration,
+            LongDeserializer.class,
+            LongDeserializer.class,
+            this::verifyThatRocksDBMetricsAreExposed,
+            metricsScope
+        );
+    }
+
+    @Test
+    public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForNonSegmentedStateStore() throws Exception {
+        final Properties streamsConfiguration = streamsConfig();
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        final StreamsBuilder builder = builderForNonSegmentedStateStore();
+        final String metricsScope = "rocksdb-state-id";
+
+        cleanUpStateRunVerifyAndClose(
+            builder,
+            streamsConfiguration,
+            IntegerDeserializer.class,
+            StringDeserializer.class,
+            this::verifyThatBytesWrittenTotalIncreases,
+            metricsScope
         );
+    }
+
+    @Test
+    public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForSegmentedStateStore() throws Exception {
+        final Properties streamsConfiguration = streamsConfig();
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        final StreamsBuilder builder = builderForSegmentedStateStore();
+        final String metricsScope = "rocksdb-window-state-id";
 
-        cleanUpStateRunAndVerify(
+        cleanUpStateRunVerifyAndClose(
             builder,
             streamsConfiguration,
             LongDeserializer.class,
             LongDeserializer.class,
-            "rocksdb-window-state-id"
+            this::verifyThatBytesWrittenTotalIncreases,
+            metricsScope
         );
     }
 
@@ -167,7 +221,6 @@ public class RocksDBMetricsIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         return streamsConfiguration;
@@ -198,16 +251,17 @@ public class RocksDBMetricsIntegrationTest {
         return builder;
     }
 
-    private void cleanUpStateRunAndVerify(final StreamsBuilder builder,
-                                          final Properties streamsConfiguration,
-                                          final Class outputKeyDeserializer,
-                                          final Class outputValueDeserializer,
-                                          final String metricsScope) throws Exception {
+    private void cleanUpStateRunVerifyAndClose(final StreamsBuilder builder,
+                                               final Properties streamsConfiguration,
+                                               final Class outputKeyDeserializer,
+                                               final Class outputValueDeserializer,
+                                               final MetricsVerifier metricsVerifier,
+                                               final String metricsScope) throws Exception {
         final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.cleanUp();
         produceRecords();
 
-        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, 60000);
+        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, TIMEOUT);
 
         IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             TestUtils.consumerConfig(
@@ -220,7 +274,7 @@ public class RocksDBMetricsIntegrationTest {
             STREAM_OUTPUT,
             1
         );
-        verifyRocksDBMetrics(kafkaStreams, metricsScope);
+        metricsVerifier.verify(kafkaStreams, metricsScope);
         kafkaStreams.close();
     }
 
@@ -261,10 +315,9 @@ public class RocksDBMetricsIntegrationTest {
         );
     }
 
-    private void verifyRocksDBMetrics(final KafkaStreams kafkaStreams, final String metricsScope) {
-        final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-            .filter(m -> m.metricName().group().equals("stream-state-metrics") && m.metricName().tags().containsKey(metricsScope))
-            .collect(Collectors.toList());
+    private void verifyThatRocksDBMetricsAreExposed(final KafkaStreams kafkaStreams,
+                                                    final String metricsScope) {
+        final List<Metric> listMetricStore = getRocksDBMetrics(kafkaStreams, metricsScope);
         checkMetricByName(listMetricStore, BYTES_WRITTEN_RATE, 1);
         checkMetricByName(listMetricStore, BYTES_WRITTEN_TOTAL, 1);
         checkMetricByName(listMetricStore, BYTES_READ_RATE, 1);
@@ -283,13 +336,38 @@ public class RocksDBMetricsIntegrationTest {
         checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
     }
 
-    private void checkMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
+    private void checkMetricByName(final List<Metric> listMetric,
+                                   final String metricName,
+                                   final int numMetric) {
         final List<Metric> metrics = listMetric.stream()
             .filter(m -> m.metricName().name().equals(metricName))
             .collect(Collectors.toList());
-        Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
-        for (final Metric m : metrics) {
-            Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
+        assertThat(
+            "Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(),
+            metrics.size(),
+            is(numMetric)
+        );
+        for (final Metric metric : metrics) {
+            assertThat("Metric:'" + metric.metricName() + "' must be not null", metric.metricValue(), is(notNullValue()));
         }
     }
-}
+
+    private void verifyThatBytesWrittenTotalIncreases(final KafkaStreams kafkaStreams,
+                                                      final String metricsScope) throws InterruptedException {
+        final List<Metric> metric = getRocksDBMetrics(kafkaStreams, metricsScope).stream()
+            .filter(m -> BYTES_WRITTEN_TOTAL.equals(m.metricName().name()))
+            .collect(Collectors.toList());
+        TestUtils.waitForCondition(
+            () -> (double) metric.get(0).metricValue() > 0,
+            TIMEOUT,
+            () -> "RocksDB metric bytes.written.total did not increase in " + TIMEOUT + " ms"
+        );
+    }
+
+    private List<Metric> getRocksDBMetrics(final KafkaStreams kafkaStreams,
+                                           final String metricsScope) {
+        return new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(METRICS_GROUP) && m.metricName().tags().containsKey(metricsScope))
+            .collect(Collectors.toList());
+    }
+}
\ No newline at end of file
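
As a purely illustrative addendum (not part of the commit above): the sketch below shows one way an application could read the bytes-written-total RocksDB metric from a running KafkaStreams instance, mirroring what verifyThatBytesWrittenTotalIncreases() in the test checks. The metric group ("stream-state-metrics"), metric name, and "rocksdb-state-id" tag key match the values used in the test; the helper class and method names are invented for this example, and metrics.recording.level must be set to DEBUG for the state-store metrics to be registered at all.

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    // Hypothetical helper, for illustration only.
    public final class RocksDBMetricInspector {

        // Returns the current bytes-written-total value for the first
        // key-value store metric found, or -1.0 if no such metric is
        // registered yet.
        public static double bytesWrittenTotal(final KafkaStreams kafkaStreams) {
            for (final Metric metric : kafkaStreams.metrics().values()) {
                final MetricName name = metric.metricName();
                if ("stream-state-metrics".equals(name.group())
                    && "bytes-written-total".equals(name.name())
                    && name.tags().containsKey("rocksdb-state-id")) {
                    return (double) metric.metricValue();
                }
            }
            return -1.0;
        }
    }

Before this fix, such a lookup on a store opened by RocksDBStore could return a metric that never moved past zero, because the metrics recorder was attached only after the database had been opened.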