You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Steve Arch (Commented) (JIRA)" <ji...@apache.org> on 2012/02/27 17:48:48 UTC

[jira] [Commented] (KAFKA-279) kafka-console-producer does not take in customized values of --batch-size or --timeout

    [ https://issues.apache.org/jira/browse/KAFKA-279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13217272#comment-13217272 ] 

Steve Arch commented on KAFKA-279:
----------------------------------

I tried the patch and issuing the same command get the following stack trace part-way through an import of a 100000 line file (--batchsize 300):
[2012-02-27 16:44:37,911] ERROR Event queue is full of unsent messages, could not send event: 7300043|103|60|1329080400|en|1987973|269118099490000000000000103153898086 (kafka.producer.async.AsyncProducer)
Exception in thread "main" kafka.producer.async.QueueFullException: Event queue is full of unsent messages, could not send event: 7300043|103|60|1329080400|en|1987973|269118099490000000000000103153898086
        at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:121)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
        at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
        at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
        at kafka.producer.Producer.zkSend(Producer.scala:143)
        at kafka.producer.Producer.send(Producer.scala:105)
        at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:120)
        at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)

                
> kafka-console-producer does not take in customized values of --batch-size or --timeout
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-279
>                 URL: https://issues.apache.org/jira/browse/KAFKA-279
>             Project: Kafka
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 0.7
>         Environment: Ubuntu 10.04, openjdk1.6 with default installation of 0.7
>            Reporter: milind parikh
>            Priority: Minor
>         Attachments: kafka-279.patch
>
>
> 1. While the default console-producer, console-consumer paradigm works great, when I try modiying the batch size
> bin/kafka-console-producer.sh --batch-size 300   --zookeeper localhost:2181 --topic test1
> it gives me a
> Exception in thread "main" java.lang.NumberFormatException: null
>     at java.lang.Integer.parseInt(Integer.java:443)
>     at java.lang.Integer.parseInt(Integer.java:514)
>     at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
>     at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
>     at kafka.utils.Utils$.getIntInRange(Utils.scala:189)
>     at kafka.utils.Utils$.getInt(Utils.scala:174)
>     at kafka.producer.async.AsyncProducerConfigShared$class.$init$(AsyncProducerConfig.scala:45)
>     at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:25)
>     at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:108)
>     at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
> I have looked at the code and can't figure out what's wrong
> 2. When I do bin/kafka-console-producer.sh --timeout 30000   --zookeeper localhost:2181 --topic test1
> I would think that console-producer would wait for 30s if the batch size (default 200) is not full. It doesn't. It takes the same time without the timeout parameter (default 1000) and dumps whatever the batch size.
> Resolution from Jun
> 1. The code does the following to set batch size
>      props.put("batch.size", batchSize)
> Instead, it should do
>      props.put("batch.size", batchSize.toString)
> 2. It sets the wrong property name for timeout. Instead of doing
>    props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
> it should do
>    props.put("queue.time", sendTimeout.toString)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira