Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 21:19:29 UTC
kafka git commit: KAFKA-5055: Fix Kafka Streams skipped-records-rate sensor bug
Repository: kafka
Updated Branches:
refs/heads/trunk bc65c62e6 -> a3952aee4
KAFKA-5055: Fix Kafka Streams skipped-records-rate sensor bug
Fix as described in the [KAFKA-5055 Jira comment](https://issues.apache.org/jira/browse/KAFKA-5055?focusedCommentId=15990086&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15990086).
mjsax guozhangwang take a look
Author: dpoldrugo <dp...@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #2949 from dpoldrugo/KAFKA-5055-skipped-records-rate-sensor-bug
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a3952aee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a3952aee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a3952aee
Branch: refs/heads/trunk
Commit: a3952aee49c5eb11cd38c1bc2fb95e9a285ff158
Parents: bc65c62
Author: Davor Poldrugo <dp...@gmail.com>
Authored: Wed May 3 14:19:25 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 3 14:19:25 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/processor/internals/StreamTask.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a3952aee/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d18efef..4a49f9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -380,7 +380,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     @SuppressWarnings("unchecked")
     public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        final int oldQueueSize = partitionGroup.numBuffered();
+        final int oldQueueSize = partitionGroup.numBuffered(partition);
         final int newQueueSize = partitionGroup.addRawRecords(partition, records);
         log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);
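
The one-line change above swaps the baseline that the new queue size is compared against. The following is a minimal, self-contained sketch of why that baseline matters. It is not the Kafka Streams implementation: `ToyPartitionGroup` and both helper methods are hypothetical, and the sketch assumes (as the log message suggests, but the diff does not show) that `addRawRecords` returns the size of the single partition's queue while the no-argument `numBuffered()` returns the total across all partitions.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueueSizeSketch {

    // Hypothetical stand-in for a partition group: one buffer per partition name.
    static class ToyPartitionGroup {
        private final Map<String, Deque<String>> queues = new HashMap<>();

        // Assumed behavior: total number of buffered records across all partitions.
        int numBuffered() {
            int total = 0;
            for (Deque<String> queue : queues.values()) {
                total += queue.size();
            }
            return total;
        }

        // Assumed behavior: number of buffered records for one partition only.
        int numBuffered(String partition) {
            Deque<String> queue = queues.get(partition);
            return queue == null ? 0 : queue.size();
        }

        // Assumed behavior: appends the records and returns that partition's queue size.
        int addRawRecords(String partition, List<String> records) {
            Deque<String> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
            queue.addAll(records);
            return queue.size();
        }
    }

    // Mirrors the removed line: the baseline is the total over all partitions.
    static int addedWithTotalBaseline(ToyPartitionGroup group, String partition, List<String> records) {
        int oldQueueSize = group.numBuffered();
        int newQueueSize = group.addRawRecords(partition, records);
        return newQueueSize - oldQueueSize;
    }

    // Mirrors the added line: the baseline is the same partition's queue.
    static int addedWithPartitionBaseline(ToyPartitionGroup group, String partition, List<String> records) {
        int oldQueueSize = group.numBuffered(partition);
        int newQueueSize = group.addRawRecords(partition, records);
        return newQueueSize - oldQueueSize;
    }

    public static void main(String[] args) {
        ToyPartitionGroup group = new ToyPartitionGroup();
        group.addRawRecords("topic-0", Arrays.asList("a", "b", "c"));

        // Baseline is 3 (all partitions), new size is 2 (topic-1 only): difference is -1.
        int mismatched = addedWithTotalBaseline(group, "topic-1", Arrays.asList("x", "y"));

        // Baseline is 2 (topic-1 only), new size is 3 (topic-1 only): difference is 1.
        int matched = addedWithPartitionBaseline(group, "topic-1", Arrays.asList("z"));

        System.out.println("mismatched baseline: " + mismatched);
        System.out.println("matched baseline: " + matched);
    }
}
```

Under these assumptions, any count derived from `newQueueSize - oldQueueSize` is only meaningful when both values refer to the same partition, which is what the changed line ensures.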