You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/08 15:26:49 UTC
[kafka] branch trunk updated: BUGFIX: Add missing recording of close of stand-by task (#6663)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 83823ae BUGFIX: Add missing recording of close of stand-by task (#6663)
83823ae is described below
commit 83823aedf2e2d176004402152b69bd67f97d8e12
Author: cadonna <br...@confluent.io>
AuthorDate: Wed May 8 17:26:25 2019 +0200
BUGFIX: Add missing recording of close of stand-by task (#6663)
Adds recording of the close of a stand-by task to the task-closed metric.
Adds unit tests to verify the recording.
Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>
---
.../streams/processor/internals/StandbyTask.java | 4 +
.../processor/internals/StandbyTaskTest.java | 175 ++++++++++++++++++---
2 files changed, 159 insertions(+), 20 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 749b2ed..424ded4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
@@ -37,6 +38,7 @@ import java.util.Map;
public class StandbyTask extends AbstractTask {
private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
+ private final Sensor closeTaskSensor;
/**
* Create {@link StandbyTask} with its assigned partitions
@@ -59,6 +61,7 @@ public class StandbyTask extends AbstractTask {
final StateDirectory stateDirectory) {
super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
+ closeTaskSensor = metrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
}
@@ -132,6 +135,7 @@ public class StandbyTask extends AbstractTask {
@Override
public void close(final boolean clean,
final boolean isZombie) {
+ closeTaskSensor.record();
if (!taskInitialized) {
return;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6e7655a..8c8811d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -20,10 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -45,6 +49,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -136,7 +141,10 @@ public class StandbyTaskTest {
}
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());
+ private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(
+ new IntegerSerializer(),
+ new IntegerSerializer()
+ );
private final StoreChangelogReader changelogReader = new StoreChangelogReader(
restoreStateConsumer,
Duration.ZERO,
@@ -147,6 +155,9 @@ public class StandbyTaskTest {
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
+ private final String threadName = "threadName";
+ private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), threadName);
+
@Before
public void setup() throws Exception {
restoreStateConsumer.reset();
@@ -177,7 +188,14 @@ public class StandbyTaskTest {
@Test
public void testStorePartitions() throws IOException {
final StreamsConfig config = createConfig(baseDir);
- task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+ task = new StandbyTask(taskId,
+ topicPartitions,
+ topology,
+ consumer,
+ changelogReader,
+ config,
+ streamsMetrics,
+ stateDirectory);
task.initializeStateStores();
assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
}
@@ -186,13 +204,31 @@ public class StandbyTaskTest {
@Test
public void testUpdateNonInitializedStore() throws IOException {
final StreamsConfig config = createConfig(baseDir);
- task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+ task = new StandbyTask(taskId,
+ topicPartitions,
+ topology,
+ consumer,
+ changelogReader,
+ config,
+ streamsMetrics,
+ stateDirectory);
restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
try {
task.update(partition1,
- singletonList(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
+ singletonList(
+ new ConsumerRecord<>(
+ partition1.topic(),
+ partition1.partition(),
+ 10,
+ 0L,
+ TimestampType.CREATE_TIME,
+ 0L,
+ 0,
+ 0,
+ recordKey,
+ recordValue))
);
fail("expected an exception");
} catch (final NullPointerException npe) {
@@ -204,15 +240,48 @@ public class StandbyTaskTest {
@Test
public void testUpdate() throws IOException {
final StreamsConfig config = createConfig(baseDir);
- task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+ task = new StandbyTask(taskId,
+ topicPartitions,
+ topology,
+ consumer,
+ changelogReader,
+ config,
+ streamsMetrics,
+ stateDirectory);
task.initializeStateStores();
final Set<TopicPartition> partition = Collections.singleton(partition2);
restoreStateConsumer.assign(partition);
- for (final ConsumerRecord<Integer, Integer> record : asList(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
+ for (final ConsumerRecord<Integer, Integer> record : asList(new ConsumerRecord<>(partition2.topic(),
+ partition2.partition(),
+ 10,
+ 0L,
+ TimestampType.CREATE_TIME,
+ 0L,
+ 0,
+ 0,
+ 1,
+ 100),
+ new ConsumerRecord<>(partition2.topic(),
+ partition2.partition(),
+ 20,
+ 0L,
+ TimestampType.CREATE_TIME,
+ 0L,
+ 0,
+ 0,
+ 2,
+ 100),
+ new ConsumerRecord<>(partition2.topic(),
+ partition2.partition(),
+ 30,
+ 0L,
+ TimestampType.CREATE_TIME,
+ 0L,
+ 0,
+ 0,
+ 3,
+ 100))) {
restoreStateConsumer.bufferRecord(record);
}
@@ -380,7 +449,8 @@ public class StandbyTaskTest {
}
@SuppressWarnings("unchecked")
- private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName, final StandbyTask task) {
+ private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName,
+ final StandbyTask task) {
final StandbyContextImpl context = (StandbyContextImpl) task.context();
final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>();
@@ -410,7 +480,7 @@ public class StandbyTaskTest {
consumer,
changelogReader,
createConfig(baseDir),
- null,
+ streamsMetrics,
stateDirectory
);
task.initializeStateStores();
@@ -464,7 +534,9 @@ public class StandbyTaskTest {
assertEquals(emptyList(), remaining);
}
- private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition, final long offset, final int key) {
+ private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition,
+ final long offset,
+ final int key) {
final IntegerSerializer integerSerializer = new IntegerSerializer();
return new ConsumerRecord<>(
topicPartition.topic(),
@@ -491,7 +563,8 @@ public class StandbyTaskTest {
@Test
public void shouldInitializeWindowStoreWithoutException() throws IOException {
final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
- builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
+ builder.stream(Collections.singleton("topic"),
+ new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
initializeStandbyStores(builder);
}
@@ -522,7 +595,8 @@ public class StandbyTaskTest {
public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
consumer.assign(Collections.singletonList(globalTopicPartition));
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
- committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+ committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+ new OffsetAndMetadata(100L));
consumer.commitSync(committedOffsets);
restoreStateConsumer.updatePartitions(
@@ -540,7 +614,7 @@ public class StandbyTaskTest {
consumer,
changelogReader,
config,
- null,
+ streamsMetrics,
stateDirectory
);
task.initializeStateStores();
@@ -550,9 +624,11 @@ public class StandbyTaskTest {
final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
task.update(
globalTopicPartition,
- singletonList(
- new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 50L, serializedValue, serializedValue)
- )
+ singletonList(new ConsumerRecord<>(globalTopicPartition.topic(),
+ globalTopicPartition.partition(),
+ 50L,
+ serializedValue,
+ serializedValue))
);
time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
@@ -569,7 +645,8 @@ public class StandbyTaskTest {
public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
consumer.assign(Collections.singletonList(globalTopicPartition));
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
- committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+ committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+ new OffsetAndMetadata(100L));
consumer.commitSync(committedOffsets);
restoreStateConsumer.updatePartitions(
@@ -586,7 +663,7 @@ public class StandbyTaskTest {
consumer,
changelogReader,
config,
- null,
+ streamsMetrics,
stateDirectory
) {
@Override
@@ -610,4 +687,62 @@ public class StandbyTaskTest {
assertTrue(closedStateManager.get());
}
+ private MetricName setupCloseTaskMetric() {
+ final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
+ final Sensor sensor = streamsMetrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
+ sensor.add(metricName, new Total());
+ return metricName;
+ }
+
+ private void verifyCloseTaskMetric(final double expected,
+ final StreamsMetricsImpl streamsMetrics,
+ final MetricName metricName) {
+ final KafkaMetric metric = (KafkaMetric) streamsMetrics.metrics().get(metricName);
+ final double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis());
+ assertThat(totalCloses, equalTo(expected));
+ }
+
+ @Test
+ public void shouldRecordTaskClosedMetricOnClose() throws IOException {
+ final MetricName metricName = setupCloseTaskMetric();
+ final StandbyTask task = new StandbyTask(
+ taskId,
+ ktablePartitions,
+ ktableTopology,
+ consumer,
+ changelogReader,
+ createConfig(baseDir),
+ streamsMetrics,
+ stateDirectory
+ );
+
+ final boolean clean = true;
+ final boolean isZombie = false;
+ task.close(clean, isZombie);
+
+ final double expectedCloseTaskMetric = 1.0;
+ verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+ }
+
+ @Test
+ public void shouldRecordTaskClosedMetricOnCloseSuspended() throws IOException {
+ final MetricName metricName = setupCloseTaskMetric();
+ final StandbyTask task = new StandbyTask(
+ taskId,
+ ktablePartitions,
+ ktableTopology,
+ consumer,
+ changelogReader,
+ createConfig(baseDir),
+ streamsMetrics,
+ stateDirectory
+ );
+
+ final boolean clean = true;
+ final boolean isZombie = false;
+ task.closeSuspended(clean, isZombie, new RuntimeException());
+
+ final double expectedCloseTaskMetric = 1.0;
+ verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+ }
}