Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 21:19:29 UTC

kafka git commit: KAFKA-5055: Fix Kafka Streams skipped-records-rate sensor bug

Repository: kafka
Updated Branches:
  refs/heads/trunk bc65c62e6 -> a3952aee4


KAFKA-5055: Fix Kafka Streams skipped-records-rate sensor bug

Fix as described in the [KAFKA-5055 Jira comment](https://issues.apache.org/jira/browse/KAFKA-5055?focusedCommentId=15990086&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15990086).

mjsax, guozhangwang: please take a look.

Author: dpoldrugo <dp...@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2949 from dpoldrugo/KAFKA-5055-skipped-records-rate-sensor-bug


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a3952aee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a3952aee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a3952aee

Branch: refs/heads/trunk
Commit: a3952aee49c5eb11cd38c1bc2fb95e9a285ff158
Parents: bc65c62
Author: Davor Poldrugo <dp...@gmail.com>
Authored: Wed May 3 14:19:25 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 3 14:19:25 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamTask.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a3952aee/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d18efef..4a49f9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -380,7 +380,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     @SuppressWarnings("unchecked")
     public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        final int oldQueueSize = partitionGroup.numBuffered();
+        final int oldQueueSize = partitionGroup.numBuffered(partition);
         final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
         log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);
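
Note on the change: the trace message above reports newQueueSize as the queue size of a single partition, so the baseline taken before the add should presumably be per-partition as well. The sketch below illustrates the mismatch with a hypothetical stand-in class (BufferedCountSketch, keyed by plain strings); it is not the real PartitionGroup, and it assumes addRawRecords returns the size of that one partition's queue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for PartitionGroup; not the real Kafka Streams class.
public class BufferedCountSketch {
    // Buffered record count per partition name.
    private final Map<String, Integer> queueSizes = new HashMap<>();

    // Total records buffered across every partition in the group.
    public int numBuffered() {
        int total = 0;
        for (int size : queueSizes.values()) {
            total += size;
        }
        return total;
    }

    // Records buffered for one partition only.
    public int numBuffered(String partition) {
        return queueSizes.getOrDefault(partition, 0);
    }

    // Adds records to one partition's queue and returns that queue's new size
    // (assumed to mirror what addRawRecords reports in the diff above).
    public int addRawRecords(String partition, int count) {
        return queueSizes.merge(partition, count, Integer::sum);
    }

    public static void main(String[] args) {
        BufferedCountSketch group = new BufferedCountSketch();
        group.addRawRecords("topic-0", 5);
        group.addRawRecords("topic-1", 3);

        int oldGroupWide = group.numBuffered();               // baseline before the fix
        int oldPerPartition = group.numBuffered("topic-1");   // baseline after the fix
        int newQueueSize = group.addRawRecords("topic-1", 2);

        // Group-wide baseline: 5 - 8 = -3, which is not the number of records added.
        System.out.println("delta, group-wide baseline: " + (newQueueSize - oldGroupWide));
        // Per-partition baseline: 5 - 3 = 2, matching the two records added.
        System.out.println("delta, per-partition baseline: " + (newQueueSize - oldPerPartition));
    }
}
```

With more than one partition buffered, the group-wide baseline makes the difference between new and old sizes meaningless, so any count derived from it (such as records treated as skipped) would be off; the per-partition baseline gives the number of records actually added.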