Posted to commits@kafka.apache.org by mj...@apache.org on 2021/02/17 01:14:13 UTC

[kafka] 01/02: KAFKA-12272: Fix commit-interval metrics (#10102)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ab17937abcd6aee6192679baf7a0b4f1e0a15fe1
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Feb 11 16:49:05 2021 -0800

    KAFKA-12272: Fix commit-interval metrics (#10102)
    
    Reviewer: A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../processor/internals/StreamThreadTest.java      | 116 ++++++++++++++++++++-
 2 files changed, 118 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3ea40d5..3dc0c02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -776,9 +776,10 @@ public class StreamThread extends Thread {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitMs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitMs, 0);
                 totalCommitLatency += commitLatency;
                 if (committed > 0) {
                     commitSensor.record(commitLatency / (double) committed, now);
@@ -1016,7 +1017,7 @@ public class StreamThread extends Thread {
             if (committed == -1) {
                 log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
             } else {
-                advanceNowAndComputeLatency();
+                now = time.milliseconds();
                 lastCommitMs = now;
             }
         } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2f95a20..e4d083b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -117,6 +117,7 @@ import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.niceMock;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -836,6 +837,119 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldRecordCommitLatency() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        expect(consumer.poll(anyObject())).andStubReturn(new ConsumerRecords<>(Collections.emptyMap()));
+        final Task task = niceMock(Task.class);
+        expect(task.id()).andStubReturn(task1);
+        expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1));
+        final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class);
+        expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.singleton(task));
+        expect(activeTaskCreator.producerClientIds()).andStubReturn(Collections.singleton("producerClientId"));
+        EasyMock.replay(consumer, consumerGroupMetadata, task, activeTaskCreator);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+
+        final TaskManager taskManager = new TaskManager(
+            null,
+            null,
+            null,
+            null,
+            null,
+            activeTaskCreator,
+            null,
+            internalTopologyBuilder,
+            null,
+            null,
+            null
+        ) {
+            @Override
+            int commit(final Collection<Task> tasksToCommit) {
+                mockTime.sleep(10L);
+                return 1;
+            }
+        };
+        taskManager.setMainConsumer(consumer);
+
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            changelogReader,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            HANDLER,
+            null
+        );
+        thread.updateThreadMetadata("adminClientId");
+        thread.setState(StreamThread.State.STARTING);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(task1, Collections.singleton(t1p1));
+        thread.taskManager().handleAssignment(activeTasks, emptyMap());
+        thread.rebalanceListener().onPartitionsAssigned(Collections.singleton(t1p1));
+
+        assertTrue(
+            Double.isNaN(
+                (Double) streamsMetrics.metrics().get(new MetricName(
+                    "commit-latency-max",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID))
+                ).metricValue()
+            )
+        );
+        assertTrue(
+            Double.isNaN(
+                (Double) streamsMetrics.metrics().get(new MetricName(
+                    "commit-latency-avg",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID))
+                ).metricValue()
+            )
+        );
+
+        thread.runOnce();
+
+        assertThat(
+            streamsMetrics.metrics().get(
+                new MetricName(
+                    "commit-latency-max",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID)
+                )
+            ).metricValue(),
+            equalTo(10.0)
+        );
+        assertThat(
+            streamsMetrics.metrics().get(
+                new MetricName(
+                    "commit-latency-avg",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID)
+                )
+            ).metricValue(),
+            equalTo(10.0)
+        );
+    }
+
+    @Test
     public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
         internalStreamsBuilder.buildAndOptimizeTopology();
@@ -2621,7 +2735,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
         internalTopologyBuilder.addProcessor(
             "processor1",
-            (ProcessorSupplier<byte[], byte[], ?, ?>) () -> new MockApiProcessor<>(),
+            (ProcessorSupplier<byte[], byte[], ?, ?>) MockApiProcessor::new,
             "source1"
         );
     }
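
For context, the StreamThread.java hunk above changes the commit-latency bookkeeping so that the latency is the wall-clock delta measured around maybeCommit(), recorded only when at least one task actually committed. Below is a minimal, self-contained sketch of that idea, not the actual StreamThread code: maybeCommit(), totalCommitLatency, and totalCommittedSinceLastSummary mirror names from the diff, while CommitLatencySketch, recordCommitLatency(), and the direct use of System.currentTimeMillis() are simplified stand-ins for the thread's Time/Sensor plumbing.

// Minimal illustration of the fix above: measure commit latency as the
// wall-clock delta around maybeCommit(), and record it only when at least
// one task actually committed. Names other than maybeCommit(),
// totalCommitLatency, and totalCommittedSinceLastSummary are placeholders.
public final class CommitLatencySketch {

    private long totalCommitLatency = 0L;
    private long totalCommittedSinceLastSummary = 0L;

    // Stand-in for the task manager's commit: pretend one task commits in ~10 ms.
    private int maybeCommit() throws InterruptedException {
        Thread.sleep(10L);
        return 1;
    }

    // Stand-in for commitSensor.record(commitLatency / (double) committed, now).
    private void recordCommitLatency(final double perCommitLatencyMs) {
        System.out.printf("commit latency recorded: %.1f ms%n", perCommitLatencyMs);
    }

    // Mirrors the patched section of the run loop in simplified form.
    public void runOnce() throws InterruptedException {
        final long beforeCommitMs = System.currentTimeMillis();

        final int committed = maybeCommit();
        totalCommittedSinceLastSummary += committed;

        // The latency is the elapsed time around the commit itself, clamped at
        // zero, rather than a value derived from the thread's shared `now`
        // bookkeeping, which maybeCommit() may already have advanced.
        final long now = System.currentTimeMillis();
        final long commitLatency = Math.max(now - beforeCommitMs, 0);
        totalCommitLatency += commitLatency;

        if (committed > 0) {
            recordCommitLatency(commitLatency / (double) committed);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        new CommitLatencySketch().runOnce();
    }
}

The Math.max(..., 0) clamp matches the patched line in the diff; in this sketch it simply guards against a clock that does not move forward between the two reads.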