You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/14 22:31:14 UTC

kafka git commit: KAFKA-2603: Add timeout arg to ConsoleConsumer for new consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk 27c099b04 -> f13d11559


KAFKA-2603: Add timeout arg to ConsoleConsumer for new consumer

Added --timeout-ms argument to ConsoleConsumer that works with both old and new consumer. Also modified ducktape ConsoleConsumer service to use this arg instead of consumer.timeout.ms config that works only with the old consumer.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Aditya Auradkar, Ismael Juma, Guozhang Wang

Closes #274 from rajinisivaram/KAFKA-2603


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f13d1155
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f13d1155
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f13d1155

Branch: refs/heads/trunk
Commit: f13d1155960a1ea907b09754877ee74586efd041
Parents: 27c099b
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Oct 14 13:35:49 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 14 13:35:49 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/BaseConsumer.scala    |  9 ++++++---
 .../main/scala/kafka/tools/ConsoleConsumer.scala    | 16 ++++++++++++----
 tests/kafkatest/services/console_consumer.py        |  2 ++
 .../services/templates/console_consumer.properties  |  4 ----
 4 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 4e956bb..8b93493 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -32,7 +32,7 @@ trait BaseConsumer {
 
 case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
 
-class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseConsumer {
+class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
   import scala.collection.JavaConversions._
 
@@ -41,8 +41,11 @@ class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseCon
   var recordIter = consumer.poll(0).iterator
 
   override def receive(): BaseConsumerRecord = {
-    while (!recordIter.hasNext)
-      recordIter = consumer.poll(Long.MaxValue).iterator
+    if (!recordIter.hasNext) {
+      recordIter = consumer.poll(timeoutMs).iterator
+      if (!recordIter.hasNext)
+        throw new ConsumerTimeoutException
+    }
 
     val record = recordIter.next
     BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index de4900d..639360c 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -51,7 +51,8 @@ object ConsoleConsumer extends Logging {
 
     val consumer =
       if (conf.useNewConsumer) {
-        new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf))
+        val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
+        new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf), timeoutMs)
       } else {
         checkZk(conf)
         new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
@@ -100,8 +101,8 @@ object ConsoleConsumer extends Logging {
         consumer.receive()
       } catch {
         case e: Throwable => {
-          error("Error processing message, stopping consumer: ", e)
-          consumer.stop()
+          error("Error processing message, terminating consumer process: ", e)
+          // Consumer will be closed
           return
         }
       }
@@ -112,7 +113,7 @@ object ConsoleConsumer extends Logging {
           if (skipMessageOnError) {
             error("Error processing message, skipping this message: ", e)
           } else {
-            consumer.stop()
+            // Consumer will be closed
             throw e
           }
       }
@@ -149,6 +150,8 @@ object ConsoleConsumer extends Logging {
 
     if (config.options.has(config.deleteConsumerOffsetsOpt))
       ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id"))
+    if (config.timeoutMs >= 0)
+      props.put("consumer.timeout.ms", config.timeoutMs.toString)
 
     props
   }
@@ -204,6 +207,10 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("num_messages")
       .ofType(classOf[java.lang.Integer])
+    val timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
+      .withRequiredArg
+      .describedAs("timeout_ms")
+      .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
       "skip it instead of halt.")
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
@@ -246,6 +253,7 @@ object ConsoleConsumer extends Logging {
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
     val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+    val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 9b216fe..96fe777 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -161,6 +161,8 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
             cmd += " --zookeeper %(zk_connect)s" % args
         if self.from_beginning:
             cmd += " --from-beginning"
+        if self.consumer_timeout_ms is not None:
+            cmd += " --timeout-ms %s" % self.consumer_timeout_ms
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd

http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index bab4932..c733f47 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -14,10 +14,6 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
-{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
-consumer.timeout.ms={{ consumer_timeout_ms }}
-{% endif %}
-
 group.id={{ group_id|default('test-consumer-group') }}
 
 {% if client_id is defined and client_id is not none %}