You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/14 22:31:14 UTC
kafka git commit: KAFKA-2603: Add timeout arg to ConsoleConsumer for new consumer
Repository: kafka
Updated Branches:
refs/heads/trunk 27c099b04 -> f13d11559
KAFKA-2603: Add timeout arg to ConsoleConsumer for new consumer
Added a --timeout-ms argument to ConsoleConsumer that works with both the old and the new consumer. Also modified the ducktape ConsoleConsumer service to pass this argument instead of setting the consumer.timeout.ms config, which works only with the old consumer.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Aditya Auradkar, Ismael Juma, Guozhang Wang
Closes #274 from rajinisivaram/KAFKA-2603
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f13d1155
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f13d1155
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f13d1155
Branch: refs/heads/trunk
Commit: f13d1155960a1ea907b09754877ee74586efd041
Parents: 27c099b
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Oct 14 13:35:49 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 14 13:35:49 2015 -0700
----------------------------------------------------------------------
.../main/scala/kafka/consumer/BaseConsumer.scala | 9 ++++++---
.../main/scala/kafka/tools/ConsoleConsumer.scala | 16 ++++++++++++----
tests/kafkatest/services/console_consumer.py | 2 ++
.../services/templates/console_consumer.properties | 4 ----
4 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 4e956bb..8b93493 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -32,7 +32,7 @@ trait BaseConsumer {
case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
-class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseConsumer {
+class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConversions._
@@ -41,8 +41,11 @@ class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseCon
var recordIter = consumer.poll(0).iterator
override def receive(): BaseConsumerRecord = {
- while (!recordIter.hasNext)
- recordIter = consumer.poll(Long.MaxValue).iterator
+ if (!recordIter.hasNext) {
+ recordIter = consumer.poll(timeoutMs).iterator
+ if (!recordIter.hasNext)
+ throw new ConsumerTimeoutException
+ }
val record = recordIter.next
BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index de4900d..639360c 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -51,7 +51,8 @@ object ConsoleConsumer extends Logging {
val consumer =
if (conf.useNewConsumer) {
- new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf))
+ val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
+ new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf), timeoutMs)
} else {
checkZk(conf)
new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
@@ -100,8 +101,8 @@ object ConsoleConsumer extends Logging {
consumer.receive()
} catch {
case e: Throwable => {
- error("Error processing message, stopping consumer: ", e)
- consumer.stop()
+ error("Error processing message, terminating consumer process: ", e)
+ // Consumer will be closed
return
}
}
@@ -112,7 +113,7 @@ object ConsoleConsumer extends Logging {
if (skipMessageOnError) {
error("Error processing message, skipping this message: ", e)
} else {
- consumer.stop()
+ // Consumer will be closed
throw e
}
}
@@ -149,6 +150,8 @@ object ConsoleConsumer extends Logging {
if (config.options.has(config.deleteConsumerOffsetsOpt))
ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id"))
+ if (config.timeoutMs >= 0)
+ props.put("consumer.timeout.ms", config.timeoutMs.toString)
props
}
@@ -204,6 +207,10 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("num_messages")
.ofType(classOf[java.lang.Integer])
+ val timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no message is available for consumption for the specified interval.")
+ .withRequiredArg
+ .describedAs("timeout_ms")
+ .ofType(classOf[java.lang.Integer])
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.")
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
@@ -246,6 +253,7 @@ object ConsoleConsumer extends Logging {
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+ val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 9b216fe..96fe777 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -161,6 +161,8 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
cmd += " --from-beginning"
+ if self.consumer_timeout_ms is not None:
+ cmd += " --timeout-ms %s" % self.consumer_timeout_ms
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd
http://git-wip-us.apache.org/repos/asf/kafka/blob/f13d1155/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index bab4932..c733f47 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -14,10 +14,6 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
-{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
-consumer.timeout.ms={{ consumer_timeout_ms }}
-{% endif %}
-
group.id={{ group_id|default('test-consumer-group') }}
{% if client_id is defined and client_id is not none %}