You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/02/12 00:50:12 UTC
[kafka] branch trunk updated: KAFKA-12272: Fix commit-interval metrics (#10102)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d4de383 KAFKA-12272: Fix commit-interval metrics (#10102)
d4de383 is described below
commit d4de383f5fe8e0d35bdb82f85d71aaaea77d8e90
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Feb 11 16:49:05 2021 -0800
KAFKA-12272: Fix commit-interval metrics (#10102)
Reviewer: A. Sophie Blee-Goldman <so...@confluent.io>
---
.../streams/processor/internals/StreamThread.java | 5 +-
.../processor/internals/StreamThreadTest.java | 116 ++++++++++++++++++++-
2 files changed, 118 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 47eb965..a9bd699 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -776,9 +776,10 @@ public class StreamThread extends Thread {
log.debug("{} punctuators ran.", punctuated);
+ final long beforeCommitMs = now;
final int committed = maybeCommit();
totalCommittedSinceLastSummary += committed;
- final long commitLatency = advanceNowAndComputeLatency();
+ final long commitLatency = Math.max(now - beforeCommitMs, 0);
totalCommitLatency += commitLatency;
if (committed > 0) {
commitSensor.record(commitLatency / (double) committed, now);
@@ -1020,7 +1021,7 @@ public class StreamThread extends Thread {
if (committed == -1) {
log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
} else {
- advanceNowAndComputeLatency();
+ now = time.milliseconds();
lastCommitMs = now;
}
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2f95a20..e4d083b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -117,6 +117,7 @@ import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
@@ -836,6 +837,119 @@ public class StreamThreadTest {
}
@Test
+ public void shouldRecordCommitLatency() {
+ final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+ final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+ expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+ expect(consumer.poll(anyObject())).andStubReturn(new ConsumerRecords<>(Collections.emptyMap()));
+ final Task task = niceMock(Task.class);
+ expect(task.id()).andStubReturn(task1);
+ expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1));
+ final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class);
+ expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.singleton(task));
+ expect(activeTaskCreator.producerClientIds()).andStubReturn(Collections.singleton("producerClientId"));
+ EasyMock.replay(consumer, consumerGroupMetadata, task, activeTaskCreator);
+
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+
+ final TaskManager taskManager = new TaskManager(
+ null,
+ null,
+ null,
+ null,
+ null,
+ activeTaskCreator,
+ null,
+ internalTopologyBuilder,
+ null,
+ null,
+ null
+ ) {
+ @Override
+ int commit(final Collection<Task> tasksToCommit) {
+ mockTime.sleep(10L);
+ return 1;
+ }
+ };
+ taskManager.setMainConsumer(consumer);
+
+ final StreamThread thread = new StreamThread(
+ mockTime,
+ config,
+ null,
+ consumer,
+ consumer,
+ changelogReader,
+ null,
+ taskManager,
+ streamsMetrics,
+ internalTopologyBuilder,
+ CLIENT_ID,
+ new LogContext(""),
+ new AtomicInteger(),
+ new AtomicLong(Long.MAX_VALUE),
+ null,
+ HANDLER,
+ null
+ );
+ thread.updateThreadMetadata("adminClientId");
+ thread.setState(StreamThread.State.STARTING);
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ activeTasks.put(task1, Collections.singleton(t1p1));
+ thread.taskManager().handleAssignment(activeTasks, emptyMap());
+ thread.rebalanceListener().onPartitionsAssigned(Collections.singleton(t1p1));
+
+ assertTrue(
+ Double.isNaN(
+ (Double) streamsMetrics.metrics().get(new MetricName(
+ "commit-latency-max",
+ "stream-thread-metrics",
+ "",
+ Collections.singletonMap("thread-id", CLIENT_ID))
+ ).metricValue()
+ )
+ );
+ assertTrue(
+ Double.isNaN(
+ (Double) streamsMetrics.metrics().get(new MetricName(
+ "commit-latency-avg",
+ "stream-thread-metrics",
+ "",
+ Collections.singletonMap("thread-id", CLIENT_ID))
+ ).metricValue()
+ )
+ );
+
+ thread.runOnce();
+
+ assertThat(
+ streamsMetrics.metrics().get(
+ new MetricName(
+ "commit-latency-max",
+ "stream-thread-metrics",
+ "",
+ Collections.singletonMap("thread-id", CLIENT_ID)
+ )
+ ).metricValue(),
+ equalTo(10.0)
+ );
+ assertThat(
+ streamsMetrics.metrics().get(
+ new MetricName(
+ "commit-latency-avg",
+ "stream-thread-metrics",
+ "",
+ Collections.singletonMap("thread-id", CLIENT_ID)
+ )
+ ).metricValue(),
+ equalTo(10.0)
+ );
+ }
+
+ @Test
public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
internalStreamsBuilder.buildAndOptimizeTopology();
@@ -2621,7 +2735,7 @@ public class StreamThreadTest {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
internalTopologyBuilder.addProcessor(
"processor1",
- (ProcessorSupplier<byte[], byte[], ?, ?>) () -> new MockApiProcessor<>(),
+ (ProcessorSupplier<byte[], byte[], ?, ?>) MockApiProcessor::new,
"source1"
);
}