You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/07/03 17:03:09 UTC
[kafka] branch trunk updated: HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ae570f5953 HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363)
ae570f5953 is described below
commit ae570f59533ae941bbf5ab9ff5e739a5bd855fd6
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Jul 3 10:02:59 2022 -0700
HOTFIX: Correct ordering of input buffer and enforced processing sensors (#12363)
1. As titled, fix the right constructor param ordering.
2. Also added a few more log lines.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Sagar Rao <sa...@gmail.com>, Hao Li <11...@users.noreply.github.com>
---
.../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 7 ++++++-
.../kafka/streams/processor/internals/PartitionGroup.java | 11 ++++++-----
.../apache/kafka/streams/processor/internals/StreamTask.java | 4 +---
.../kafka/streams/processor/internals/StreamThread.java | 11 ++++++++---
.../kafka/streams/processor/internals/PartitionGroupTest.java | 2 +-
5 files changed, 22 insertions(+), 13 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 86ab83f67d..2c95aa85a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -978,6 +978,10 @@ public class KafkaStreams implements AutoCloseable {
}
// Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here.
resizeThreadCacheAndBufferMemory(numStreamThreads);
+ if (numStreamThreads > 0) {
+ log.info("Initializing {} StreamThread with cache size/max buffer size values as {} per thread.",
+ numStreamThreads, getThreadCacheAndBufferMemoryString());
+ }
stateDirCleaner = setupStateDirCleaner();
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
@@ -1143,7 +1147,8 @@ public class KafkaStreams implements AutoCloseable {
+ "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
}
resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
- log.info("Resizing thread cache/max buffer size due to removal of thread {}, new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString());
+ log.info("Resizing thread cache/max buffer size due to removal of thread {}, " +
+ "new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString());
if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 21d3cbfa3f..750699a1ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -64,7 +64,7 @@ public class PartitionGroup {
private final Sensor enforcedProcessingSensor;
private final long maxTaskIdleMs;
private final Sensor recordLatenessSensor;
- private final Sensor totalBytesSensor;
+ private final Sensor totalInputBufferBytesSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
private long streamTime;
@@ -93,8 +93,8 @@ public class PartitionGroup {
final Map<TopicPartition, RecordQueue> partitionQueues,
final Function<TopicPartition, OptionalLong> lagProvider,
final Sensor recordLatenessSensor,
+ final Sensor totalInputBufferBytesSensor,
final Sensor enforcedProcessingSensor,
- final Sensor totalBytesSensor,
final long maxTaskIdleMs) {
this.logger = logContext.logger(PartitionGroup.class);
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
@@ -103,7 +103,7 @@ public class PartitionGroup {
this.enforcedProcessingSensor = enforcedProcessingSensor;
this.maxTaskIdleMs = maxTaskIdleMs;
this.recordLatenessSensor = recordLatenessSensor;
- this.totalBytesSensor = totalBytesSensor;
+ this.totalInputBufferBytesSensor = totalInputBufferBytesSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
@@ -230,6 +230,7 @@ public class PartitionGroup {
// if partition is removed should delete its queue
totalBuffered -= queueEntry.getValue().size();
totalBytesBuffered -= queueEntry.getValue().getTotalBytesBuffered();
+ totalInputBufferBytesSensor.record(totalBytesBuffered);
queuesIterator.remove();
removedPartitions.add(topicPartition);
}
@@ -275,7 +276,7 @@ public class PartitionGroup {
if (record != null) {
--totalBuffered;
totalBytesBuffered -= oldBufferSize - newBufferSize;
- totalBytesSensor.record(totalBytesBuffered);
+ totalInputBufferBytesSensor.record(totalBytesBuffered);
if (queue.isEmpty()) {
// if a certain queue has been drained, reset the flag
allBuffered = false;
@@ -329,7 +330,7 @@ public class PartitionGroup {
totalBuffered += newSize - oldSize;
totalBytesBuffered += newBufferSize - oldBufferSize;
- totalBytesSensor.record(totalBytesBuffered);
+ totalInputBufferBytesSensor.record(totalBytesBuffered);
return newSize;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8514c6ae2e..33751e59d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -176,8 +176,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
recordInfo = new PartitionGroup.RecordInfo();
- final Sensor enforcedProcessingSensor;
- enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
final long maxTaskIdleMs = config.maxTaskIdleMs;
partitionGroup = new PartitionGroup(
logContext,
@@ -185,7 +183,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
mainConsumer::currentLag,
TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
TaskMetrics.totalInputBufferBytesSensor(threadId, taskId, streamsMetrics),
- enforcedProcessingSensor,
+ TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics),
maxTaskIdleMs
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 64a4ff5433..1c252c2fda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -798,7 +798,10 @@ public class StreamThread extends Thread {
totalRecordsProcessedSinceLastSummary += processed;
final long bufferSize = taskManager.getInputBufferSizeInBytes();
if (bufferSize <= maxBufferSizeBytes.get()) {
- mainConsumer.resume(mainConsumer.paused());
+ final Set<TopicPartition> pausedPartitions = mainConsumer.paused();
+ log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer",
+ bufferSize, maxBufferSizeBytes.get(), pausedPartitions);
+ mainConsumer.resume(pausedPartitions);
}
}
@@ -969,12 +972,14 @@ public class StreamThread extends Thread {
final long bufferSize = taskManager.getInputBufferSizeInBytes();
// Pausing partitions as the buffer size now exceeds max buffer size
if (bufferSize > maxBufferSizeBytes.get()) {
- log.info("Buffered records size {} bytes exceeds {}. Pausing the consumer", bufferSize, maxBufferSizeBytes.get());
+ final Set<TopicPartition> nonEmptyPartitions = taskManager.nonEmptyPartitions();
+ log.info("Buffered records size {} bytes exceeds {}. Pausing partitions {} from the consumer",
+ bufferSize, maxBufferSizeBytes.get(), nonEmptyPartitions);
// Only non-empty partitions are paused here. Reason is that, if a task has multiple partitions with
// some of them empty, then in that case pausing even empty partitions would sacrifice ordered processing
// and even lead to temporal deadlock. More explanation can be found here:
// https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647
- mainConsumer.pause(taskManager.nonEmptyPartitions());
+ mainConsumer.pause(nonEmptyPartitions);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 012373607e..e29cf69d61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -861,8 +861,8 @@ public class PartitionGroupTest {
),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
- enforcedProcessingSensor,
getValueSensor(metrics, totalBytesValue),
+ enforcedProcessingSensor,
maxTaskIdleMs
);
}