Posted to dev@kafka.apache.org by "jian fan (JIRA)" <ji...@apache.org> on 2012/07/22 05:19:33 UTC

[jira] [Created] (KAFKA-411) Message Error in high cocurrent environment

jian fan created KAFKA-411:
------------------------------

             Summary: Message Error in high cocurrent environment
                 Key: KAFKA-411
                 URL: https://issues.apache.org/jira/browse/KAFKA-411
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.7
            Reporter: jian fan
            Priority: Blocker


In a highly concurrent environment, these errors always appear in the Kafka broker:

ERROR Error processing MultiProducerRequest on bxx:2 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
	at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
	at kafka.message.MessageSet.foreach(MessageSet.scala:87)
	at kafka.log.Log.append(Log.scala:205)
	at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
	at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
	at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
	at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
	at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
	at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
	at kafka.network.Processor.handle(SocketServer.scala:296)
	at kafka.network.Processor.read(SocketServer.scala:319)
	at kafka.network.Processor.run(SocketServer.scala:214)
	at java.lang.Thread.run(Thread.java:722)


 ERROR Closing socket for /192.168.75.15 because of error (kafka.network.Processor)
kafka.common.InvalidTopicException: topic name can't be empty
	at kafka.log.LogManager.getLogPool(LogManager.scala:159)
	at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jian fan updated KAFKA-411:
---------------------------

    Fix Version/s: 0.7.2
                   0.8
           Labels: InvalidTopic  (was: )
           Status: Patch Available  (was: Open)
    
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>

        

[jira] [Commented] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13424769#comment-13424769 ] 

jian fan commented on KAFKA-411:
--------------------------------

In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer is full. So there is some chance that "topic" contains one or more characters that encode to bytes that include NULL (0).
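
A minimal sketch of the kind of check that would reject such a topic before it reaches the log manager. The names TopicNameCheck and isValidTopic are hypothetical; they are not taken from the Kafka source or from the attached patch. The rule shown (non-empty, no NUL or other control characters) is an assumption about what a reasonable check could look like.

```java
// Hypothetical helper, not part of Kafka: rejects topic names that are
// null, empty, or contain control characters such as NUL (0).
final class TopicNameCheck {
    private TopicNameCheck() {}

    static boolean isValidTopic(String topic) {
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        for (int i = 0; i < topic.length(); i++) {
            // Character.isISOControl covers U+0000..U+001F and U+007F..U+009F.
            if (Character.isISOControl(topic.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
```

A request handler could call TopicNameCheck.isValidTopic(topic) and refuse the request when it returns false, instead of passing the name on to the code that creates the log directory.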
                
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>         Attachments: InvalidTopic.patch
>
>

        

[jira] [Updated] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jian fan updated KAFKA-411:
---------------------------

    Attachment:     (was: InvalidTopic.patch)
    
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>         Attachments: kafka-411.patch
>
>

        

[jira] [Comment Edited] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13423748#comment-13423748 ] 

jian fan edited comment on KAFKA-411 at 7/27/12 9:15 AM:
---------------------------------------------------------

In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer is full. LogManager.createLog then creates logs for topics that do not exist. One thing is very strange: the log directories should be named a-0, a-1, a-2 and so on, but file.mkdir() also creates a log directory named just a. There seems to be a bug around file.mkdir() in LogManager.createLog.
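
One possible explanation for the stray directory, sketched under stated assumptions: the directory name is built as topic + "-" + partition (as the a-0, a-1 pattern suggests), and the path is eventually handed to a native API that treats NUL as a string terminator, so everything after the first NUL is lost. The class LogDirNames and its methods are hypothetical. They only simulate that truncation with plain string operations; they do not call file.mkdir() and are not Kafka code.

```java
// Hypothetical illustration, not Kafka code.
final class LogDirNames {
    private LogDirNames() {}

    // Directory name in the "<topic>-<partition>" form described above.
    static String dirName(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Simulates a C-style API that stops reading at the first NUL character.
    static String asSeenByNativeCode(String path) {
        int nul = path.indexOf('\0');
        return nul < 0 ? path : path.substring(0, nul);
    }
}
```

With a clean topic, dirName("a", 0) is "a-0" and passes through unchanged. With a corrupted topic such as "a\0xx", the simulated native view of "a\0xx-0" is just "a", which would match the unexpected directory.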
                
                  
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>

        

[jira] [Comment Edited] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13423748#comment-13423748 ] 

jian fan edited comment on KAFKA-411 at 7/27/12 8:42 AM:
---------------------------------------------------------

In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer is full. LogManager.createLog then creates logs for topics that do not exist. One thing is very strange: the log directories should be named a-0, a-1, a-2 and so on, but file.mkdir() also creates a log directory named just a. There seems to be a bug around file.mkdir() in LogManager.createLog.
                
                  
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>

        

[jira] [Commented] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13433014#comment-13433014 ] 

jian fan commented on KAFKA-411:
--------------------------------

  I have located the problem. It was caused by our Cisco router. Under high load, our Cisco router (2960S) drops some packets because of its limited capacity. So socket.recv() should be fine; we just need to fix the log directory being corrupted by a topic name containing a null byte in this scenario.
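
Since the corrupted bytes arrive over the network, a guard would have to sit between decoding the request and creating the log directory. A rough sketch follows, assuming the topic is sent as a 2-byte length followed by UTF-8 bytes. That layout is an assumption about the wire format, not something stated in this thread, and TopicDecoder and readTopic are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder, not Kafka code: reads a length-prefixed topic and
// refuses names that are empty, truncated, or contain a NUL byte.
final class TopicDecoder {
    private TopicDecoder() {}

    static String readTopic(ByteBuffer buffer) {
        if (buffer.remaining() < 2) {
            throw new IllegalArgumentException("missing topic length");
        }
        int length = buffer.getShort();
        if (length <= 0 || length > buffer.remaining()) {
            throw new IllegalArgumentException("bad topic length: " + length);
        }
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        for (byte b : bytes) {
            if (b == 0) {
                throw new IllegalArgumentException("topic contains a NUL byte");
            }
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Rejecting the request at this point means a damaged topic name never reaches the file system, so no unexpected log directory is created for it.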
                
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>         Attachments: kafka-411.patch
>
>

        

[jira] [Commented] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13423759#comment-13423759 ] 

jian fan commented on KAFKA-411:
--------------------------------

The error is:

[2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
[2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
	at java.io.RandomAccessFile.open(Native Method)
	at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
	at kafka.utils.Utils$.openChannel(Utils.scala:324)
	at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
	at kafka.log.Log.loadSegments(Log.scala:144)
	at kafka.log.Log.<init>(Log.scala:116)
	at kafka.log.LogManager.createLog(LogManager.scala:159)
	at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
	at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
	at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
	at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
	at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
	at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
	at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
	at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
	at kafka.network.Processor.handle(SocketServer.scala:296)
	at kafka.network.Processor.read(SocketServer.scala:319)
	at kafka.network.Processor.run(SocketServer.scala:214)
	at java.lang.Thread.run(Thread.java:679)
                
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13423748#comment-13423748 ] 

jian fan commented on KAFKA-411:
--------------------------------

In a highly concurrent environment, the server's TCP layer will drop some packets when the TCP buffer is full. LogManager.createlog will then create logs for topics that do not exist. One thing is very strange, though: the log directories should be named a-0, a-1, a-2 and so on, but file.mkdir() also creates a log directory named just a. There seems to be a bug in the file.mkdir() call in LogManager.createlog.
                


        

[jira] [Commented] (KAFKA-411) Message Error in high cocurrent environment

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13424952#comment-13424952 ] 

Jun Rao commented on KAFKA-411:
-------------------------------

Thanks for the patch. It may not be the right fix though since it fixes the symptom, but not the cause. For each produce request, the broker does the following: (1) read all bytes of the request into a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the request are ready, deserialize the bytes into a ProducerRequest (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request by adding topic data to logs.

What you observed is that in step 3, a topic name is somehow corrupted. This means that the corresponding ProducerRequest itself is corrupted. Assuming there is no corruption at the network layer (such corruption is very unlikely), the corruption must have happened in step 1 or step 2. So, instead of patching a corrupted topic name, we should understand why a ProducerRequest can be corrupted and fix the cause. BTW, whatever caused the corrupted topic could be causing the corrupted messages too.
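
To make steps 1 and 2 concrete, here is a minimal Python sketch (not Kafka code) of reading a size-prefixed request in full and then deserializing a topic name from it. The wire layout used here (4-byte size, 2-byte topic length, topic bytes, 4-byte partition, payload) is a simplified assumption, and the names encode_request, read_exact, read_request and parse_request are hypothetical. It only illustrates that if deserialization starts at the wrong byte offset, the topic can decode as an empty string even though every byte arrived intact.

```python
import io
import struct


def encode_request(topic: str, partition: int, payload: bytes) -> bytes:
    """Build a size-prefixed request (hypothetical, simplified layout)."""
    topic_bytes = topic.encode("utf-8")
    body = struct.pack(">H", len(topic_bytes)) + topic_bytes
    body += struct.pack(">i", partition) + payload
    return struct.pack(">i", len(body)) + body


def read_exact(stream, n: int) -> bytes:
    """Keep reading until exactly n bytes have arrived."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError("stream closed in the middle of a request")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_request(stream) -> bytes:
    """Step 1: read the 4-byte size, then the whole request body."""
    (size,) = struct.unpack(">i", read_exact(stream, 4))
    return read_exact(stream, size)


def parse_request(body: bytes):
    """Step 2: deserialize topic, partition and payload from the body."""
    (topic_len,) = struct.unpack_from(">H", body, 0)
    topic = body[2:2 + topic_len].decode("utf-8", errors="replace")
    (partition,) = struct.unpack_from(">i", body, 2 + topic_len)
    return topic, partition, body[2 + topic_len + 4:]


wire = encode_request("a", 0, b"hello") + encode_request("a", 1, b"world")

# Correct framing: both requests decode cleanly.
stream = io.BytesIO(wire)
good = [parse_request(read_request(stream)) for _ in range(2)]

# Misaligned framing: the same bytes, parsed 3 bytes too late,
# yield an empty topic name and a meaningless partition number.
first_body = read_request(io.BytesIO(wire))
shifted_topic, shifted_partition, _ = parse_request(first_body[3:])
print(good, repr(shifted_topic), shifted_partition)
```

In this sketch the bytes themselves are never damaged; only the starting offset is wrong, which is the kind of step 1 or step 2 problem worth ruling out before validating topic names.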

                
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>         Attachments: InvalidTopic.patch
>
>


        

[jira] [Updated] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jian fan updated KAFKA-411:
---------------------------

    Attachment: InvalidTopic.patch

In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer is full. So there is some chance that "topic" contains one or more characters that encode to bytes that include NULL (0).
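
As a rough illustration of the kind of check described above, the following Python sketch rejects topic names that are empty or whose encoded bytes contain a zero byte. It is not the content of InvalidTopic.patch, which is not reproduced in this thread; the function name is_valid_topic and the choice of UTF-8 are assumptions made for the example.

```python
def is_valid_topic(topic: str) -> bool:
    """Hypothetical check: reject empty names and names whose
    UTF-8 encoding contains a NULL (0) byte."""
    encoded = topic.encode("utf-8")
    return len(encoded) > 0 and b"\x00" not in encoded


# Print the verdict for a normal name, an empty name, and a name with NULL.
for name in ["a", "", "a\x00b"]:
    print(repr(name), is_valid_topic(name))
```

A check like this would be applied before any log directory is created for the topic.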
                


        

[jira] [Updated] (KAFKA-411) Message Error in high cocurrent environment

Posted by "jian fan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jian fan updated KAFKA-411:
---------------------------

    Attachment: kafka-411.patch
    
> Message Error in high cocurrent environment
> -------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>         Attachments: kafka-411.patch
>
>
