Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/08 15:26:49 UTC

[kafka] branch trunk updated: BUGFIX: Add missing recording of close of stand-by task (#6663)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83823ae  BUGFIX: Add missing recording of close of stand-by task (#6663)
83823ae is described below

commit 83823aedf2e2d176004402152b69bd67f97d8e12
Author: cadonna <br...@confluent.io>
AuthorDate: Wed May 8 17:26:25 2019 +0200

    BUGFIX: Add missing recording of close of stand-by task (#6663)
    
    Adds recording of close of a stand-by task to the task-closed metric
    Adds unit tests to verify the recording
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>
---
 .../streams/processor/internals/StandbyTask.java   |   4 +
 .../processor/internals/StandbyTaskTest.java       | 175 ++++++++++++++++++---
 2 files changed, 159 insertions(+), 20 deletions(-)
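
Context for readers skimming the change: the fix follows the standard sensor pattern from org.apache.kafka.common.metrics — create a sensor, attach a stat, and call record() when the event happens. Below is a minimal, self-contained sketch of that pattern, assuming a plain Metrics registry rather than the StreamsMetricsImpl used in the diff; the class name and the metric name/group/description are illustrative, while the sensor name "task-closed", the INFO recording level, and the Total stat mirror the change that follows.

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Total;

    import java.util.Collections;

    public class SensorPatternSketch {
        public static void main(final String[] args) {
            final Metrics metrics = new Metrics();

            // Create a sensor at INFO recording level, as the StandbyTask
            // constructor change does via threadLevelSensor(...).
            final Sensor closeTaskSensor =
                metrics.sensor("task-closed", Sensor.RecordingLevel.INFO);

            // Attach a cumulative count, as setupCloseTaskMetric() does in the
            // test below. Name/group/description here are illustrative only.
            final MetricName name = new MetricName(
                "task-closed-total", "example-group", "number of closed tasks",
                Collections.emptyMap());
            closeTaskSensor.add(name, new Total());

            // The whole fix is one record() call in close(): before this commit,
            // StandbyTask.close() returned without ever touching the sensor.
            closeTaskSensor.record();

            System.out.println(metrics.metrics().get(name).metricValue()); // prints 1.0
        }
    }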

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 749b2ed..424ded4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
@@ -37,6 +38,7 @@ import java.util.Map;
 public class StandbyTask extends AbstractTask {
 
     private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
+    private final Sensor closeTaskSensor;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
@@ -59,6 +61,7 @@ public class StandbyTask extends AbstractTask {
                 final StateDirectory stateDirectory) {
         super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
+        closeTaskSensor = metrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
         processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
     }
 
@@ -132,6 +135,7 @@ public class StandbyTask extends AbstractTask {
     @Override
     public void close(final boolean clean,
                       final boolean isZombie) {
+        closeTaskSensor.record();
         if (!taskInitialized) {
             return;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6e7655a..8c8811d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -20,10 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -45,6 +49,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -136,7 +141,10 @@ public class StandbyTaskTest {
     }
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());
+    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(
+        new IntegerSerializer(),
+        new IntegerSerializer()
+    );
     private final StoreChangelogReader changelogReader = new StoreChangelogReader(
         restoreStateConsumer,
         Duration.ZERO,
@@ -147,6 +155,9 @@ public class StandbyTaskTest {
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
+    private final String threadName = "threadName";
+    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), threadName);
+
     @Before
     public void setup() throws Exception {
         restoreStateConsumer.reset();
@@ -177,7 +188,14 @@ public class StandbyTaskTest {
     @Test
     public void testStorePartitions() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
         task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
@@ -186,13 +204,31 @@ public class StandbyTaskTest {
     @Test
     public void testUpdateNonInitializedStore() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         try {
             task.update(partition1,
-                        singletonList(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
+                        singletonList(
+                            new ConsumerRecord<>(
+                                partition1.topic(),
+                                partition1.partition(),
+                                10,
+                                0L,
+                                TimestampType.CREATE_TIME,
+                                0L,
+                                0,
+                                0,
+                                recordKey,
+                                recordValue))
             );
             fail("expected an exception");
         } catch (final NullPointerException npe) {
@@ -204,15 +240,48 @@ public class StandbyTaskTest {
     @Test
     public void testUpdate() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
         task.initializeStateStores();
         final Set<TopicPartition> partition = Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
-        for (final ConsumerRecord<Integer, Integer> record : asList(
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
+        for (final ConsumerRecord<Integer, Integer> record : asList(new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         10,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         1,
+                                                                                         100),
+                                                                    new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         20,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         2,
+                                                                                         100),
+                                                                    new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         30,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         3,
+                                                                                         100))) {
             restoreStateConsumer.bufferRecord(record);
         }
 
@@ -380,7 +449,8 @@ public class StandbyTaskTest {
     }
 
     @SuppressWarnings("unchecked")
-    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName, final StandbyTask task) {
+    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName,
+                                                                             final StandbyTask task) {
         final StandbyContextImpl context = (StandbyContextImpl) task.context();
 
         final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>();
@@ -410,7 +480,7 @@ public class StandbyTaskTest {
             consumer,
             changelogReader,
             createConfig(baseDir),
-            null,
+            streamsMetrics,
             stateDirectory
         );
         task.initializeStateStores();
@@ -464,7 +534,9 @@ public class StandbyTaskTest {
         assertEquals(emptyList(), remaining);
     }
 
-    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition, final long offset, final int key) {
+    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition,
+                                                              final long offset,
+                                                              final int key) {
         final IntegerSerializer integerSerializer = new IntegerSerializer();
         return new ConsumerRecord<>(
             topicPartition.topic(),
@@ -491,7 +563,8 @@ public class StandbyTaskTest {
     @Test
     public void shouldInitializeWindowStoreWithoutException() throws IOException {
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
+        builder.stream(Collections.singleton("topic"),
+                       new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
 
         initializeStandbyStores(builder);
     }
@@ -522,7 +595,8 @@ public class StandbyTaskTest {
     public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
         consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+                             new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
@@ -540,7 +614,7 @@ public class StandbyTaskTest {
             consumer,
             changelogReader,
             config,
-            null,
+            streamsMetrics,
             stateDirectory
         );
         task.initializeStateStores();
@@ -550,9 +624,11 @@ public class StandbyTaskTest {
         final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
         task.update(
             globalTopicPartition,
-            singletonList(
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 50L, serializedValue, serializedValue)
-            )
+            singletonList(new ConsumerRecord<>(globalTopicPartition.topic(),
+                                               globalTopicPartition.partition(),
+                                               50L,
+                                               serializedValue,
+                                               serializedValue))
         );
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
@@ -569,7 +645,8 @@ public class StandbyTaskTest {
     public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
         consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+                             new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
@@ -586,7 +663,7 @@ public class StandbyTaskTest {
             consumer,
             changelogReader,
             config,
-            null,
+            streamsMetrics,
             stateDirectory
         ) {
             @Override
@@ -610,4 +687,62 @@ public class StandbyTaskTest {
         assertTrue(closedStateManager.get());
     }
 
+    private MetricName setupCloseTaskMetric() {
+        final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
+        final Sensor sensor = streamsMetrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
+        sensor.add(metricName, new Total());
+        return metricName;
+    }
+
+    private void verifyCloseTaskMetric(final double expected,
+                                       final StreamsMetricsImpl streamsMetrics,
+                                       final MetricName metricName) {
+        final KafkaMetric metric = (KafkaMetric) streamsMetrics.metrics().get(metricName);
+        final double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis());
+        assertThat(totalCloses, equalTo(expected));
+    }
+
+    @Test
+    public void shouldRecordTaskClosedMetricOnClose() throws IOException {
+        final MetricName metricName = setupCloseTaskMetric();
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            streamsMetrics,
+            stateDirectory
+        );
+
+        final boolean clean = true;
+        final boolean isZombie = false;
+        task.close(clean, isZombie);
+
+        final double expectedCloseTaskMetric = 1.0;
+        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+    }
+
+    @Test
+    public void shouldRecordTaskClosedMetricOnCloseSuspended() throws IOException {
+        final MetricName metricName = setupCloseTaskMetric();
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            streamsMetrics,
+            stateDirectory
+        );
+
+        final boolean clean = true;
+        final boolean isZombie = false;
+        task.closeSuspended(clean, isZombie, new RuntimeException());
+
+        final double expectedCloseTaskMetric = 1.0;
+        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+    }
 }
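
For completeness, here is a hypothetical sketch of how an application could observe the newly recorded count through the public KafkaStreams#metrics() map. The metric name "task-closed-total" and its group are assumptions inferred from the thread-level sensor above, not something this commit pins down, so adjust them to whatever metric names your Kafka version actually registers.

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    import java.util.Map;

    public final class TaskClosedMetricProbe {

        // Scans the client's metrics for any metric named "task-closed-total"
        // (assumed name) and prints its group and current value.
        public static void printTaskClosedTotals(final KafkaStreams streams) {
            final Map<MetricName, ? extends Metric> metrics = streams.metrics();
            for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                if ("task-closed-total".equals(entry.getKey().name())) {
                    System.out.println(entry.getKey().group() + " / "
                        + entry.getKey().name() + " = " + entry.getValue().metricValue());
                }
            }
        }
    }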