You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/11 05:57:11 UTC

kafka git commit: KAFKA-2202: fix consumerTimeoutMs computation on ConsumerPerformance; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 1ed88f0eb -> 1fdb758f2


KAFKA-2202: fix consumerTimeoutMs computation on ConsumerPerformance; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1fdb758f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1fdb758f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1fdb758f

Branch: refs/heads/trunk
Commit: 1fdb758f286868a00cbebcc5bbcfc8195529158e
Parents: 1ed88f0
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Mon Aug 10 20:58:20 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Aug 10 20:58:20 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/ConsumerPerformance.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1fdb758f/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 903318d..7797dee 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -30,6 +30,7 @@ import kafka.consumer.ConsumerConnector
 import kafka.consumer.KafkaStream
 import kafka.consumer.ConsumerTimeoutException
 import java.text.SimpleDateFormat
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * Performance test for the full zookeeper consumer
@@ -43,6 +44,7 @@ object ConsumerPerformance {
     logger.info("Starting consumer...")
     val totalMessagesRead = new AtomicLong(0)
     val totalBytesRead = new AtomicLong(0)
+    val consumerTimeout = new AtomicBoolean(false)
 
     if (!config.hideHeader) {
       if (!config.showDetailedStats)
@@ -67,7 +69,7 @@ object ConsumerPerformance {
       var threadList = List[ConsumerPerfThread]()
       for ((topic, streamList) <- topicMessageStreams)
         for (i <- 0 until streamList.length)
-          threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead)
+          threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)
 
       logger.info("Sleeping for 1 second.")
       Thread.sleep(1000)
@@ -77,7 +79,10 @@ object ConsumerPerformance {
         thread.start
       for (thread <- threadList)
         thread.join
-      endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs 
+      if(consumerTimeout.get())
+	endMs = System.currentTimeMillis - consumerConfig.consumerTimeoutMs
+      else
+	endMs = System.currentTimeMillis
       consumerConnector.shutdown()
     }
     val elapsedSecs = (endMs - startMs) / 1000.0
@@ -209,7 +214,7 @@ object ConsumerPerformance {
   }
 
   class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
-    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
+    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong, consumerTimeout: AtomicBoolean)
     extends Thread(name) {
 
     override def run() {
@@ -238,7 +243,9 @@ object ConsumerPerformance {
       } catch {
         case _: InterruptedException =>
         case _: ClosedByInterruptException =>
-        case _: ConsumerTimeoutException =>
+        case _: ConsumerTimeoutException => {
+          consumerTimeout.set(true);
+        }
         case e: Throwable => e.printStackTrace()
       }
       totalMessagesRead.addAndGet(messagesRead)