Posted to commits@kafka.apache.org by gu...@apache.org on 2020/02/12 01:32:50 UTC
[kafka] branch 2.5 updated: KAFKA-9355: Fix bug that removed RocksDB metrics after failure in EOS (#7996)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 0d82d25 KAFKA-9355: Fix bug that removed RocksDB metrics after failure in EOS (#7996)
0d82d25 is described below
commit 0d82d25c321aee73c53d8923529cea2f3012f78a
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Wed Feb 12 02:31:13 2020 +0100
KAFKA-9355: Fix bug that removed RocksDB metrics after failure in EOS (#7996)
* Added init() method to RocksDBMetricsRecorder
* Added call to init() of RocksDBMetricsRecorder to init() of RocksDB store
* Added call to init() of RocksDBMetricsRecorder to openExisting() of segmented state stores
* Adapted unit tests
* Added integration test that reproduces the situation in which the bug occurred
Reviewers: Guozhang Wang <wa...@gmail.com>
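
In short: sensor registration moves out of addStatistics() into a new, idempotent
init() that stores call on every (re-)initialisation. A condensed, hedged sketch of
the pattern — plain Java stand-ins for TaskId and StreamsMetricsImpl, not the actual
Kafka Streams classes (see the diff below for those):

    import java.util.HashMap;
    import java.util.Map;

    // Simplified sketch of the recorder lifecycle after this change.
    class MetricsRecorderSketch {
        private String taskId;          // null until init() runs
        private Object streamsMetrics;
        private final Map<String, Object> statisticsToRecord = new HashMap<>();

        // Idempotent: safe to call on every (re-)initialisation of the store,
        // including after an EOS failure wiped and re-created the task.
        void init(final Object streamsMetrics, final String taskId) {
            if (this.taskId != null && !this.taskId.equals(taskId)) {
                throw new IllegalStateException(
                    "metrics recorder re-initialised with a different task: "
                        + this.taskId + " vs. " + taskId);
            }
            if (this.streamsMetrics != null && this.streamsMetrics != streamsMetrics) {
                throw new IllegalStateException(
                    "metrics recorder re-initialised with different Streams metrics");
            }
            // the real code (re-)creates the sensors here, so metrics removed
            // when the task closed are registered again on the next init
            this.taskId = taskId;
            this.streamsMetrics = streamsMetrics;
        }

        // Sensor setup no longer happens here; callers must call init() first.
        void addStatistics(final String segmentName, final Object statistics) {
            statisticsToRecord.put(segmentName, statistics);
        }
    }

Because init() recreates the sensors each time it runs, a task that failed under EOS
and was recreated gets its RocksDB metrics registered again instead of staying
silently removed — which is the bug this commit fixes.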
---
.../streams/state/internals/KeyValueSegments.java | 8 +-
.../streams/state/internals/RocksDBStore.java | 3 +-
.../state/internals/TimestampedSegments.java | 6 +
.../internals/metrics/RocksDBMetricsRecorder.java | 31 ++-
.../integration/MetricsIntegrationTest.java | 58 ----
.../integration/RocksDBMetricsIntegrationTest.java | 295 +++++++++++++++++++++
.../state/internals/KeyValueSegmentsTest.java | 2 +
.../streams/state/internals/RocksDBStoreTest.java | 8 +-
.../state/internals/SegmentIteratorTest.java | 4 +-
.../state/internals/TimestampedSegmentsTest.java | 2 +
.../metrics/RocksDBMetricsRecorderTest.java | 145 ++++++----
11 files changed, 426 insertions(+), 136 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index fc49c12..9dbbae4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -51,4 +51,10 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
return newSegment;
}
}
-}
+
+ @Override
+ public void openExisting(final InternalProcessorContext context, final long streamTime) {
+ metricsRecorder.init(context.metrics(), context.taskId());
+ super.openExisting(context, streamTime);
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 96ffe3b..2b9b3f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -203,7 +203,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
// metrics recorder will clean up statistics object
final Statistics statistics = new Statistics();
userSpecifiedOptions.setStatistics(statistics);
- metricsRecorder.addStatistics(name, statistics, (StreamsMetricsImpl) context.metrics(), context.taskId());
+ metricsRecorder.addStatistics(name, statistics);
}
}
@@ -225,6 +225,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
final StateStore root) {
// open the DB dir
internalProcessorContext = context;
+ metricsRecorder.init((StreamsMetricsImpl) context.metrics(), context.taskId());
openDB(context);
batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 400511f..e7c2edb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -51,4 +51,10 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
return newSegment;
}
}
+
+ @Override
+ public void openExisting(final InternalProcessorContext context, final long streamTime) {
+ metricsRecorder.init(context.metrics(), context.taskId());
+ super.openExisting(context, streamTime);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
index 59ade54..b5d603c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
@@ -51,7 +51,6 @@ public class RocksDBMetricsRecorder {
private final String threadId;
private TaskId taskId;
private StreamsMetricsImpl streamsMetrics;
- private boolean isInitialized = false;
public RocksDBMetricsRecorder(final String metricsScope,
final String threadId,
@@ -71,20 +70,26 @@ public class RocksDBMetricsRecorder {
return taskId;
}
- public void addStatistics(final String segmentName,
- final Statistics statistics,
- final StreamsMetricsImpl streamsMetrics,
- final TaskId taskId) {
- if (!isInitialized) {
- initSensors(streamsMetrics, taskId);
- this.taskId = taskId;
- this.streamsMetrics = streamsMetrics;
- isInitialized = true;
+ /**
+ * The initialisation of the metrics recorder is idempotent.
+ */
+ public void init(final StreamsMetricsImpl streamsMetrics,
+ final TaskId taskId) {
+ if (this.taskId != null && !this.taskId.equals(taskId)) {
+ throw new IllegalStateException("Metrics recorder is re-initialised with different task: previous task is " +
+ this.taskId + " whereas current task is " + taskId + ". This is a bug in Kafka Streams.");
}
- if (this.taskId != taskId) {
- throw new IllegalStateException("Statistics of store \"" + segmentName + "\" for task " + taskId
- + " cannot be added to metrics recorder for task " + this.taskId + ". This is a bug in Kafka Streams.");
+ if (this.streamsMetrics != null && this.streamsMetrics != streamsMetrics) {
+ throw new IllegalStateException("Metrics recorder is re-initialised with different Streams metrics. "
+ + "This is a bug in Kafka Streams.");
}
+ initSensors(streamsMetrics, taskId);
+ this.taskId = taskId;
+ this.streamsMetrics = streamsMetrics;
+ }
+
+ public void addStatistics(final String segmentName,
+ final Statistics statistics) {
if (statisticsToRecord.isEmpty()) {
logger.debug(
"Adding metrics recorder of task {} to metrics recording trigger",
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 05412c6..63ce9c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -393,10 +392,6 @@ public class MetricsIntegrationTest {
IN_MEMORY_LRUCACHE_TAG_KEY,
builtInMetricsVersion
);
- checkRocksDBMetricsByTag(
- "rocksdb-state-id",
- RecordingLevel.valueOf(streamsConfiguration.getProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))
- );
checkCacheMetrics(builtInMetricsVersion);
closeApplication();
@@ -439,10 +434,6 @@ public class MetricsIntegrationTest {
waitUntilAllRecordsAreConsumed(1);
checkWindowStoreAndSuppressionBufferMetrics(builtInMetricsVersion);
- checkRocksDBMetricsByTag(
- "rocksdb-window-state-id",
- RecordingLevel.valueOf(streamsConfiguration.getProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))
- );
closeApplication();
@@ -485,38 +476,12 @@ public class MetricsIntegrationTest {
waitUntilAllRecordsAreConsumed(2);
checkSessionStoreMetrics(builtInMetricsVersion);
- checkRocksDBMetricsByTag(
- "rocksdb-session-state-id",
- RecordingLevel.valueOf(streamsConfiguration.getProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))
- );
closeApplication();
checkMetricsDeregistration();
}
- @Test
- public void shouldNotAddRocksDBMetricsIfRecordingLevelIsInfo() throws Exception {
- builder.table(
- STREAM_INPUT,
- Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()
- ).toStream().to(STREAM_OUTPUT_1);
- streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.name);
- kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
- kafkaStreams.start();
- TestUtils.waitForCondition(
- () -> kafkaStreams.state() == State.RUNNING,
- timeout,
- () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
-
- checkRocksDBMetricsByTag(
- ROCKSDB_KVSTORE_TAG_KEY,
- RecordingLevel.valueOf(streamsConfiguration.getProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))
- );
-
- closeApplication();
- }
-
private void verifyStateMetric(final State state) {
final List<Metric> metricsList = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().name().equals(STATE) &&
@@ -645,29 +610,6 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricProcessor, FORWARD_RATE, numberOfModifiedForwardMetrics);
}
- private void checkRocksDBMetricsByTag(final String tag, final RecordingLevel recordingLevel) {
- final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(STATE_STORE_LEVEL_GROUP) && m.metricName().tags().containsKey(tag))
- .collect(Collectors.toList());
- final int expectedNumberOfMetrics = recordingLevel == RecordingLevel.DEBUG ? 1 : 0;
- checkMetricByName(listMetricStore, BYTES_WRITTEN_RATE, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BYTES_WRITTEN_TOTAL, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BYTES_READ_RATE, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BYTES_READ_TOTAL, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_RATE, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_TOTAL, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, MEMTABLE_HIT_RATIO, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, WRITE_STALL_DURATION_AVG, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, WRITE_STALL_DURATION_TOTAL, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BLOCK_CACHE_DATA_HIT_RATIO, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BLOCK_CACHE_INDEX_HIT_RATIO, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BLOCK_CACHE_FILTER_HIT_RATIO, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BYTES_READ_DURING_COMPACTION_RATE, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, BYTES_WRITTEN_DURING_COMPACTION_RATE, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, expectedNumberOfMetrics);
- checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, expectedNumberOfMetrics);
- }
-
private void checkKeyValueStoreMetrics(final String group0100To24,
final String tagKey,
final String builtInMetricsVersion) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
new file mode 100644
index 0000000..b0ac1fa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class RocksDBMetricsIntegrationTest {
+
+ private static final int NUM_BROKERS = 3;
+
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+ private static final String STREAM_INPUT = "STREAM_INPUT";
+ private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
+ private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
+ private static final Duration WINDOW_SIZE = Duration.ofMillis(50);
+
+ // RocksDB metrics
+ private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
+ private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
+ private static final String BYTES_READ_RATE = "bytes-read-rate";
+ private static final String BYTES_READ_TOTAL = "bytes-read-total";
+ private static final String MEMTABLE_BYTES_FLUSHED_RATE = "memtable-bytes-flushed-rate";
+ private static final String MEMTABLE_BYTES_FLUSHED_TOTAL = "memtable-bytes-flushed-total";
+ private static final String MEMTABLE_HIT_RATIO = "memtable-hit-ratio";
+ private static final String WRITE_STALL_DURATION_AVG = "write-stall-duration-avg";
+ private static final String WRITE_STALL_DURATION_TOTAL = "write-stall-duration-total";
+ private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit-ratio";
+ private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit-ratio";
+ private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit-ratio";
+ private static final String BYTES_READ_DURING_COMPACTION_RATE = "bytes-read-compaction-rate";
+ private static final String BYTES_WRITTEN_DURING_COMPACTION_RATE = "bytes-written-compaction-rate";
+ private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
+ private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
+
+ @Parameters(name = "{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {StreamsConfig.EXACTLY_ONCE},
+ {StreamsConfig.AT_LEAST_ONCE}
+ });
+ }
+
+ @Parameter
+ public String processingGuarantee;
+
+ @Before
+ public void before() throws Exception {
+ CLUSTER.createTopic(STREAM_INPUT, 1, 3);
+ }
+
+ @After
+ public void after() throws Exception {
+ CLUSTER.deleteTopicsAndWait(STREAM_INPUT, STREAM_OUTPUT);
+ }
+
+ @Test
+ public void shouldExposeRocksDBMetricsForNonSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
+ final Properties streamsConfiguration = streamsConfig();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ final StreamsBuilder builder = builderForNonSegmentedStateStore();
+
+ cleanUpStateRunAndVerify(
+ builder,
+ streamsConfiguration,
+ IntegerDeserializer.class,
+ StringDeserializer.class,
+ "rocksdb-state-id"
+ );
+
+ cleanUpStateRunAndVerify(
+ builder,
+ streamsConfiguration,
+ IntegerDeserializer.class,
+ StringDeserializer.class,
+ "rocksdb-state-id"
+ );
+ }
+
+ @Test
+ public void shouldExposeRocksDBMetricsForSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
+ final Properties streamsConfiguration = streamsConfig();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ final StreamsBuilder builder = builderForSegmentedStateStore();
+
+ cleanUpStateRunAndVerify(
+ builder,
+ streamsConfiguration,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ "rocksdb-window-state-id"
+ );
+
+ cleanUpStateRunAndVerify(
+ builder,
+ streamsConfiguration,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ "rocksdb-window-state-id"
+ );
+ }
+
+ private Properties streamsConfig() {
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+ streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ return streamsConfiguration;
+ }
+
+ private StreamsBuilder builderForNonSegmentedStateStore() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.table(
+ STREAM_INPUT,
+ Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()
+ ).toStream().to(STREAM_OUTPUT);
+ return builder;
+ }
+
+ private StreamsBuilder builderForSegmentedStateStore() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+ .groupByKey()
+ .windowedBy(TimeWindows.of(WINDOW_SIZE).grace(Duration.ZERO))
+ .aggregate(() -> 0L,
+ (aggKey, newValue, aggValue) -> aggValue,
+ Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
+ .withValueSerde(Serdes.Long())
+ .withRetention(WINDOW_SIZE))
+ .toStream()
+ .map((key, value) -> KeyValue.pair(value, value))
+ .to(STREAM_OUTPUT, Produced.with(Serdes.Long(), Serdes.Long()));
+ return builder;
+ }
+
+ private void cleanUpStateRunAndVerify(final StreamsBuilder builder,
+ final Properties streamsConfiguration,
+ final Class outputKeyDeserializer,
+ final Class outputValueDeserializer,
+ final String metricsScope) throws Exception {
+ final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+ kafkaStreams.cleanUp();
+ produceRecords();
+
+ StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, 60000);
+
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ "consumerApp",
+ outputKeyDeserializer,
+ outputValueDeserializer,
+ new Properties()
+ ),
+ STREAM_OUTPUT,
+ 1
+ );
+ verifyRocksDBMetrics(kafkaStreams, metricsScope);
+ kafkaStreams.close();
+ }
+
+ private void produceRecords() throws Exception {
+ final MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ STREAM_INPUT,
+ Collections.singletonList(new KeyValue<>(1, "A")),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()
+ ),
+ mockTime.milliseconds()
+ );
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ STREAM_INPUT,
+ Collections.singletonList(new KeyValue<>(1, "B")),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()
+ ),
+ mockTime.milliseconds()
+ );
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ STREAM_INPUT,
+ Collections.singletonList(new KeyValue<>(1, "C")),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()
+ ),
+ mockTime.milliseconds()
+ );
+ }
+
+ private void verifyRocksDBMetrics(final KafkaStreams kafkaStreams, final String metricsScope) {
+ final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals("stream-state-metrics") && m.metricName().tags().containsKey(metricsScope))
+ .collect(Collectors.toList());
+ checkMetricByName(listMetricStore, BYTES_WRITTEN_RATE, 1);
+ checkMetricByName(listMetricStore, BYTES_WRITTEN_TOTAL, 1);
+ checkMetricByName(listMetricStore, BYTES_READ_RATE, 1);
+ checkMetricByName(listMetricStore, BYTES_READ_TOTAL, 1);
+ checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_RATE, 1);
+ checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_TOTAL, 1);
+ checkMetricByName(listMetricStore, MEMTABLE_HIT_RATIO, 1);
+ checkMetricByName(listMetricStore, WRITE_STALL_DURATION_AVG, 1);
+ checkMetricByName(listMetricStore, WRITE_STALL_DURATION_TOTAL, 1);
+ checkMetricByName(listMetricStore, BLOCK_CACHE_DATA_HIT_RATIO, 1);
+ checkMetricByName(listMetricStore, BLOCK_CACHE_INDEX_HIT_RATIO, 1);
+ checkMetricByName(listMetricStore, BLOCK_CACHE_FILTER_HIT_RATIO, 1);
+ checkMetricByName(listMetricStore, BYTES_READ_DURING_COMPACTION_RATE, 1);
+ checkMetricByName(listMetricStore, BYTES_WRITTEN_DURING_COMPACTION_RATE, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
+ checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
+ }
+
+ private void checkMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
+ final List<Metric> metrics = listMetric.stream()
+ .filter(m -> m.metricName().name().equals(metricName))
+ .collect(Collectors.toList());
+ Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
+ for (final Metric m : metrics) {
+ Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
+ }
+ }
+}
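
The verification in this test filters kafkaStreams.metrics() by the
"stream-state-metrics" group. The same check works against any running application;
a snippet, assuming kafkaStreams is a started KafkaStreams instance configured with
metrics recording level DEBUG:

    // Print every RocksDB state-store metric, the same way the test collects them.
    kafkaStreams.metrics().values().stream()
        .filter(m -> m.metricName().group().equals("stream-state-metrics"))
        .forEach(m -> System.out.println(m.metricName().name() + " = " + m.metricValue()));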
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index 869dd5c..ff67b78 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -63,6 +63,7 @@ public class KeyValueSegmentsTest {
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
);
segments = new KeyValueSegments(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
+ segments.openExisting(context, -1L);
}
@After
@@ -154,6 +155,7 @@ public class KeyValueSegmentsTest {
@Test
public void shouldOpenExistingSegments() {
segments = new KeyValueSegments("test", METRICS_SCOPE, 4, 1);
+ segments.openExisting(context, -1L);
segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.getOrCreateSegmentIfLive(1, context, -1L);
segments.getOrCreateSegmentIfLive(2, context, -1L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 83e43fc..14f1c9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -95,12 +95,12 @@ public class RocksDBStoreTest {
public void setUp() {
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
- rocksDBStore = getRocksDBStore();
dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(dir,
Serdes.String(),
Serdes.String(),
new StreamsConfig(props));
+ rocksDBStore = getRocksDBStore();
context.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
}
@@ -147,9 +147,7 @@ public class RocksDBStoreTest {
reset(metricsRecorder);
metricsRecorder.addStatistics(
eq(DB_NAME),
- anyObject(Statistics.class),
- eq(mockContext.metrics()),
- eq(mockContext.taskId())
+ anyObject(Statistics.class)
);
replay(metricsRecorder);
@@ -278,7 +276,7 @@ public class RocksDBStoreTest {
public void shouldCallRocksDbConfigSetter() {
MockRocksDbConfigSetter.called = false;
- rocksDBStore.openDB(context);
+ rocksDBStore.init(context, rocksDBStore);
assertTrue(MockRocksDbConfigSetter.called);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 2111359..f02e75f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -62,8 +62,8 @@ public class SegmentIteratorTest {
new LogContext("testCache "),
0,
new MockStreamsMetrics(new Metrics())));
- segmentOne.openDB(context);
- segmentTwo.openDB(context);
+ segmentOne.init(context, segmentOne);
+ segmentTwo.init(context, segmentTwo);
segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
segmentOne.put(Bytes.wrap("b".getBytes()), "2".getBytes());
segmentTwo.put(Bytes.wrap("c".getBytes()), "3".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index f4718d8..2ae6717 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -63,6 +63,7 @@ public class TimestampedSegmentsTest {
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
);
segments = new TimestampedSegments(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
+ segments.openExisting(context, -1L);
}
@After
@@ -155,6 +156,7 @@ public class TimestampedSegmentsTest {
@Test
public void shouldOpenExistingSegments() {
segments = new TimestampedSegments("test", METRICS_SCOPE, 4, 1);
+ segments.openExisting(context, -1L);
segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.getOrCreateSegmentIfLive(1, context, -1L);
segments.getOrCreateSegmentIfLive(2, context, -1L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index f22c02e..fc60c27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.streams.state.internals.metrics;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
@@ -35,11 +37,12 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.resetToNice;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.powermock.api.easymock.PowerMock.reset;
import static org.powermock.api.easymock.PowerMock.createMock;
import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.mockStaticNice;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify;
@@ -71,106 +74,110 @@ public class RocksDBMetricsRecorderTest {
private final StreamsMetricsImpl streamsMetrics = niceMock(StreamsMetricsImpl.class);
private final RocksDBMetricsRecordingTrigger recordingTrigger = mock(RocksDBMetricsRecordingTrigger.class);
private final TaskId taskId1 = new TaskId(0, 0);
- private final TaskId taskId2 = new TaskId(0, 2);
+ private final TaskId taskId2 = new TaskId(0, 1);
private final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, THREAD_ID, STORE_NAME);
@Before
public void setUp() {
+ setUpMetricsStubMock();
expect(streamsMetrics.rocksDBMetricsRecordingTrigger()).andStubReturn(recordingTrigger);
replay(streamsMetrics);
+ recorder.init(streamsMetrics, taskId1);
}
@Test
- public void shouldSetStatsLevelToExceptDetailedTimersWhenStatisticsIsAdded() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- statisticsToAdd1.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
- replay(statisticsToAdd1);
+ public void shouldInitMetricsRecorder() {
+ setUpMetricsMock();
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ recorder.init(streamsMetrics, taskId1);
- verify(statisticsToAdd1);
+ verify(RocksDBMetrics.class);
+ assertThat(recorder.taskId(), is(taskId1));
}
@Test
- public void shouldThrowIfTaskIdOfStatisticsToAddDiffersFromInitialisedOne() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentTask() {
+ setUpMetricsStubMock();
+ recorder.init(streamsMetrics, taskId1);
+
assertThrows(
IllegalStateException.class,
- () -> recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId2)
+ () -> recorder.init(streamsMetrics, taskId2)
);
}
@Test
- public void shouldThrowIfStatisticsToAddHasBeenAlreadyAdded() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetrics() {
+ setUpMetricsStubMock();
+ recorder.init(streamsMetrics, taskId1);
assertThrows(
IllegalStateException.class,
- () -> recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1)
+ () -> recorder.init(
+ new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST),
+ taskId1
+ )
);
}
@Test
- public void shouldInitMetricsAndAddItselfToRecordingTriggerOnlyWhenFirstStatisticsIsAdded() {
- setUpMetricsMock();
- recordingTrigger.addMetricsRecorder(recorder);
- replay(recordingTrigger);
+ public void shouldSetStatsLevelToExceptDetailedTimersWhenStatisticsIsAdded() {
+ statisticsToAdd1.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
+ replay(statisticsToAdd1);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
- verify(recordingTrigger);
- verify(RocksDBMetrics.class);
+ verify(statisticsToAdd1);
+ }
- mockStatic(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- reset(recordingTrigger);
+ @Test
+ public void shouldThrowIfStatisticsToAddHasBeenAlreadyAdded() {
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+
+ assertThrows(
+ IllegalStateException.class,
+ () -> recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1)
+ );
+ }
+
+ @Test
+ public void shouldAddItselfToRecordingTriggerWhenFirstStatisticsIsAddedToNewlyCreatedRecorder() {
+ recordingTrigger.addMetricsRecorder(recorder);
replay(recordingTrigger);
- recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
verify(recordingTrigger);
- verify(RocksDBMetrics.class);
}
@Test
- public void shouldAddItselfToRecordingTriggerWhenEmptyButInitialised() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ public void shouldAddItselfToRecordingTriggerWhenFirstStatisticsIsAddedAfterLastStatisticsWasRemoved() {
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
recorder.removeStatistics(SEGMENT_STORE_NAME_1);
reset(recordingTrigger);
recordingTrigger.addMetricsRecorder(recorder);
replay(recordingTrigger);
- recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
verify(recordingTrigger);
}
@Test
public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
reset(recordingTrigger);
replay(recordingTrigger);
- recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
verify(recordingTrigger);
}
@Test
public void shouldCloseStatisticsWhenStatisticsIsRemoved() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
reset(statisticsToAdd1);
statisticsToAdd1.close();
replay(statisticsToAdd1);
@@ -182,10 +189,8 @@ public class RocksDBMetricsRecorderTest {
@Test
public void shouldRemoveItselfFromRecordingTriggerWhenLastStatisticsIsRemoved() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
- recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
reset(recordingTrigger);
replay(recordingTrigger);
@@ -204,9 +209,8 @@ public class RocksDBMetricsRecorderTest {
@Test
public void shouldThrowIfStatisticsToRemoveNotFound() {
- mockStaticNice(RocksDBMetrics.class);
- replay(RocksDBMetrics.class);
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+
assertThrows(
IllegalStateException.class,
() -> recorder.removeStatistics(SEGMENT_STORE_NAME_2)
@@ -215,9 +219,8 @@ public class RocksDBMetricsRecorderTest {
@Test
public void shouldRecordMetrics() {
- setUpMetricsMock();
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
- recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2, streamsMetrics, taskId1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+ recorder.addStatistics(SEGMENT_STORE_NAME_2, statisticsToAdd2);
reset(statisticsToAdd1);
reset(statisticsToAdd2);
@@ -303,10 +306,9 @@ public class RocksDBMetricsRecorderTest {
@Test
public void shouldCorrectlyHandleHitRatioRecordingsWithZeroHitsAndMisses() {
- setUpMetricsMock();
- recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1, streamsMetrics, taskId1);
resetToNice(statisticsToAdd1);
- expect(statisticsToAdd1.getTickerCount(anyObject())).andReturn(0L).anyTimes();
+ recorder.addStatistics(SEGMENT_STORE_NAME_1, statisticsToAdd1);
+ expect(statisticsToAdd1.getTickerCount(anyObject())).andStubReturn(0L);
replay(statisticsToAdd1);
memtableHitRatioSensor.record(0);
blockCacheDataHitRatioSensor.record(0);
@@ -355,4 +357,35 @@ public class RocksDBMetricsRecorderTest {
.andReturn(numberOfFileErrorsSensor);
replay(RocksDBMetrics.class);
}
+
+ private void setUpMetricsStubMock() {
+ mockStatic(RocksDBMetrics.class);
+ final RocksDBMetricContext metricsContext =
+ new RocksDBMetricContext(THREAD_ID, taskId1.toString(), METRICS_SCOPE, STORE_NAME);
+ expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))
+ .andStubReturn(bytesWrittenToDatabaseSensor);
+ expect(RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricsContext))
+ .andStubReturn(bytesReadFromDatabaseSensor);
+ expect(RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricsContext))
+ .andStubReturn(memtableBytesFlushedSensor);
+ expect(RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricsContext))
+ .andStubReturn(memtableHitRatioSensor);
+ expect(RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricsContext))
+ .andStubReturn(writeStallDurationSensor);
+ expect(RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricsContext))
+ .andStubReturn(blockCacheDataHitRatioSensor);
+ expect(RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricsContext))
+ .andStubReturn(blockCacheIndexHitRatioSensor);
+ expect(RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricsContext))
+ .andStubReturn(blockCacheFilterHitRatioSensor);
+ expect(RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, metricsContext))
+ .andStubReturn(bytesWrittenDuringCompactionSensor);
+ expect(RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricsContext))
+ .andStubReturn(bytesReadDuringCompactionSensor);
+ expect(RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricsContext))
+ .andStubReturn(numberOfOpenFilesSensor);
+ expect(RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
+ .andStubReturn(numberOfFileErrorsSensor);
+ replay(RocksDBMetrics.class);
+ }
}
\ No newline at end of file