You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/04/15 20:51:00 UTC
[kafka] branch 2.5 updated: KAFKA-9675: Fix bug that prevents RocksDB metrics to be updated (#8256)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 2808a05 KAFKA-9675: Fix bug that prevents RocksDB metrics to be updated (#8256)
2808a05 is described below
commit 2808a05b01ef1ca23870b5d7d4897dc02129b6bb
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Thu Mar 12 19:51:33 2020 +0100
KAFKA-9675: Fix bug that prevents RocksDB metrics to be updated (#8256)
Reviewers: John Roesler <vv...@apache.org>
---
.../streams/state/internals/RocksDBStore.java | 7 +-
.../integration/RocksDBMetricsIntegrationTest.java | 130 ++++++++++++++++-----
2 files changed, 108 insertions(+), 29 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 2b9b3f6..bf6a033 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -188,11 +188,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
throw new ProcessorStateException(fatal);
}
+ // Setup metrics before the database is opened, otherwise the metrics are not updated
+ // with the measurements from Rocks DB
+ maybeSetUpMetricsRecorder(context, configs);
+
openRocksDB(dbOptions, columnFamilyOptions);
open = true;
-
- // Do this last because the prior operations could throw exceptions.
- maybeSetUpMetricsRecorder(context, configs);
}
private void maybeSetUpMetricsRecorder(final ProcessorContext context, final Map<String, Object> configs) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index b0ac1fa..34cc428 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -42,7 +42,6 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -61,6 +60,10 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
public class RocksDBMetricsIntegrationTest {
@@ -74,8 +77,10 @@ public class RocksDBMetricsIntegrationTest {
private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
private static final Duration WINDOW_SIZE = Duration.ofMillis(50);
+ private static final long TIMEOUT = 60000;
// RocksDB metrics
+ private static final String METRICS_GROUP = "stream-state-metrics";
private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
private static final String BYTES_READ_RATE = "bytes-read-rate";
@@ -114,26 +119,36 @@ public class RocksDBMetricsIntegrationTest {
CLUSTER.deleteTopicsAndWait(STREAM_INPUT, STREAM_OUTPUT);
}
+ @FunctionalInterface
+ private interface MetricsVerifier {
+ void verify(final KafkaStreams kafkaStreams, final String metricScope) throws Exception;
+ }
+
@Test
public void shouldExposeRocksDBMetricsForNonSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
final Properties streamsConfiguration = streamsConfig();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final StreamsBuilder builder = builderForNonSegmentedStateStore();
+ final String metricsScope = "rocksdb-state-id";
- cleanUpStateRunAndVerify(
+ cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
IntegerDeserializer.class,
StringDeserializer.class,
- "rocksdb-state-id"
+ this::verifyThatRocksDBMetricsAreExposed,
+ metricsScope
);
- cleanUpStateRunAndVerify(
+ // simulated failure
+
+ cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
IntegerDeserializer.class,
StringDeserializer.class,
- "rocksdb-state-id"
+ this::verifyThatRocksDBMetricsAreExposed,
+ metricsScope
);
}
@@ -142,21 +157,60 @@ public class RocksDBMetricsIntegrationTest {
final Properties streamsConfiguration = streamsConfig();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final StreamsBuilder builder = builderForSegmentedStateStore();
+ final String metricsScope = "rocksdb-window-state-id";
- cleanUpStateRunAndVerify(
+ cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
LongDeserializer.class,
LongDeserializer.class,
- "rocksdb-window-state-id"
+ this::verifyThatRocksDBMetricsAreExposed,
+ metricsScope
+ );
+
+ // simulated failure
+
+ cleanUpStateRunVerifyAndClose(
+ builder,
+ streamsConfiguration,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ this::verifyThatRocksDBMetricsAreExposed,
+ metricsScope
+ );
+ }
+
+ @Test
+ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForNonSegmentedStateStore() throws Exception {
+ final Properties streamsConfiguration = streamsConfig();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ final StreamsBuilder builder = builderForNonSegmentedStateStore();
+ final String metricsScope = "rocksdb-state-id";
+
+ cleanUpStateRunVerifyAndClose(
+ builder,
+ streamsConfiguration,
+ IntegerDeserializer.class,
+ StringDeserializer.class,
+ this::verifyThatBytesWrittenTotalIncreases,
+ metricsScope
);
+ }
+
+ @Test
+ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForSegmentedStateStore() throws Exception {
+ final Properties streamsConfiguration = streamsConfig();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ final StreamsBuilder builder = builderForSegmentedStateStore();
+ final String metricsScope = "rocksdb-window-state-id";
- cleanUpStateRunAndVerify(
+ cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
LongDeserializer.class,
LongDeserializer.class,
- "rocksdb-window-state-id"
+ this::verifyThatBytesWrittenTotalIncreases,
+ metricsScope
);
}
@@ -167,7 +221,6 @@ public class RocksDBMetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
return streamsConfiguration;
@@ -198,16 +251,17 @@ public class RocksDBMetricsIntegrationTest {
return builder;
}
- private void cleanUpStateRunAndVerify(final StreamsBuilder builder,
- final Properties streamsConfiguration,
- final Class outputKeyDeserializer,
- final Class outputValueDeserializer,
- final String metricsScope) throws Exception {
+ private void cleanUpStateRunVerifyAndClose(final StreamsBuilder builder,
+ final Properties streamsConfiguration,
+ final Class outputKeyDeserializer,
+ final Class outputValueDeserializer,
+ final MetricsVerifier metricsVerifier,
+ final String metricsScope) throws Exception {
final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.cleanUp();
produceRecords();
- StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, 60000);
+ StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, TIMEOUT);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
@@ -220,7 +274,7 @@ public class RocksDBMetricsIntegrationTest {
STREAM_OUTPUT,
1
);
- verifyRocksDBMetrics(kafkaStreams, metricsScope);
+ metricsVerifier.verify(kafkaStreams, metricsScope);
kafkaStreams.close();
}
@@ -261,10 +315,9 @@ public class RocksDBMetricsIntegrationTest {
);
}
- private void verifyRocksDBMetrics(final KafkaStreams kafkaStreams, final String metricsScope) {
- final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals("stream-state-metrics") && m.metricName().tags().containsKey(metricsScope))
- .collect(Collectors.toList());
+ private void verifyThatRocksDBMetricsAreExposed(final KafkaStreams kafkaStreams,
+ final String metricsScope) {
+ final List<Metric> listMetricStore = getRocksDBMetrics(kafkaStreams, metricsScope);
checkMetricByName(listMetricStore, BYTES_WRITTEN_RATE, 1);
checkMetricByName(listMetricStore, BYTES_WRITTEN_TOTAL, 1);
checkMetricByName(listMetricStore, BYTES_READ_RATE, 1);
@@ -283,13 +336,38 @@ public class RocksDBMetricsIntegrationTest {
checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
}
- private void checkMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
+ private void checkMetricByName(final List<Metric> listMetric,
+ final String metricName,
+ final int numMetric) {
final List<Metric> metrics = listMetric.stream()
.filter(m -> m.metricName().name().equals(metricName))
.collect(Collectors.toList());
- Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
- for (final Metric m : metrics) {
- Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
+ assertThat(
+ "Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(),
+ metrics.size(),
+ is(numMetric)
+ );
+ for (final Metric metric : metrics) {
+ assertThat("Metric:'" + metric.metricName() + "' must be not null", metric.metricValue(), is(notNullValue()));
}
}
-}
+
+ private void verifyThatBytesWrittenTotalIncreases(final KafkaStreams kafkaStreams,
+ final String metricsScope) throws InterruptedException {
+ final List<Metric> metric = getRocksDBMetrics(kafkaStreams, metricsScope).stream()
+ .filter(m -> BYTES_WRITTEN_TOTAL.equals(m.metricName().name()))
+ .collect(Collectors.toList());
+ TestUtils.waitForCondition(
+ () -> (double) metric.get(0).metricValue() > 0,
+ TIMEOUT,
+ () -> "RocksDB metric bytes.written.total did not increase in " + TIMEOUT + " ms"
+ );
+ }
+
+ private List<Metric> getRocksDBMetrics(final KafkaStreams kafkaStreams,
+ final String metricsScope) {
+ return new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(METRICS_GROUP) && m.metricName().tags().containsKey(metricsScope))
+ .collect(Collectors.toList());
+ }
+}
\ No newline at end of file