You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/05 18:11:26 UTC
kafka git commit: KAFKA-3157: Mirror maker doesn't commit offset with
low traffic
Repository: kafka
Updated Branches:
refs/heads/trunk 216c75bbc -> d6e36df87
KAFKA-3157: Mirror maker doesn't commit offset with low traffic
Mirror maker doesn't commit offset with new consumer enabled when data volume is low. This is caused by infinite loop in ```receive()``` which would never jump out of loop if no data coming
Author: Tao Xiao <xi...@gmail.com>
Reviewers: Ismael Juma, Jason Gustafson
Closes #821 from xiaotao183/KAFKA-3157
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6e36df8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6e36df8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6e36df8
Branch: refs/heads/trunk
Commit: d6e36df8737aa2b898b4fd0a81c2d94f3c349b68
Parents: 216c75b
Author: Tao Xiao <xi...@gmail.com>
Authored: Fri Feb 5 09:11:24 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Feb 5 09:11:24 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/tools/MirrorMaker.scala | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6e36df8/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index fb3762c..a964f69 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -527,12 +527,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
}
- // New consumer always hasNext
override def hasData = true
override def receive() : BaseConsumerRecord = {
- while (recordIter == null || !recordIter.hasNext)
+ if (recordIter == null || !recordIter.hasNext) {
recordIter = consumer.poll(1000).iterator
+ if (!recordIter.hasNext)
+ throw new ConsumerTimeoutException
+ }
val record = recordIter.next()
val tp = new TopicPartition(record.topic, record.partition)