Posted to users@kafka.apache.org by jjian fan <xi...@gmail.com> on 2012/07/12 09:55:58 UTC

error with kafka

HI:

Guys, I am testing Kafka in our highly concurrent test environment, and I always
get the error message as follows:

ERROR Error processing MultiProducerRequest on axxxxxxxx:2
(kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression
codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0

Can anyone help? Thanks!

Best Regards

Jian Fan

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
The new error:

[2012-07-13 10:12:41,178] ERROR Error processing MultiProducerRequest on
oxxxx:0 (kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression
codec: GZIPCompressionCodec size: 48 curr offset: 0 init offset: 0
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at kafka.log.Log.append(Log.scala:205)
at
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:722)
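
[Editor's note: the InvalidMessageException above is raised while the broker
iterates the received message set and validates each message. In Kafka 0.7 each
message carries a CRC32 checksum of its payload, and a message whose stored
checksum does not match a recomputed one is rejected as invalid. The sketch
below illustrates that check in isolation; it is not Kafka's actual code, just
the underlying CRC32 idea:]

```java
import java.util.zip.CRC32;

public class CrcCheckSketch {
    // Recompute a CRC32 over a payload, as a broker-side validity check would.
    static long crc(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes();
        long stored = crc(payload); // checksum recorded at produce time
        payload[0] ^= 0x01;         // simulate one corrupted bit in transit
        // A mismatch here is what surfaces as "message is invalid" on the broker.
        System.out.println(stored == crc(payload)); // prints false
    }
}
```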

2012/7/13 jjian fan <xi...@gmail.com>

> OK, sometimes it gives this error:
>
> [2012-07-13 10:08:03,205] ERROR Closing socket for /192.168.75.15 because
> of error (kafka.network.Processor)
> kafka.common.InvalidTopicException: topic name can't be empty
>  at kafka.log.LogManager.getLogPool(LogManager.scala:159)
> at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)
>  at
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> at
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>  at
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>  at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>  at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> at
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>  at
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> at
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>  at kafka.network.Processor.handle(SocketServer.scala:296)
> at kafka.network.Processor.read(SocketServer.scala:319)
>  at kafka.network.Processor.run(SocketServer.scala:214)
> at java.lang.Thread.run(Thread.java:722)
>
> 2012/7/13 jjian fan <xi...@gmail.com>
>
>> sorry!
>>
>> The producer.type should be async!!
>>
>> I have changed it to sync in my new test!
>>
>> Best Regards!
>> Jian Fan
>>
>>
>> 2012/7/13 jjian fan <xi...@gmail.com>
>>
>>> I post my code here:
>>>
>>> ProducerThread.java
>>>
>>> package com.tz.kafka;
>>>
>>> import java.io.Serializable;
>>> import java.util.List;
>>> import java.util.Properties;
>>> import java.util.concurrent.CopyOnWriteArrayList;
>>> import kafka.javaapi.producer.Producer;
>>> import kafka.javaapi.producer.ProducerData;
>>> import kafka.producer.ProducerConfig;
>>>
>>> public class ProducerThread implements Runnable, Serializable {
>>>   private static final long serialVersionUID = 18977854555656L;
>>>   private String topic;
>>>   private Properties props = new Properties();
>>>   private String messageStr;
>>>
>>>   public ProducerThread(String kafkatopic, String message) {
>>>     synchronized (this) {
>>>       props.put("zk.connect",
>>>           "192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181");
>>>       // props.put("broker.list", "4:192.168.75.104:9092");
>>>       props.put("serializer.class", "kafka.serializer.StringEncoder");
>>>       props.put("producer.type", "sync");
>>>       props.put("compression.codec", "1");
>>>       props.put("batch.size", "5");
>>>       props.put("queue.enqueueTimeout.ms", "-1");
>>>       props.put("queue.size", "2000");
>>>       props.put("buffer.size", "10240000");
>>>       // props.put("event.handler", "kafka.producer.async.EventHandler<T>");
>>>       props.put("zk.sessiontimeout.ms", "6000000");
>>>       props.put("zk.connectiontimeout.ms", "6000000");
>>>       props.put("socket.timeout.ms", "60000000");
>>>       props.put("connect.timeout.ms", "60000000");
>>>       props.put("max.message.size", "20000");
>>>       props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
>>>       props.put("reconnect.interval.ms", "3000");
>>>       // Use the random partitioner: no key is needed, the message is a String.
>>>       this.topic = kafkatopic;
>>>       this.messageStr = message;
>>>     }
>>>   }
>>>
>>>   public void run() {
>>>     synchronized (this) {
>>>       Producer<String, String> producer =
>>>           new Producer<String, String>(new ProducerConfig(props));
>>>       long messageNo = 0;
>>>       long t = System.currentTimeMillis();
>>>       long r;
>>>       long time;
>>>       long rate;
>>>       List<String> messageSet = new CopyOnWriteArrayList<String>();
>>>       while (true) {
>>>         if (topic.length() > 0) {
>>>           messageSet.add(this.messageStr);
>>>           ProducerData<String, String> data =
>>>               new ProducerData<String, String>(topic, null, messageSet);
>>>           producer.send(data);
>>>           messageSet.clear();
>>>           messageNo++;
>>>         }
>>>         if (messageNo % 200000 == 0) {
>>>           r = System.currentTimeMillis();
>>>           time = r - t;
>>>           rate = 200000000 / time;
>>>           System.out.println(this.topic + " send message per second:" + rate);
>>>           t = r;
>>>         }
>>>       }
>>>     }
>>>   }
>>> }
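
[Editor's note: the throughput printout in the loop above folds two constants
together: 200,000 messages per sampling window times 1,000 ms per second,
divided by the elapsed milliseconds, i.e. 200,000 x 1,000 / time = 200000000 /
time messages per second. A minimal check of that arithmetic, with a
hypothetical elapsed time:]

```java
public class RateMath {
    public static void main(String[] args) {
        // Hypothetical timing: 200,000 messages sent in 4,000 ms.
        long time = 4000;
        // Same expression as the producer loop:
        // 200,000 msgs * 1000 ms/s / time ms.
        long rate = 200000000L / time;
        System.out.println(rate); // 50000 messages per second
    }
}
```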
>>>
>>> ProducerThreadTest1.java
>>>
>>> package com.tz.kafka;
>>>
>>> import java.util.concurrent.LinkedBlockingQueue;
>>> import java.util.concurrent.ThreadPoolExecutor;
>>> import java.util.concurrent.TimeUnit;
>>>
>>> public class ProducerThreadTest1 {
>>>
>>>   /**
>>>    * @param args
>>>    * @throws InterruptedException
>>>    */
>>>   public static void main(String[] args) throws InterruptedException {
>>>     int i = Integer.parseInt(args[0]);
>>>     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
>>>         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
>>>         new ThreadPoolExecutor.DiscardOldestPolicy());
>>>     int messageSize = Integer.parseInt(args[1]);
>>>     StringBuffer messageStr = new StringBuffer();
>>>     for (int k = 0; k < messageSize; k++) {
>>>       messageStr.append("X");
>>>     }
>>>     String topic = args[2];
>>>     for (int j = 0; j < i; j++) {
>>>       topic += "x";
>>>       threadPool.execute(new ProducerThread(topic, messageStr.toString()));
>>>       Thread.sleep(1000);
>>>     }
>>>   }
>>> }
>>>
>>>
>>> the shell script kafkaThreadTest.sh looks like this:
>>>
>>> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>>>
>>> I deploy the shell at ten servers!
>>>
>>> Thanks!
>>> Best Regards!
>>>
>>> Jian Fan
>>>
>>> 2012/7/13 Jun Rao <ju...@gmail.com>
>>>
>>>> That seems like a Kafka bug. Do you have a script that can reproduce
>>>> this?
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
>>>> wrote:
>>>>
>>>> > HI:
>>>> > I use Kafka 0.7.1; here is the stack trace on the Kafka server:
>>>> >
>>>> >  ERROR Error processing MultiProducerRequest on bxx:2
>>>> > (kafka.server.KafkaRequestHandlers)
>>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>>> > at
>>>> >
>>>> >
>>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>> > at
>>>> >
>>>> >
>>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>>> > at
>>>> >
>>>> >
>>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>> > at
>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>>> > at kafka.log.Log.append(Log.scala:205)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> > at
>>>> >
>>>> >
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> > at
>>>> >
>>>> >
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> > at
>>>> >
>>>> >
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>>> > at
>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>>>> > at kafka.network.Processor.read(SocketServer.scala:319)
>>>> > at kafka.network.Processor.run(SocketServer.scala:214)
>>>> > at java.lang.Thread.run(Thread.java:722)
>>>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because
>>>> > of error (kafka.network.Processor)
>>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>>> > at
>>>> >
>>>> >
>>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>> > at
>>>> >
>>>> >
>>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>>> > at
>>>> >
>>>> >
>>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>> > at
>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>>> > at kafka.log.Log.append(Log.scala:205)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>>> > at
>>>> >
>>>> >
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> > at
>>>> >
>>>> >
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>>> > at
>>>> >
>>>> >
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>>> > at
>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> > at
>>>> >
>>>> >
>>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>>>> > at kafka.network.Processor.read(SocketServer.scala:319)
>>>> > at kafka.network.Processor.run(SocketServer.scala:214)
>>>> > at java.lang.Thread.run(Thread.java:722)
>>>> >
>>>> > here is the stack trace in the kafka producer:
>>>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt
>>>> in
>>>> > 60000 ms (kafka.producer.SyncProducer)
>>>> > java.net.ConnectException: Connection refused
>>>> > at sun.nio.ch.Net.connect(Native Method)
>>>> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
>>>> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
>>>> > at
>>>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
>>>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
>>>> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
>>>> > at
>>>> >
>>>> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
>>>> > at
>>>> >
>>>> >
>>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
>>>> > at
>>>> >
>>>> >
>>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
>>>> > at
>>>> >
>>>> >
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
>>>> > at
>>>> >
>>>> >
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
>>>> > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>>>> > at
>>>> >
>>>> >
>>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
>>>> > at
>>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
>>>> >
>>>> > The kafka producer is multi-thread program.
>>>> >
>>>> > Thanks!
>>>> >
>>>> > Best Regards!
>>>> >
>>>> >
>>>> > 2012/7/13 Neha Narkhede <ne...@gmail.com>
>>>> >
>>>> > > In addition to Jun's question,
>>>> > >
>>>> > > which version are you using ? Do you have a reproducible test case ?
>>>> > >
>>>> > > Thanks,
>>>> > > Neha
>>>> > >
>>>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
>>>> > > > What's the stack trace?
>>>> > > >
>>>> > > > Thanks,
>>>> > > >
>>>> > > > Jun
>>>> > > >
>>>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
>>>> xiaofanhadoop@gmail.com>
>>>> > > wrote:
>>>> > > >
>>>> > > >> HI:
>>>> > > >>
>>>> > > >> Guys, I test Kafka in our highly concurrent test environment, I
>>>> always
>>>> > get
>>>> > > the
>>>> > > >> error message as follows:
>>>> > > >>
>>>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
>>>> > > >> (kafka.server.KafkaRequestHandlers)
>>>> > > >> kafka.message.InvalidMessageException: message is invalid,
>>>> compression
>>>> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
>>>> offset: 0
>>>> > > >>
>>>> > > >> Can anyone help? Thanks!
>>>> > > >>
>>>> > > >> Best Regards
>>>> > > >>
>>>> > > >> Jian Fan
>>>> > > >>
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
OK,sometime it has this error :

[2012-07-13 10:08:03,205] ERROR Closing socket for /192.168.75.15 because
of error (kafka.network.Processor)
kafka.common.InvalidTopicException: topic name can't be empty
at kafka.log.LogManager.getLogPool(LogManager.scala:159)
at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)
at
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:722)

2012/7/13 jjian fan <xi...@gmail.com>

> sorry!
>
> The  producer.type  shoud be async!!
>
> I have change it to sync in my new test!
>
> Best Regards!
> Jian Fan
>
>
> 2012/7/13 jjian fan <xi...@gmail.com>
>
>> I post my code here:
>>
>> ProducerThread.java
>> package com.tz.kafka;
>>
>>
>> import java.io.Serializable;
>> import java.util.Properties;
>> import kafka.producer.ProducerConfig;
>> import kafka.javaapi.producer.*;
>> import java.util.*;
>> import java.util.concurrent.CopyOnWriteArrayList;
>>
>> public class ProducerThread implements Runnable ,Serializable
>>  {
>>   /**
>>  *
>>  */
>>  private static final long serialVersionUID = 18977854555656L;
>> //private final kafka.javaapi.producer.Producer<Integer, String> producer;
>>   private String topic;
>>   private Properties props = new Properties();
>>       private String messageStr;
>>   public  ProducerThread(String kafkatopic,String message)
>>   {
>>     synchronized(this){
>>     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
>> 192.168.75.65:2181");
>>  //props.put("broker.list", "4:192.168.75.104:9092");
>> //props.put("serializer.class", "kafka.serializer.StringEncoder");
>>  props.put("serializer.class", "kafka.serializer.StringEncoder");
>> props.put("producer.type", "sync");
>>  props.put("compression.codec", "1");
>> props.put("batch.size", "5");
>>  props.put("queue.enqueueTimeout.ms", "-1");
>> props.put("queue.size", "2000");
>>  props.put("buffer.size", "10240000");
>> //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
>>  props.put("zk.sessiontimeout.ms", "6000000");
>> props.put("zk.connectiontimeout.ms", "6000000");
>>  props.put("socket.timeout.ms", "60000000");
>> props.put("connect.timeout.ms", "60000000");
>>  props.put("max.message.size", "20000");
>> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
>>  props.put("reconnect.interval.ms", "3000");
>>     // Use random partitioner. Don't need the key type. Just set it to
>> Integer.
>>     // The message is of type String.
>> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
>> ProducerConfig(props));
>>     //producer = new kafka.javaapi.producer.Producer<String, String>(new
>> ProducerConfig(props));
>>     this.topic = kafkatopic;
>>     this.messageStr = message;
>>
>>   }
>>   }
>>
>>   public void run() {
>> synchronized(this) {
>>  Producer<String, String> producer  = new Producer<String, String>(new
>> ProducerConfig(props));
>>     //producer.
>>  long messageNo = 0;
>>     long t = System.currentTimeMillis();
>>     long r = System.currentTimeMillis();
>>     long time = r-t;
>>     long rate = 0;
>>     List<String> messageSet = new CopyOnWriteArrayList<String>();
>>     while(true)
>>     {
>>       if(topic.length() > 0 )
>>       {
>>      messageSet.add(this.messageStr.toString());
>>          ProducerData<String, String> data = new ProducerData<String,
>> String>(topic,null,messageSet);
>>
>>          producer.send(data);
>>          messageSet.clear();
>>          data = null;
>>          messageNo++;
>>
>>       }
>>
>>       if(messageNo % 200000 ==0)
>>       {
>>       r = System.currentTimeMillis();
>>       time = r-t;
>>       rate = 200000000/time;
>>       System.out.println(this.topic + " send message per second:"+rate);
>>       t = r;
>>       }
>>
>>      }
>> }
>>   }
>>     }
>>
>> ProducerThreadTest1.java
>>
>> package com.tz.kafka;
>>
>> import java.util.concurrent.ThreadPoolExecutor;
>> import java.util.concurrent.TimeUnit;
>> import java.util.concurrent.LinkedBlockingQueue;
>>
>> public class ProducerThreadTest1 {
>>
>> /**
>>  * @param args
>>  * @throws InterruptedException
>>  */
>>  public static void main(String[] args) throws InterruptedException {
>> // TODO Auto-generated method stub
>>  int i = Integer.parseInt(args[0]);
>>  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
>>  TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
>> new ThreadPoolExecutor.DiscardOldestPolicy());
>>  int messageSize = Integer.parseInt(args[1]);
>>  StringBuffer messageStr = new StringBuffer();
>>  for(int messagesize=0;messagesize<messageSize;messagesize++)
>>      {
>>      messageStr.append("X");
>>      }
>>  String topic = args[2];
>> for(int j=0;j < i; j++)
>> {
>>    topic += "x";
>>    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
>>    Thread.sleep(1000);
>>
>> }
>>  }
>>
>> }
>>
>>
>> the shell scripte kafkaThreadTest.sh like this:
>>
>> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>>
>> I deploy the shell at ten servers!
>>
>> Thanks!
>> Best Regards!
>>
>> Jian Fan
>>
>> 2012/7/13 Jun Rao <ju...@gmail.com>
>>
>>> That seems like a Kafka bug. Do you have a script that can reproduce
>>> this?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
>>> wrote:
>>>
>>> > HI:
>>> > I use kafka0.7.1, here is the stack trace in kafka server:
>>> >
>>> >  ERROR Error processing MultiProducerRequest on bxx:2
>>> > (kafka.server.KafkaRequestHandlers)
>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>> > at
>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>> > at kafka.log.Log.append(Log.scala:205)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>> > at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>>> > at kafka.network.Processor.read(SocketServer.scala:319)
>>> > at kafka.network.Processor.run(SocketServer.scala:214)
>>> > at java.lang.Thread.run(Thread.java:722)
>>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13because
>>> > of error (kafka.network.Processor)
>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>> > at
>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>> > at kafka.log.Log.append(Log.scala:205)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>> > at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>>> > at kafka.network.Processor.read(SocketServer.scala:319)
>>> > at kafka.network.Processor.run(SocketServer.scala:214)
>>> > at java.lang.Thread.run(Thread.java:722)
>>> >
>>> > here is the track stace in kafka producer:
>>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt
>>> in
>>> > 60000 ms (kafka.producer.SyncProducer)
>>> > java.net.ConnectException: Connection refused
>>> > at sun.nio.ch.Net.connect(Native Method)
>>> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
>>> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
>>> > at
>>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
>>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
>>> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
>>> > at
>>> >
>>> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
>>> > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
>>> > at
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
>>> >
>>> > The kafka producer is multi-thread program.
>>> >
>>> > Thanks!
>>> >
>>> > Best Regards!
>>> >
>>> >
>>> > 2012/7/13 Neha Narkhede <ne...@gmail.com>
>>> >
>>> > > In addition to Jun's question,
>>> > >
>>> > > which version are you using ? Do you have a reproducible test case ?
>>> > >
>>> > > Thanks,
>>> > > Neha
>>> > >
>>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
>>> > > > What's the stack trace?
>>> > > >
>>> > > > Thanks,
>>> > > >
>>> > > > Jun
>>> > > >
>>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
>>> xiaofanhadoop@gmail.com>
>>> > > wrote:
>>> > > >
>>> > > >> HI:
>>> > > >>
>>> > > >> Guys, I test kafka in our test high cocunnrent enivorment, I
>>> always
>>> > get
>>> > > the
>>> > > >> error message as follows:
>>> > > >>
>>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
>>> > > >> (kafka.server.KafkaRequestHandlers)
>>> > > >> kafka.message.InvalidMessageException: message is invalid,
>>> compression
>>> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
>>> offset: 0
>>> > > >>
>>> > > >> Can anyone help? Thanks!
>>> > > >>
>>> > > >> Best Regards
>>> > > >>
>>> > > >> Jian Fan
>>> > > >>
>>> > >
>>> >
>>>
>>
>>
>

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
sorry!

The producer.type should be async!

I have changed it to sync in my new test!

Best Regards!
Jian Fan
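
[For context, the two producer modes mentioned above differ in this single property. A minimal fragment, with values as used elsewhere in this thread:]

```
# sync: each send() call goes through SyncProducer and blocks until it completes
producer.type=sync

# async: sends are queued and flushed in batches by a background ProducerSendThread
# producer.type=async
```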

2012/7/13 jjian fan <xi...@gmail.com>

> I post my code here:
>
> ProducerThread.java
> package com.tz.kafka;
>
>
> import java.io.Serializable;
> import java.util.Properties;
> import kafka.producer.ProducerConfig;
> import kafka.javaapi.producer.*;
> import java.util.*;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class ProducerThread implements Runnable, Serializable
> {
>   private static final long serialVersionUID = 18977854555656L;
>   // private final kafka.javaapi.producer.Producer<Integer, String> producer;
>   private String topic;
>   private Properties props = new Properties();
>   private String messageStr;
>
>   public ProducerThread(String kafkatopic, String message)
>   {
>     synchronized (this) {
>       props.put("zk.connect",
>           "192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181");
>       // props.put("broker.list", "4:192.168.75.104:9092");
>       props.put("serializer.class", "kafka.serializer.StringEncoder");
>       props.put("producer.type", "sync");
>       props.put("compression.codec", "1");
>       props.put("batch.size", "5");
>       props.put("queue.enqueueTimeout.ms", "-1");
>       props.put("queue.size", "2000");
>       props.put("buffer.size", "10240000");
>       // props.put("event.handler", "kafka.producer.async.EventHandler<T>");
>       props.put("zk.sessiontimeout.ms", "6000000");
>       props.put("zk.connectiontimeout.ms", "6000000");
>       props.put("socket.timeout.ms", "60000000");
>       props.put("connect.timeout.ms", "60000000");
>       props.put("max.message.size", "20000");
>       props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
>       props.put("reconnect.interval.ms", "3000");
>       // Use the random partitioner; the message is of type String.
>       // producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
>       this.topic = kafkatopic;
>       this.messageStr = message;
>     }
>   }
>
>   public void run() {
>     synchronized (this) {
>       Producer<String, String> producer =
>           new Producer<String, String>(new ProducerConfig(props));
>       long messageNo = 0;
>       long t = System.currentTimeMillis();
>       long r = System.currentTimeMillis();
>       long time = r - t;
>       long rate = 0;
>       List<String> messageSet = new CopyOnWriteArrayList<String>();
>       while (true)
>       {
>         if (topic.length() > 0)
>         {
>           messageSet.add(this.messageStr.toString());
>           ProducerData<String, String> data =
>               new ProducerData<String, String>(topic, null, messageSet);
>           producer.send(data);
>           messageSet.clear();
>           data = null;
>           messageNo++;
>         }
>
>         if (messageNo % 200000 == 0)
>         {
>           r = System.currentTimeMillis();
>           time = r - t;
>           rate = 200000000 / time;
>           System.out.println(this.topic + " send message per second:" + rate);
>           t = r;
>         }
>       }
>     }
>   }
> }
>
> ProducerThreadTest1.java
>
> package com.tz.kafka;
>
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class ProducerThreadTest1 {
>
>   /**
>    * @param args
>    * @throws InterruptedException
>    */
>   public static void main(String[] args) throws InterruptedException {
>     int i = Integer.parseInt(args[0]);
>     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
>         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
>         new ThreadPoolExecutor.DiscardOldestPolicy());
>     int messageSize = Integer.parseInt(args[1]);
>     StringBuffer messageStr = new StringBuffer();
>     for (int messagesize = 0; messagesize < messageSize; messagesize++)
>     {
>       messageStr.append("X");
>     }
>     String topic = args[2];
>     for (int j = 0; j < i; j++)
>     {
>       topic += "x";
>       threadPool.execute(new ProducerThread(topic, messageStr.toString()));
>       Thread.sleep(1000);
>     }
>   }
> }
>
>
> the shell script kafkaThreadTest.sh looks like this:
>
> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>
> I deploy the script on ten servers!
>
> Thanks!
> Best Regards!
>
> Jian Fan
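
[For reference, the throughput printout in the loop above computes `200000000/time`, i.e. 200,000 messages times 1000 ms divided by the elapsed milliseconds. Extracted as a self-contained sketch; the class and method names here are illustrative, not from the original program:]

```java
// Sketch of the rate computation from ProducerThread's send loop.
// 200000000 / time  is the same as  200000 messages * 1000 ms / elapsed ms.
public class RateCalc {
    // messages per second over the measured interval
    static long ratePerSecond(long messagesInInterval, long elapsedMillis) {
        return messagesInInterval * 1000 / elapsedMillis;
    }

    public static void main(String[] args) {
        // e.g. 200000 messages in 4000 ms -> 50000 messages/sec
        System.out.println(ratePerSecond(200000, 4000));
    }
}
```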
>
> 2012/7/13 Jun Rao <ju...@gmail.com>
>
>> That seems like a Kafka bug. Do you have a script that can reproduce this?
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
>> wrote:
>>
>> > HI:
>> > I use kafka 0.7.1; here is the stack trace on the kafka server:
>> >
>> >  ERROR Error processing MultiProducerRequest on bxx:2
>> > (kafka.server.KafkaRequestHandlers)
>> > kafka.message.InvalidMessageException: message is invalid, compression
>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>> > at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>> > at kafka.log.Log.append(Log.scala:205)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>> > at kafka.network.Processor.read(SocketServer.scala:319)
>> > at kafka.network.Processor.run(SocketServer.scala:214)
>> > at java.lang.Thread.run(Thread.java:722)
>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because
>> > of error (kafka.network.Processor)
>> > kafka.message.InvalidMessageException: message is invalid, compression
>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>> > at
>> >
>> >
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>> > at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>> > at kafka.log.Log.append(Log.scala:205)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>> > at
>> >
>> >
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at
>> >
>> >
>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>> > at kafka.network.Processor.read(SocketServer.scala:319)
>> > at kafka.network.Processor.run(SocketServer.scala:214)
>> > at java.lang.Thread.run(Thread.java:722)
>> >
>> > here is the stack trace in the kafka producer:
>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in
>> > 60000 ms (kafka.producer.SyncProducer)
>> > java.net.ConnectException: Connection refused
>> > at sun.nio.ch.Net.connect(Native Method)
>> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
>> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
>> > at
>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
>> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
>> > at
>> >
>> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
>> > at
>> >
>> >
>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
>> > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>> > at
>> >
>> >
>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
>> > at
>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
>> >
>> > The kafka producer is a multi-threaded program.
>> >
>> > Thanks!
>> >
>> > Best Regards!
>> >
>> >
>> > 2012/7/13 Neha Narkhede <ne...@gmail.com>
>> >
>> > > In addition to Jun's question,
>> > >
>> > > which version are you using ? Do you have a reproducible test case ?
>> > >
>> > > Thanks,
>> > > Neha
>> > >
>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
>> > > > What's the stack trace?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
>> xiaofanhadoop@gmail.com>
>> > > wrote:
>> > > >
>> > > >> HI:
>> > > >>
>> > > >> Guys, I test kafka in our high-concurrency test environment, I always
>> > get
>> > > the
>> > > >> error message as follows:
>> > > >>
>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
>> > > >> (kafka.server.KafkaRequestHandlers)
>> > > >> kafka.message.InvalidMessageException: message is invalid,
>> compression
>> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
>> offset: 0
>> > > >>
>> > > >> Can anyone help? Thanks!
>> > > >>
>> > > >> Best Regards
>> > > >>
>> > > >> Jian Fan
>> > > >>
>> > >
>> >
>>
>
>

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
Neha:

    I have created the bug in JIRA as KAFKA-411, please check it!

Thanks!
Jian Fan

2012/7/21 Neha Narkhede <ne...@gmail.com>

> Jian,
>
> This is a bug in Kafka. Would you mind filing a JIRA with the testcase you
> have and the error log ?
>
> Thanks,
> Neha
>
> On Fri, Jul 13, 2012 at 7:49 PM, jjian fan <xi...@gmail.com>
> wrote:
>
> > Jun
> >
> >   I have enabled debug logging for kafka.producer.SyncProducer in my test. On the
> > producer side there is no InvalidMessageException thrown, so the error must be on
> > the server side.
> >
> > Thanks!
> > Jian Fan
> >
> > 2012/7/14 Jun Rao <ju...@gmail.com>
> >
> > > Jian,
> > >
> > > Thanks. We will take a look.
> > >
> > > Another thing. Could you try enabling debug level logging in
> > > kafka.producer.SyncProducer while running your test? This will enable
> > > message verification on the producer side and will tell us if the
> > > corruption was introduced on the producer side or the broker side.
> > >
> > > Thanks,
> > >
> > > Jun
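
[Enabling the debug logging Jun suggests above is typically done through the producer's log4j configuration. A minimal sketch, assuming a standard log4j.properties on the producer's classpath:]

```
# Raise the SyncProducer logger to DEBUG to enable per-message verification
log4j.logger.kafka.producer.SyncProducer=DEBUG
```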
> > >
> > > On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xi...@gmail.com>
> > > wrote:
> > >
> > > > I post my code here:
> > > >
> > > > ProducerThread.java
> > > > package com.tz.kafka;
> > > >
> > > >
> > > > import java.io.Serializable;
> > > > import java.util.Properties;
> > > > import kafka.producer.ProducerConfig;
> > > > import kafka.javaapi.producer.*;
> > > > import java.util.*;
> > > > import java.util.concurrent.CopyOnWriteArrayList;
> > > >
> > > > public class ProducerThread implements Runnable ,Serializable
> > > > {
> > > >   /**
> > > >  *
> > > >  */
> > > > private static final long serialVersionUID = 18977854555656L;
> > > > //private final kafka.javaapi.producer.Producer<Integer, String>
> > > producer;
> > > >   private String topic;
> > > >   private Properties props = new Properties();
> > > >       private String messageStr;
> > > >   public  ProducerThread(String kafkatopic,String message)
> > > >   {
> > > >     synchronized(this){
> > > >     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> > > > 192.168.75.65:2181");
> > > > //props.put("broker.list", "4:192.168.75.104:9092");
> > > > //props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > > props.put("producer.type", "sync");
> > > > props.put("compression.codec", "1");
> > > > props.put("batch.size", "5");
> > > > props.put("queue.enqueueTimeout.ms", "-1");
> > > > props.put("queue.size", "2000");
> > > > props.put("buffer.size", "10240000");
> > > > //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> > > > props.put("zk.sessiontimeout.ms", "6000000");
> > > > props.put("zk.connectiontimeout.ms", "6000000");
> > > > props.put("socket.timeout.ms", "60000000");
> > > > props.put("connect.timeout.ms", "60000000");
> > > > props.put("max.message.size", "20000");
> > > > props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> > > > props.put("reconnect.interval.ms", "3000");
> > > >     // Use random partitioner. Don't need the key type. Just set it
> to
> > > > Integer.
> > > >     // The message is of type String.
> > > > //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > > > ProducerConfig(props));
> > > >     //producer = new kafka.javaapi.producer.Producer<String,
> > String>(new
> > > > ProducerConfig(props));
> > > >     this.topic = kafkatopic;
> > > >     this.messageStr = message;
> > > >
> > > >   }
> > > >   }
> > > >
> > > >   public void run() {
> > > > synchronized(this) {
> > > > Producer<String, String> producer  = new Producer<String, String>(new
> > > > ProducerConfig(props));
> > > >     //producer.
> > > > long messageNo = 0;
> > > >     long t = System.currentTimeMillis();
> > > >     long r = System.currentTimeMillis();
> > > >     long time = r-t;
> > > >     long rate = 0;
> > > >     List<String> messageSet = new CopyOnWriteArrayList<String>();
> > > >     while(true)
> > > >     {
> > > >       if(topic.length() > 0 )
> > > >       {
> > > >      messageSet.add(this.messageStr.toString());
> > > >          ProducerData<String, String> data = new ProducerData<String,
> > > > String>(topic,null,messageSet);
> > > >
> > > >          producer.send(data);
> > > >          messageSet.clear();
> > > >          data = null;
> > > >          messageNo++;
> > > >
> > > >       }
> > > >
> > > >       if(messageNo % 200000 ==0)
> > > >       {
> > > >       r = System.currentTimeMillis();
> > > >       time = r-t;
> > > >       rate = 200000000/time;
> > > >       System.out.println(this.topic + " send message per
> > second:"+rate);
> > > >       t = r;
> > > >       }
> > > >
> > > >      }
> > > > }
> > > >   }
> > > >     }
> > > >
> > > > ProducerThreadTest1.java
> > > >
> > > > package com.tz.kafka;
> > > >
> > > > import java.util.concurrent.ThreadPoolExecutor;
> > > > import java.util.concurrent.TimeUnit;
> > > > import java.util.concurrent.LinkedBlockingQueue;
> > > >
> > > > public class ProducerThreadTest1 {
> > > >
> > > > /**
> > > >  * @param args
> > > >  * @throws InterruptedException
> > > >  */
> > > > public static void main(String[] args) throws InterruptedException {
> > > > // TODO Auto-generated method stub
> > > > int i = Integer.parseInt(args[0]);
> > > >  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> > > > TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> > > > new ThreadPoolExecutor.DiscardOldestPolicy());
> > > > int messageSize = Integer.parseInt(args[1]);
> > > >  StringBuffer messageStr = new StringBuffer();
> > > > for(int messagesize=0;messagesize<messageSize;messagesize++)
> > > >      {
> > > >      messageStr.append("X");
> > > >      }
> > > > String topic = args[2];
> > > > for(int j=0;j < i; j++)
> > > > {
> > > >    topic += "x";
> > > >    threadPool.execute(new
> ProducerThread(topic,messageStr.toString()));
> > > >    Thread.sleep(1000);
> > > >
> > > > }
> > > > }
> > > >
> > > > }
> > > >
> > > >
> > > > the shell script kafkaThreadTest.sh looks like this:
> > > >
> > > > java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
> > > >
> > > > I deploy the script on ten servers!
> > > >
> > > > Thanks!
> > > > Best Regards!
> > > >
> > > > Jian Fan
> > > >
> > > > 2012/7/13 Jun Rao <ju...@gmail.com>
> > > >
> > > > > That seems like a Kafka bug. Do you have a script that can
> reproduce
> > > > this?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <
> xiaofanhadoop@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > HI:
> > > > > > I use kafka0.7.1, here is the stack trace in kafka server:
> > > > > >
> > > > > >  ERROR Error processing MultiProducerRequest on bxx:2
> > > > > > (kafka.server.KafkaRequestHandlers)
> > > > > > kafka.message.InvalidMessageException: message is invalid,
> > > compression
> > > > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init
> > offset: 0
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > > at
> > > > >
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > > at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > > > at
> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > > > at
> > scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > > > at kafka.log.Log.append(Log.scala:205)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > at
> > > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > at java.lang.Thread.run(Thread.java:722)
> > > > > > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because
> > > > > > of error (kafka.network.Processor)
> > > > > > kafka.message.InvalidMessageException: message is invalid,
> > > compression
> > > > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init
> > offset: 0
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > > at
> > > > >
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > > at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > > > at
> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > > > at
> > scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > > > at kafka.log.Log.append(Log.scala:205)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > at
> > > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > at java.lang.Thread.run(Thread.java:722)
> > > > > >
> > > > > > here is the stack trace in the kafka producer:
> > > > > > ERROR Connection attempt to 192.168.75.104:9092 failed, next
> > attempt
> > > > in
> > > > > > 60000 ms (kafka.producer.SyncProducer)
> > > > > > java.net.ConnectException: Connection refused
> > > > > > at sun.nio.ch.Net.connect(Native Method)
> > > > > > at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > > > > at
> > > > >
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > > > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > > > > at
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > > > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > > > > at
> > > > >
> > >
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > > > >
> > > > > > The kafka producer is a multi-threaded program.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Best Regards!
> > > > > >
> > > > > >
> > > > > > 2012/7/13 Neha Narkhede <ne...@gmail.com>
> > > > > >
> > > > > > > In addition to Jun's question,
> > > > > > >
> > > > > > > which version are you using ? Do you have a reproducible test
> > case
> > > ?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com>
> > wrote:
> > > > > > > > What's the stack trace?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
> > > > xiaofanhadoop@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> HI:
> > > > > > > >>
> > > > > > > >> Guys, I test kafka in our high-concurrency test environment, I
> > > > always
> > > > > > get
> > > > > > > the
> > > > > > > >> error message as follows:
> > > > > > > >>
> > > > > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > > > > >> (kafka.server.KafkaRequestHandlers)
> > > > > > > >> kafka.message.InvalidMessageException: message is invalid,
> > > > > compression
> > > > > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
> > > > offset:
> > > > > 0
> > > > > > > >>
> > > > > > > >> Can anyone help? Thanks!
> > > > > > > >>
> > > > > > > >> Best Regards
> > > > > > > >>
> > > > > > > >> Jian Fan
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: error with kafka

Posted by Neha Narkhede <ne...@gmail.com>.
Jian,

This is a bug in Kafka. Would you mind filing a JIRA with the testcase you
have and the error log ?

Thanks,
Neha

On Fri, Jul 13, 2012 at 7:49 PM, jjian fan <xi...@gmail.com> wrote:

> Jun
>
>   I have enabled debug logging for kafka.producer.SyncProducer in my test. On the
> producer side there is no InvalidMessageException thrown, so the error must be on
> the server side.
>
> Thanks!
> Jian Fan
>
> 2012/7/14 Jun Rao <ju...@gmail.com>
>
> > Jian,
> >
> > Thanks. We will take a look.
> >
> > Another thing. Could you try enabling debug level logging in
> > kafka.producer.SyncProducer while running your test? This will enable
> > message verification on the producer side and will tell us if the
> > corruption was introduced on the producer side or the broker side.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xi...@gmail.com>
> > wrote:
> >
> > > I post my code here:
> > >
> > > ProducerThread.java
> > > package com.tz.kafka;
> > >
> > >
> > > import java.io.Serializable;
> > > import java.util.Properties;
> > > import kafka.producer.ProducerConfig;
> > > import kafka.javaapi.producer.*;
> > > import java.util.*;
> > > import java.util.concurrent.CopyOnWriteArrayList;
> > >
> > > public class ProducerThread implements Runnable ,Serializable
> > > {
> > >   /**
> > >  *
> > >  */
> > > private static final long serialVersionUID = 18977854555656L;
> > > //private final kafka.javaapi.producer.Producer<Integer, String>
> > producer;
> > >   private String topic;
> > >   private Properties props = new Properties();
> > >       private String messageStr;
> > >   public  ProducerThread(String kafkatopic,String message)
> > >   {
> > >     synchronized(this){
> > >     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> > > 192.168.75.65:2181");
> > > //props.put("broker.list", "4:192.168.75.104:9092");
> > > //props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > props.put("producer.type", "sync");
> > > props.put("compression.codec", "1");
> > > props.put("batch.size", "5");
> > > props.put("queue.enqueueTimeout.ms", "-1");
> > > props.put("queue.size", "2000");
> > > props.put("buffer.size", "10240000");
> > > //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> > > props.put("zk.sessiontimeout.ms", "6000000");
> > > props.put("zk.connectiontimeout.ms", "6000000");
> > > props.put("socket.timeout.ms", "60000000");
> > > props.put("connect.timeout.ms", "60000000");
> > > props.put("max.message.size", "20000");
> > > props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> > > props.put("reconnect.interval.ms", "3000");
> > >     // Use random partitioner. Don't need the key type. Just set it to
> > > Integer.
> > >     // The message is of type String.
> > > //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > > ProducerConfig(props));
> > >     //producer = new kafka.javaapi.producer.Producer<String,
> String>(new
> > > ProducerConfig(props));
> > >     this.topic = kafkatopic;
> > >     this.messageStr = message;
> > >
> > >   }
> > >   }
> > >
> > >   public void run() {
> > > synchronized(this) {
> > > Producer<String, String> producer  = new Producer<String, String>(new
> > > ProducerConfig(props));
> > >     //producer.
> > > long messageNo = 0;
> > >     long t = System.currentTimeMillis();
> > >     long r = System.currentTimeMillis();
> > >     long time = r-t;
> > >     long rate = 0;
> > >     List<String> messageSet = new CopyOnWriteArrayList<String>();
> > >     while(true)
> > >     {
> > >       if(topic.length() > 0 )
> > >       {
> > >      messageSet.add(this.messageStr.toString());
> > >          ProducerData<String, String> data = new ProducerData<String,
> > > String>(topic,null,messageSet);
> > >
> > >          producer.send(data);
> > >          messageSet.clear();
> > >          data = null;
> > >          messageNo++;
> > >
> > >       }
> > >
> > >       if(messageNo % 200000 ==0)
> > >       {
> > >       r = System.currentTimeMillis();
> > >       time = r-t;
> > >       rate = 200000000/time;
> > >       System.out.println(this.topic + " send message per
> second:"+rate);
> > >       t = r;
> > >       }
> > >
> > >      }
> > > }
> > >   }
> > >     }
> > >
> > > ProducerThreadTest1.java
> > >
> > > package com.tz.kafka;
> > >
> > > import java.util.concurrent.ThreadPoolExecutor;
> > > import java.util.concurrent.TimeUnit;
> > > import java.util.concurrent.LinkedBlockingQueue;
> > >
> > > public class ProducerThreadTest1 {
> > >
> > > /**
> > >  * @param args
> > >  * @throws InterruptedException
> > >  */
> > > public static void main(String[] args) throws InterruptedException {
> > > // TODO Auto-generated method stub
> > > int i = Integer.parseInt(args[0]);
> > >  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> > > TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> > > new ThreadPoolExecutor.DiscardOldestPolicy());
> > > int messageSize = Integer.parseInt(args[1]);
> > >  StringBuffer messageStr = new StringBuffer();
> > > for(int messagesize=0;messagesize<messageSize;messagesize++)
> > >      {
> > >      messageStr.append("X");
> > >      }
> > > String topic = args[2];
> > > for(int j=0;j < i; j++)
> > > {
> > >    topic += "x";
> > >    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
> > >    Thread.sleep(1000);
> > >
> > > }
> > > }
> > >
> > > }
> > >
> > >
> > > the shell script kafkaThreadTest.sh looks like this:
> > >
> > > java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
> > >
> > > I deploy the script on ten servers.
> > >
> > > Thanks!
> > > Best Regards!
> > >
> > > Jian Fan
> > >
> > > 2012/7/13 Jun Rao <ju...@gmail.com>
> > >
> > > > That seems like a Kafka bug. Do you have a script that can reproduce
> > > this?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
> > > > wrote:
> > > >
> > > > > HI:
> > > > > I use kafka0.7.1, here is the stack trace in kafka server:
> > > > >
> > > > >  ERROR Error processing MultiProducerRequest on bxx:2
> > > > > (kafka.server.KafkaRequestHandlers)
> > > > > kafka.message.InvalidMessageException: message is invalid,
> > compression
> > > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init
> offset: 0
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > at
> > > >
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > > at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > > at kafka.log.Log.append(Log.scala:205)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > at java.lang.Thread.run(Thread.java:722)
> > > > > [2012-07-13 08:40:06,182] ERROR Closing socket for
> > > /192.168.75.13because
> > > > > of error (kafka.network.Processor)
> > > > > kafka.message.InvalidMessageException: message is invalid,
> > compression
> > > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init
> offset: 0
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > at
> > > >
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > > at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > > at kafka.log.Log.append(Log.scala:205)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > at java.lang.Thread.run(Thread.java:722)
> > > > >
> > > > > here is the stack trace in the kafka producer:
> > > > > ERROR Connection attempt to 192.168.75.104:9092 failed, next
> attempt
> > > in
> > > > > 60000 ms (kafka.producer.SyncProducer)
> > > > > java.net.ConnectException: Connection refused
> > > > > at sun.nio.ch.Net.connect(Native Method)
> > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > > > at
> > > >
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > > > at
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > > > at
> > > >
> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > > >
> > > > > The kafka producer is a multi-threaded program.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > Best Regards!
> > > > >
> > > > >
> > > > > 2012/7/13 Neha Narkhede <ne...@gmail.com>
> > > > >
> > > > > > In addition to Jun's question,
> > > > > >
> > > > > > which version are you using? Do you have a reproducible test
> > > > > > case?
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com>
> wrote:
> > > > > > > What's the stack trace?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
> > > xiaofanhadoop@gmail.com
> > > > >
> > > > > > wrote:
> > > > > > >
> > > > > > >> HI:
> > > > > > >>
> > > > > > >> Guys, I test kafka in our high-concurrency test environment, and I
> > > > > > >> always get the
> > > > > > >> error message as follows:
> > > > > > >>
> > > > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > > > >> (kafka.server.KafkaRequestHandlers)
> > > > > > >> kafka.message.InvalidMessageException: message is invalid,
> > > > compression
> > > > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
> > > offset:
> > > > 0
> > > > > > >>
> > > > > > >> Can anyone help? Thanks!
> > > > > > >>
> > > > > > >> Best Regards
> > > > > > >>
> > > > > > >> Jian Fan
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
Jun

  I have enabled debug logging for kafka.producer.SyncProducer in my test.
On the producer side, no InvalidMessageException is thrown, so the error
must be on the server side.

Thanks!
Jian Fan
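
For context, the broker-side check that fails here boils down to comparing a
CRC32 checksum stored with each message against one recomputed over the bytes
actually received. A minimal illustrative sketch of that idea (this is not
Kafka's actual code; the class and method names are made up for illustration):

```java
import java.util.zip.CRC32;

public class CrcCheckSketch {
    // Compute a CRC32 over the payload bytes, as a message checksum would be.
    static long checksum(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    // A message is "valid" when the checksum stored alongside it matches
    // one recomputed from the bytes that arrived on the wire.
    static boolean isValid(long storedCrc, byte[] payload) {
        return storedCrc == checksum(payload);
    }

    public static void main(String[] args) {
        byte[] payload = "XXXX".getBytes();
        long crc = checksum(payload);
        System.out.println(isValid(crc, payload));

        // Simulate corruption in flight: flip one byte after the
        // checksum was computed, and the check fails.
        payload[0] = 'Y';
        System.out.println(isValid(crc, payload));
    }
}
```

If the producer-side verification passes but the broker still throws
InvalidMessageException, the bytes were presumably altered somewhere between
the producer's serialization and the broker's log append.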

2012/7/14 Jun Rao <ju...@gmail.com>

> Jian,
>
> Thanks. We will take a look.
>
> Another thing. Could you try enabling debug level logging in
> kafka.producer.SyncProducer while running your test? This will enable
> message verification on the producer side and will tell us if the
> corruption was introduced on the producer side or the broker side.
>
> Thanks,
>
> Jun
>
> On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xi...@gmail.com>
> wrote:
>
> > I post my code here:
> >
> > ProducerThread.java
> > package com.tz.kafka;
> >
> >
> > import java.io.Serializable;
> > import java.util.Properties;
> > import kafka.producer.ProducerConfig;
> > import kafka.javaapi.producer.*;
> > import java.util.*;
> > import java.util.concurrent.CopyOnWriteArrayList;
> >
> > public class ProducerThread implements Runnable ,Serializable
> > {
> >   /**
> >  *
> >  */
> > private static final long serialVersionUID = 18977854555656L;
> > //private final kafka.javaapi.producer.Producer<Integer, String>
> producer;
> >   private String topic;
> >   private Properties props = new Properties();
> >       private String messageStr;
> >   public  ProducerThread(String kafkatopic,String message)
> >   {
> >     synchronized(this){
> >     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> > 192.168.75.65:2181");
> > //props.put("broker.list", "4:192.168.75.104:9092");
> > //props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("producer.type", "sync");
> > props.put("compression.codec", "1");
> > props.put("batch.size", "5");
> > props.put("queue.enqueueTimeout.ms", "-1");
> > props.put("queue.size", "2000");
> > props.put("buffer.size", "10240000");
> > //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> > props.put("zk.sessiontimeout.ms", "6000000");
> > props.put("zk.connectiontimeout.ms", "6000000");
> > props.put("socket.timeout.ms", "60000000");
> > props.put("connect.timeout.ms", "60000000");
> > props.put("max.message.size", "20000");
> > props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> > props.put("reconnect.interval.ms", "3000");
> >     // Use random partitioner. Don't need the key type. Just set it to
> > Integer.
> >     // The message is of type String.
> > //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > ProducerConfig(props));
> >     //producer = new kafka.javaapi.producer.Producer<String, String>(new
> > ProducerConfig(props));
> >     this.topic = kafkatopic;
> >     this.messageStr = message;
> >
> >   }
> >   }
> >
> >   public void run() {
> > synchronized(this) {
> > Producer<String, String> producer  = new Producer<String, String>(new
> > ProducerConfig(props));
> >     //producer.
> > long messageNo = 0;
> >     long t = System.currentTimeMillis();
> >     long r = System.currentTimeMillis();
> >     long time = r-t;
> >     long rate = 0;
> >     List<String> messageSet = new CopyOnWriteArrayList<String>();
> >     while(true)
> >     {
> >       if(topic.length() > 0 )
> >       {
> >      messageSet.add(this.messageStr.toString());
> >          ProducerData<String, String> data = new ProducerData<String,
> > String>(topic,null,messageSet);
> >
> >          producer.send(data);
> >          messageSet.clear();
> >          data = null;
> >          messageNo++;
> >
> >       }
> >
> >       if(messageNo % 200000 ==0)
> >       {
> >       r = System.currentTimeMillis();
> >       time = r-t;
> >       rate = 200000000/time;
> >       System.out.println(this.topic + " send message per second:"+rate);
> >       t = r;
> >       }
> >
> >      }
> > }
> >   }
> >     }
> >
> > ProducerThreadTest1.java
> >
> > package com.tz.kafka;
> >
> > import java.util.concurrent.ThreadPoolExecutor;
> > import java.util.concurrent.TimeUnit;
> > import java.util.concurrent.LinkedBlockingQueue;
> >
> > public class ProducerThreadTest1 {
> >
> > /**
> >  * @param args
> >  * @throws InterruptedException
> >  */
> > public static void main(String[] args) throws InterruptedException {
> > // TODO Auto-generated method stub
> > int i = Integer.parseInt(args[0]);
> >  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> > TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> > new ThreadPoolExecutor.DiscardOldestPolicy());
> > int messageSize = Integer.parseInt(args[1]);
> >  StringBuffer messageStr = new StringBuffer();
> > for(int messagesize=0;messagesize<messageSize;messagesize++)
> >      {
> >      messageStr.append("X");
> >      }
> > String topic = args[2];
> > for(int j=0;j < i; j++)
> > {
> >    topic += "x";
> >    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
> >    Thread.sleep(1000);
> >
> > }
> > }
> >
> > }
> >
> >
> > the shell script kafkaThreadTest.sh looks like this:
> >
> > java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
> >
> > I deploy the script on ten servers.
> >
> > Thanks!
> > Best Regards!
> >
> > Jian Fan
> >
> > 2012/7/13 Jun Rao <ju...@gmail.com>
> >
> > > That seems like a Kafka bug. Do you have a script that can reproduce
> > this?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
> > > wrote:
> > >
> > > > HI:
> > > > I use kafka0.7.1, here is the stack trace in kafka server:
> > > >
> > > >  ERROR Error processing MultiProducerRequest on bxx:2
> > > > (kafka.server.KafkaRequestHandlers)
> > > > kafka.message.InvalidMessageException: message is invalid,
> compression
> > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > at
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > at kafka.log.Log.append(Log.scala:205)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > > [2012-07-13 08:40:06,182] ERROR Closing socket for
> > /192.168.75.13because
> > > > of error (kafka.network.Processor)
> > > > kafka.message.InvalidMessageException: message is invalid,
> compression
> > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > at
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > at kafka.log.Log.append(Log.scala:205)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > >
> > > > here is the stack trace in the kafka producer:
> > > > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt
> > in
> > > > 60000 ms (kafka.producer.SyncProducer)
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.Net.connect(Native Method)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > > at
> > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > > at
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > > at
> > >
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > >
> > > > The kafka producer is a multi-threaded program.
> > > >
> > > > Thanks!
> > > >
> > > > Best Regards!
> > > >
> > > >
> > > > 2012/7/13 Neha Narkhede <ne...@gmail.com>
> > > >
> > > > > In addition to Jun's question,
> > > > >
> > > > > which version are you using? Do you have a reproducible test case?
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> > > > > > What's the stack trace?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
> > xiaofanhadoop@gmail.com
> > > >
> > > > > wrote:
> > > > > >
> > > > > >> HI:
> > > > > >>
> > > > > >> Guys, I test kafka in our high-concurrency test environment, and I
> > > > > >> always get the
> > > > > >> error message as follows:
> > > > > >>
> > > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > > >> (kafka.server.KafkaRequestHandlers)
> > > > > >> kafka.message.InvalidMessageException: message is invalid,
> > > compression
> > > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
> > offset:
> > > 0
> > > > > >>
> > > > > >> Can anyone help? Thanks!
> > > > > >>
> > > > > >> Best Regards
> > > > > >>
> > > > > >> Jian Fan
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: error with kafka

Posted by Jun Rao <ju...@gmail.com>.
Jian,

Thanks. We will take a look.

Another thing. Could you try enabling debug level logging in
kafka.producer.SyncProducer while running your test? This will enable
message verification on the producer side and will tell us if the
corruption was introduced on the producer side or the broker side.

Thanks,

Jun
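
For reference, enabling that debug logging is typically a one-line change in
the log4j configuration shipped with the producer application (the file name
and any surrounding logger entries are assumptions; adjust to your setup):

```properties
# log4j.properties: turn on per-message verification logging in the
# sync producer (Kafka 0.7.x logger name).
log4j.logger.kafka.producer.SyncProducer=DEBUG
```

With this set, the producer logs each message's verification result, which
should show whether the corruption originates before or after the send.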

On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xi...@gmail.com> wrote:

> I post my code here:
>
> ProducerThread.java
> package com.tz.kafka;
>
>
> import java.io.Serializable;
> import java.util.Properties;
> import kafka.producer.ProducerConfig;
> import kafka.javaapi.producer.*;
> import java.util.*;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class ProducerThread implements Runnable ,Serializable
> {
>   /**
>  *
>  */
> private static final long serialVersionUID = 18977854555656L;
> //private final kafka.javaapi.producer.Producer<Integer, String> producer;
>   private String topic;
>   private Properties props = new Properties();
>       private String messageStr;
>   public  ProducerThread(String kafkatopic,String message)
>   {
>     synchronized(this){
>     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> 192.168.75.65:2181");
> //props.put("broker.list", "4:192.168.75.104:9092");
> //props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("producer.type", "sync");
> props.put("compression.codec", "1");
> props.put("batch.size", "5");
> props.put("queue.enqueueTimeout.ms", "-1");
> props.put("queue.size", "2000");
> props.put("buffer.size", "10240000");
> //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> props.put("zk.sessiontimeout.ms", "6000000");
> props.put("zk.connectiontimeout.ms", "6000000");
> props.put("socket.timeout.ms", "60000000");
> props.put("connect.timeout.ms", "60000000");
> props.put("max.message.size", "20000");
> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> props.put("reconnect.interval.ms", "3000");
>     // Use random partitioner. Don't need the key type. Just set it to
> Integer.
>     // The message is of type String.
> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> ProducerConfig(props));
>     //producer = new kafka.javaapi.producer.Producer<String, String>(new
> ProducerConfig(props));
>     this.topic = kafkatopic;
>     this.messageStr = message;
>
>   }
>   }
>
>   public void run() {
> synchronized(this) {
> Producer<String, String> producer  = new Producer<String, String>(new
> ProducerConfig(props));
>     //producer.
> long messageNo = 0;
>     long t = System.currentTimeMillis();
>     long r = System.currentTimeMillis();
>     long time = r-t;
>     long rate = 0;
>     List<String> messageSet = new CopyOnWriteArrayList<String>();
>     while(true)
>     {
>       if(topic.length() > 0 )
>       {
>      messageSet.add(this.messageStr.toString());
>          ProducerData<String, String> data = new ProducerData<String,
> String>(topic,null,messageSet);
>
>          producer.send(data);
>          messageSet.clear();
>          data = null;
>          messageNo++;
>
>       }
>
>       if(messageNo % 200000 ==0)
>       {
>       r = System.currentTimeMillis();
>       time = r-t;
>       rate = 200000000/time;
>       System.out.println(this.topic + " send message per second:"+rate);
>       t = r;
>       }
>
>      }
> }
>   }
>     }
>
> ProducerThreadTest1.java
>
> package com.tz.kafka;
>
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class ProducerThreadTest1 {
>
> /**
>  * @param args
>  * @throws InterruptedException
>  */
> public static void main(String[] args) throws InterruptedException {
> // TODO Auto-generated method stub
> int i = Integer.parseInt(args[0]);
>  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> new ThreadPoolExecutor.DiscardOldestPolicy());
> int messageSize = Integer.parseInt(args[1]);
>  StringBuffer messageStr = new StringBuffer();
> for(int messagesize=0;messagesize<messageSize;messagesize++)
>      {
>      messageStr.append("X");
>      }
> String topic = args[2];
> for(int j=0;j < i; j++)
> {
>    topic += "x";
>    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
>    Thread.sleep(1000);
>
> }
> }
>
> }
>
>
> the shell script kafkaThreadTest.sh looks like this:
>
> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>
> I deploy the script on ten servers.
>
> Thanks!
> Best Regards!
>
> Jian Fan
>
> 2012/7/13 Jun Rao <ju...@gmail.com>
>
> > That seems like a Kafka bug. Do you have a script that can reproduce
> this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
> > wrote:
> >
> > > HI:
> > > I use kafka0.7.1, here is the stack trace in kafka server:
> > >
> > >  ERROR Error processing MultiProducerRequest on bxx:2
> > > (kafka.server.KafkaRequestHandlers)
> > > kafka.message.InvalidMessageException: message is invalid, compression
> > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > at kafka.log.Log.append(Log.scala:205)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:722)
> > > [2012-07-13 08:40:06,182] ERROR Closing socket for
> /192.168.75.13 because
> > > of error (kafka.network.Processor)
> > > kafka.message.InvalidMessageException: message is invalid, compression
> > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > at kafka.log.Log.append(Log.scala:205)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:722)
> > >
> > > here is the stack trace in the kafka producer:
> > > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt
> in
> > > 60000 ms (kafka.producer.SyncProducer)
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect(Native Method)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > at
> > >
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > at
> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > >
> > > The kafka producer is multi-thread program.
> > >
> > > Thanks!
> > >
> > > Best Regards!
> > >
> > >
> > > 2012/7/13 Neha Narkhede <ne...@gmail.com>
> > >
> > > > In addition to Jun's question,
> > > >
> > > > which version are you using ? Do you have a reproducible test case ?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> > > > > What's the stack trace?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
> xiaofanhadoop@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > >> HI:
> > > > >>
> > > > >> Guys, I test kafka in our high-concurrency test environment, I
> always
> > > get
> > > > the
> > > > >> error message as follows:
> > > > >>
> > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > >> (kafka.server.KafkaRequestHandlers)
> > > > >> kafka.message.InvalidMessageException: message is invalid,
> > compression
> > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
> offset:
> > 0
> > > > >>
> > > > >> Can anyone help? Thanks!
> > > > >>
> > > > >> Best Regards
> > > > >>
> > > > >> Jian Fan
> > > > >>
> > > >
> > >
> >
>

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
Jun:

    I ran the test for more than 2 days! The packet receive rate of the broker
in my test is about 20MB/s-60MB/s. The messages are compressed! You can change
each producer to "java -Xmx10G -jar kafkaThreadTest.jar 10 1024 a" and try
that! All servers use CentOS 6.2!
   The broker config is as follows:

# The id of the broker. This must be set to a unique integer for each
broker.
brokerid=3

# Hostname the broker will advertise to consumers. If not set, kafka will
use the value returned
# from InetAddress.getLocalHost().  If there are multiple interfaces
getLocalHost
# may not be what you want.
hostname=192.168.75.102


############################# Socket Server Settings
#############################

# The port the socket server listens on
port=9092

# The number of processor threads the socket server uses for receiving and
answering requests.
# Defaults to the number of cores on the machine
num.threads=24

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=20971520

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=20971520

# The maximum size of a request that the socket server will accept
(protection against OOM)
max.socket.request.bytes=204857600


############################# Log Basics #############################

# The directory under which to store log files
log.dir=/data/kafka

# The number of logical partitions per topic per server. More partitions
allow greater parallelism
# for consumption, but also mean more files.
num.partitions=4

# Overrides for the default given by num.partitions on a per-topic basis
topic.partition.count.map=test2:1

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is
the most
# important performance knob in kafka.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data is at greater risk of loss in the event
of a crash.
#    2. Latency: Data is not made available to consumers until it is
flushed (which adds latency).
#    3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data
after a period of time or
# every N messages (or both). This can be done globally and overridden on a
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=4000

# The maximum amount of time a message can sit in a log before we force a
flush
log.default.flush.interval.ms=4000

# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000

# The interval (in ms) at which logs are checked to see if they need to be
flushed to disk.
log.default.flush.scheduler.interval.ms=3000

############################# Log Retention Policy
#############################

# The following configurations control the disposal of log segments. The
policy can
# be set to delete segments after a period of time, or after a given size
has accumulated.
# A segment will be deleted whenever *either* of these criteria are met.
Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=336

# A size-based retention policy for logs. Segments are pruned from the log
as long as the remaining
# segments don't drop below log.retention.size.
log.retention.size=2073741824

# The maximum size of a log segment file. When this size is reached a new
log segment will be created.
log.file.size=1073741824

# The interval at which log segments are checked to see if they can be
deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Enable connecting to zookeeper
enable.zookeeper=true

# Zk connection string (see zk docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zk.connect=192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181

# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms = 100000
zk.synctime.ms = 10000
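
[Editorial note: for readers of the archive, the InvalidMessageException in the
traces above is thrown when the broker's per-message CRC check fails. The sketch
below is only an illustration of that idea, not the actual kafka.message.Message
code from 0.7; the class and method names are hypothetical. A message carries a
CRC32 of its payload in its header, and the broker recomputes the checksum on
receipt; any mismatch (e.g. from a corrupted or mis-framed buffer) is reported
as "message is invalid".]

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class MessageCheck {
    // Hypothetical sketch of the check behind InvalidMessageException:
    // a message is "invalid" when the CRC32 stored in its header does not
    // match the CRC32 recomputed over its payload bytes.
    static boolean isValid(int storedCrc, ByteBuffer payload) {
        CRC32 crc = new CRC32();
        crc.update(payload.array(),
                   payload.arrayOffset() + payload.position(),
                   payload.remaining());
        // CRC32.getValue() returns an unsigned value in a long; truncate to
        // int to compare against the 4-byte checksum field.
        return (int) (crc.getValue() & 0xffffffffL) == storedCrc;
    }

    public static void main(String[] args) {
        byte[] data = "XXXX".getBytes();
        CRC32 crc = new CRC32();
        crc.update(data);
        int stored = (int) (crc.getValue() & 0xffffffffL);
        System.out.println(isValid(stored, ByteBuffer.wrap(data)));     // true
        System.out.println(isValid(stored + 1, ByteBuffer.wrap(data))); // false
    }
}
```

A mismatch like the one in this thread therefore points at the bytes on the
wire, not the payload content: if a multi-threaded producer corrupts or
interleaves a request buffer, the broker's recomputed checksum will not match
the stored one.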


2012/7/28 Jun Rao <ju...@gmail.com>

> Hi,
>
> I was trying to reproduce this problem locally, but couldn't. I set up 1
> server to run the broker and used another server to run 10 instances of
> ProducerThreadTest1 with the parameters you provided. No exceptions showed
> up in the broker log after the tests were running for 5 minutes.
>
> Could you share your detailed setup? What kind of servers were you using?
> Did you change any config on the broker? How long did you have to run the
> test before the exception shows up?
>
> Thanks,
>
> Jun
>
>
> On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xi...@gmail.com>
> wrote:
>
> > I post my code here:
> >
> > ProducerThread.java
> > package com.tz.kafka;
> >
> >
> > import java.io.Serializable;
> > import java.util.Properties;
> > import kafka.producer.ProducerConfig;
> > import kafka.javaapi.producer.*;
> > import java.util.*;
> > import java.util.concurrent.CopyOnWriteArrayList;
> >
> > public class ProducerThread implements Runnable ,Serializable
> > {
> >   /**
> >  *
> >  */
> > private static final long serialVersionUID = 18977854555656L;
> > //private final kafka.javaapi.producer.Producer<Integer, String>
> producer;
> >   private String topic;
> >   private Properties props = new Properties();
> >       private String messageStr;
> >   public  ProducerThread(String kafkatopic,String message)
> >   {
> >     synchronized(this){
> >     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> > 192.168.75.65:2181");
> > //props.put("broker.list", "4:192.168.75.104:9092");
> > //props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("producer.type", "sync");
> > props.put("compression.codec", "1");
> > props.put("batch.size", "5");
> > props.put("queue.enqueueTimeout.ms", "-1");
> > props.put("queue.size", "2000");
> > props.put("buffer.size", "10240000");
> > //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> > props.put("zk.sessiontimeout.ms", "6000000");
> > props.put("zk.connectiontimeout.ms", "6000000");
> > props.put("socket.timeout.ms", "60000000");
> > props.put("connect.timeout.ms", "60000000");
> > props.put("max.message.size", "20000");
> > props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> > props.put("reconnect.interval.ms", "3000");
> >     // Use random partitioner. Don't need the key type. Just set it to
> > Integer.
> >     // The message is of type String.
> > //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > ProducerConfig(props));
> >     //producer = new kafka.javaapi.producer.Producer<String, String>(new
> > ProducerConfig(props));
> >     this.topic = kafkatopic;
> >     this.messageStr = message;
> >
> >   }
> >   }
> >
> >   public void run() {
> > synchronized(this) {
> > Producer<String, String> producer  = new Producer<String, String>(new
> > ProducerConfig(props));
> >     //producer.
> > long messageNo = 0;
> >     long t = System.currentTimeMillis();
> >     long r = System.currentTimeMillis();
> >     long time = r-t;
> >     long rate = 0;
> >     List<String> messageSet = new CopyOnWriteArrayList<String>();
> >     while(true)
> >     {
> >       if(topic.length() > 0 )
> >       {
> >      messageSet.add(this.messageStr.toString());
> >          ProducerData<String, String> data = new ProducerData<String,
> > String>(topic,null,messageSet);
> >
> >          producer.send(data);
> >          messageSet.clear();
> >          data = null;
> >          messageNo++;
> >
> >       }
> >
> >       if(messageNo % 200000 ==0)
> >       {
> >       r = System.currentTimeMillis();
> >       time = r-t;
> >       rate = 200000000/time;
> >       System.out.println(this.topic + " send message per second:"+rate);
> >       t = r;
> >       }
> >
> >      }
> > }
> >   }
> >     }
> >
> > ProducerThreadTest1.java
> >
> > package com.tz.kafka;
> >
> > import java.util.concurrent.ThreadPoolExecutor;
> > import java.util.concurrent.TimeUnit;
> > import java.util.concurrent.LinkedBlockingQueue;
> >
> > public class ProducerThreadTest1 {
> >
> > /**
> >  * @param args
> >  * @throws InterruptedException
> >  */
> > public static void main(String[] args) throws InterruptedException {
> > // TODO Auto-generated method stub
> > int i = Integer.parseInt(args[0]);
> >  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> > TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> > new ThreadPoolExecutor.DiscardOldestPolicy());
> > int messageSize = Integer.parseInt(args[1]);
> >  StringBuffer messageStr = new StringBuffer();
> > for(int messagesize=0;messagesize<messageSize;messagesize++)
> >      {
> >      messageStr.append("X");
> >      }
> > String topic = args[2];
> > for(int j=0;j < i; j++)
> > {
> >    topic += "x";
> >    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
> >    Thread.sleep(1000);
> >
> > }
> > }
> >
> > }
> >
> >
> > the shell script kafkaThreadTest.sh looks like this:
> >
> > java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
> >
> > I deploy the shell script on ten servers!
> >
> > Thanks!
> > Best Regards!
> >
> > Jian Fan
> >
> > 2012/7/13 Jun Rao <ju...@gmail.com>
> >
> > > That seems like a Kafka bug. Do you have a script that can reproduce
> > this?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
> > > wrote:
> > >
> > > > HI:
> > > > I use kafka0.7.1, here is the stack trace in kafka server:
> > > >
> > > >  ERROR Error processing MultiProducerRequest on bxx:2
> > > > (kafka.server.KafkaRequestHandlers)
> > > > kafka.message.InvalidMessageException: message is invalid,
> compression
> > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > at
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > at kafka.log.Log.append(Log.scala:205)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > > [2012-07-13 08:40:06,182] ERROR Closing socket for
> > /192.168.75.13 because
> > > > of error (kafka.network.Processor)
> > > > kafka.message.InvalidMessageException: message is invalid,
> compression
> > > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > at
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > at kafka.log.Log.append(Log.scala:205)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > >
> > > > here is the stack trace in the kafka producer:
> > > > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt
> > in
> > > > 60000 ms (kafka.producer.SyncProducer)
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.Net.connect(Native Method)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > > at
> > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > > at
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > > at
> > >
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > > >
> > > > The kafka producer is multi-thread program.
> > > >
> > > > Thanks!
> > > >
> > > > Best Regards!
> > > >
> > > >
> > > > 2012/7/13 Neha Narkhede <ne...@gmail.com>
> > > >
> > > > > In addition to Jun's question,
> > > > >
> > > > > which version are you using ? Do you have a reproducible test case
> ?
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> > > > > > What's the stack trace?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
> > xiaofanhadoop@gmail.com
> > > >
> > > > > wrote:
> > > > > >
> > > > > >> HI:
> > > > > >>
> > > > > >> Guys, I test kafka in our high-concurrency test environment, I
> > always
> > > > get
> > > > > the
> > > > > >> error message as follows:
> > > > > >>
> > > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > > >> (kafka.server.KafkaRequestHandlers)
> > > > > >> kafka.message.InvalidMessageException: message is invalid,
> > > compression
> > > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
> > offset:
> > > 0
> > > > > >>
> > > > > >> Can anyone help? Thanks!
> > > > > >>
> > > > > >> Best Regards
> > > > > >>
> > > > > >> Jian Fan
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: error with kafka

Posted by Jun Rao <ju...@gmail.com>.
Hi,

I was trying to reproduce this problem locally, but couldn't. I set up 1
server to run the broker and used another server to run 10 instances of
ProducerThreadTest1 with the parameters you provided. No exceptions showed
up in the broker log after the tests were running for 5 minutes.

Could you share your detailed setup? What kind of servers were you using?
Did you change any config on the broker? How long did you have to run the
test before the exception shows up?

Thanks,

Jun


On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xi...@gmail.com> wrote:

> I post my code here:
>
> ProducerThread.java
> package com.tz.kafka;
>
>
> import java.io.Serializable;
> import java.util.Properties;
> import kafka.producer.ProducerConfig;
> import kafka.javaapi.producer.*;
> import java.util.*;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class ProducerThread implements Runnable ,Serializable
> {
>   /**
>  *
>  */
> private static final long serialVersionUID = 18977854555656L;
> //private final kafka.javaapi.producer.Producer<Integer, String> producer;
>   private String topic;
>   private Properties props = new Properties();
>       private String messageStr;
>   public  ProducerThread(String kafkatopic,String message)
>   {
>     synchronized(this){
>     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
> 192.168.75.65:2181");
> //props.put("broker.list", "4:192.168.75.104:9092");
> //props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("producer.type", "sync");
> props.put("compression.codec", "1");
> props.put("batch.size", "5");
> props.put("queue.enqueueTimeout.ms", "-1");
> props.put("queue.size", "2000");
> props.put("buffer.size", "10240000");
> //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> props.put("zk.sessiontimeout.ms", "6000000");
> props.put("zk.connectiontimeout.ms", "6000000");
> props.put("socket.timeout.ms", "60000000");
> props.put("connect.timeout.ms", "60000000");
> props.put("max.message.size", "20000");
> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> props.put("reconnect.interval.ms", "3000");
>     // Use random partitioner. Don't need the key type. Just set it to
> Integer.
>     // The message is of type String.
> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> ProducerConfig(props));
>     //producer = new kafka.javaapi.producer.Producer<String, String>(new
> ProducerConfig(props));
>     this.topic = kafkatopic;
>     this.messageStr = message;
>
>   }
>   }
>
>   public void run() {
> synchronized(this) {
> Producer<String, String> producer  = new Producer<String, String>(new
> ProducerConfig(props));
>     //producer.
> long messageNo = 0;
>     long t = System.currentTimeMillis();
>     long r = System.currentTimeMillis();
>     long time = r-t;
>     long rate = 0;
>     List<String> messageSet = new CopyOnWriteArrayList<String>();
>     while(true)
>     {
>       if(topic.length() > 0 )
>       {
>      messageSet.add(this.messageStr.toString());
>          ProducerData<String, String> data = new ProducerData<String,
> String>(topic,null,messageSet);
>
>          producer.send(data);
>          messageSet.clear();
>          data = null;
>          messageNo++;
>
>       }
>
>       if(messageNo % 200000 ==0)
>       {
>       r = System.currentTimeMillis();
>       time = r-t;
>       rate = 200000000/time;
>       System.out.println(this.topic + " send message per second:"+rate);
>       t = r;
>       }
>
>      }
> }
>   }
>     }
>
> ProducerThreadTest1.java
>
> package com.tz.kafka;
>
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class ProducerThreadTest1 {
>
> /**
>  * @param args
>  * @throws InterruptedException
>  */
> public static void main(String[] args) throws InterruptedException {
> // TODO Auto-generated method stub
> int i = Integer.parseInt(args[0]);
>  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> new ThreadPoolExecutor.DiscardOldestPolicy());
> int messageSize = Integer.parseInt(args[1]);
>  StringBuffer messageStr = new StringBuffer();
> for(int messagesize=0;messagesize<messageSize;messagesize++)
>      {
>      messageStr.append("X");
>      }
> String topic = args[2];
> for(int j=0;j < i; j++)
> {
>    topic += "x";
>    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
>    Thread.sleep(1000);
>
> }
> }
>
> }
>
>
> the shell script kafkaThreadTest.sh looks like this:
>
> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>
> I deploy the shell script on ten servers!
>
> Thanks!
> Best Regards!
>
> Jian Fan
>
> 2012/7/13 Jun Rao <ju...@gmail.com>
>
> > That seems like a Kafka bug. Do you have a script that can reproduce
> this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
> > wrote:
> >
> > > HI:
> > > I use Kafka 0.7.1; here is the stack trace in the Kafka server:
> > >
> > >  ERROR Error processing MultiProducerRequest on bxx:2
> > > (kafka.server.KafkaRequestHandlers)
> > > kafka.message.InvalidMessageException: message is invalid, compression
> > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > at kafka.log.Log.append(Log.scala:205)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:722)
> > > [2012-07-13 08:40:06,182] ERROR Closing socket for
> /192.168.75.13 because
> > > of error (kafka.network.Processor)
> > > kafka.message.InvalidMessageException: message is invalid, compression
> > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > at kafka.log.Log.append(Log.scala:205)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:722)
> > >
> > > here is the stack trace in the Kafka producer:
> > > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt
> in
> > > 60000 ms (kafka.producer.SyncProducer)
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect(Native Method)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > at
> > >
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > at
> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > >
> > > The Kafka producer is a multi-threaded program.
> > >
> > > Thanks!
> > >
> > > Best Regards!
> > >
> > >
> > > 2012/7/13 Neha Narkhede <ne...@gmail.com>
> > >
> > > > In addition to Jun's question,
> > > >
> > > > which version are you using ? Do you have a reproducible test case ?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> > > > > What's the stack trace?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
> xiaofanhadoop@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > >> HI:
> > > > >>
> > > > >> Guys, I test kafka in our test high concurrent environment, I
> always
> > > get
> > > > the
> > > > >> error message as follows:
> > > > >>
> > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > >> (kafka.server.KafkaRequestHandlers)
> > > > >> kafka.message.InvalidMessageException: message is invalid,
> > compression
> > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
> offset:
> > 0
> > > > >>
> > > > >> Can anyone help? Thanks!
> > > > >>
> > > > >> Best Regards
> > > > >>
> > > > >> Jian Fan
> > > > >>
> > > >
> > >
> >
>

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
I post my code here:

ProducerThread.java
package com.tz.kafka;



import java.io.Serializable;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class ProducerThread implements Runnable, Serializable {

    private static final long serialVersionUID = 18977854555656L;

    // private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private String topic;
    private String messageStr;
    private Properties props = new Properties();

    public ProducerThread(String kafkatopic, String message) {
        synchronized (this) {
            props.put("zk.connect",
                    "192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181");
            // props.put("broker.list", "4:192.168.75.104:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            props.put("compression.codec", "1");
            props.put("batch.size", "5");
            props.put("queue.enqueueTimeout.ms", "-1");
            props.put("queue.size", "2000");
            props.put("buffer.size", "10240000");
            // props.put("event.handler", "kafka.producer.async.EventHandler<T>");
            props.put("zk.sessiontimeout.ms", "6000000");
            props.put("zk.connectiontimeout.ms", "6000000");
            props.put("socket.timeout.ms", "60000000");
            props.put("connect.timeout.ms", "60000000");
            props.put("max.message.size", "20000");
            props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
            props.put("reconnect.interval.ms", "3000");
            // Use the random partitioner. The key type is unused; the
            // message is of type String.
            // producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
            // producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
            this.topic = kafkatopic;
            this.messageStr = message;
        }
    }

    public void run() {
        synchronized (this) {
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            long messageNo = 0;
            long t = System.currentTimeMillis();
            long r = System.currentTimeMillis();
            long time = r - t;
            long rate = 0;
            List<String> messageSet = new CopyOnWriteArrayList<String>();
            while (true) {
                if (topic.length() > 0) {
                    messageSet.add(this.messageStr.toString());
                    ProducerData<String, String> data =
                            new ProducerData<String, String>(topic, null, messageSet);
                    producer.send(data);
                    messageSet.clear();
                    data = null;
                    messageNo++;
                }
                if (messageNo % 200000 == 0) {
                    r = System.currentTimeMillis();
                    time = r - t;
                    rate = 200000000 / time;
                    System.out.println(this.topic + " send message per second:" + rate);
                    t = r;
                }
            }
        }
    }
}
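
The constant 200000000 in the rate calculation above is 200,000 messages times 1,000 ms, so the printout is messages per second over each 200,000-send window. Isolated as a small sketch (the class and method names here are illustrative, not part of the test program):

```java
public class ThroughputWindow {
    // Messages per second for a window of `messages` sends that took
    // `elapsedMs` milliseconds: messages * 1000 / elapsedMs.
    public static long perSecond(long messages, long elapsedMs) {
        return messages * 1000 / elapsedMs;
    }

    public static void main(String[] args) {
        // 200,000 messages in 4 seconds -> 50,000 msgs/s, matching the
        // constant 200000000 (= 200000 * 1000) in ProducerThread above.
        System.out.println(perSecond(200000, 4000)); // 50000
    }
}
```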

ProducerThreadTest1.java

package com.tz.kafka;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ProducerThreadTest1 {

    /**
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        int i = Integer.parseInt(args[0]);
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        int messageSize = Integer.parseInt(args[1]);
        StringBuffer messageStr = new StringBuffer();
        for (int messagesize = 0; messagesize < messageSize; messagesize++) {
            messageStr.append("X");
        }
        String topic = args[2];
        for (int j = 0; j < i; j++) {
            topic += "x";
            threadPool.execute(new ProducerThread(topic, messageStr.toString()));
            Thread.sleep(1000);
        }
    }
}


the shell script kafkaThreadTest.sh looks like this:

java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a

I deploy the shell script on ten servers!

Thanks!
Best Regards!

Jian Fan

2012/7/13 Jun Rao <ju...@gmail.com>

> That seems like a Kafka bug. Do you have a script that can reproduce this?
>
> Thanks,
>
> Jun
>
> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com>
> wrote:
>
> > HI:
> > I use Kafka 0.7.1; here is the stack trace in the Kafka server:
> >
> >  ERROR Error processing MultiProducerRequest on bxx:2
> > (kafka.server.KafkaRequestHandlers)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > at kafka.log.Log.append(Log.scala:205)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at
> >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > at kafka.network.Processor.handle(SocketServer.scala:296)
> > at kafka.network.Processor.read(SocketServer.scala:319)
> > at kafka.network.Processor.run(SocketServer.scala:214)
> > at java.lang.Thread.run(Thread.java:722)
> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because
> > of error (kafka.network.Processor)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > at kafka.log.Log.append(Log.scala:205)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > at
> >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > at kafka.network.Processor.handle(SocketServer.scala:296)
> > at kafka.network.Processor.read(SocketServer.scala:319)
> > at kafka.network.Processor.run(SocketServer.scala:214)
> > at java.lang.Thread.run(Thread.java:722)
> >
> > here is the stack trace in the Kafka producer:
> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in
> > 60000 ms (kafka.producer.SyncProducer)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect(Native Method)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > at
> >
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> >
> > The Kafka producer is a multi-threaded program.
> >
> > Thanks!
> >
> > Best Regards!
> >
> >
> > 2012/7/13 Neha Narkhede <ne...@gmail.com>
> >
> > > In addition to Jun's question,
> > >
> > > which version are you using ? Do you have a reproducible test case ?
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> > > > What's the stack trace?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xiaofanhadoop@gmail.com
> >
> > > wrote:
> > > >
> > > >> HI:
> > > >>
> > > >> Guys, I test kafka in our test high concurrent environment, I always
> > get
> > > the
> > > >> error message as follows:
> > > >>
> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > >> (kafka.server.KafkaRequestHandlers)
> > > >> kafka.message.InvalidMessageException: message is invalid,
> compression
> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset:
> 0
> > > >>
> > > >> Can anyone help? Thanks!
> > > >>
> > > >> Best Regards
> > > >>
> > > >> Jian Fan
> > > >>
> > >
> >
>

Re: error with kafka

Posted by Jun Rao <ju...@gmail.com>.
That seems like a Kafka bug. Do you have a script that can reproduce this?

Thanks,

Jun

On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xi...@gmail.com> wrote:

> HI:
> I use Kafka 0.7.1; here is the stack trace in the Kafka server:
>
>  ERROR Error processing MultiProducerRequest on bxx:2
> (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> at kafka.log.Log.append(Log.scala:205)
> at
>
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> at
>
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> at kafka.network.Processor.handle(SocketServer.scala:296)
> at kafka.network.Processor.read(SocketServer.scala:319)
> at kafka.network.Processor.run(SocketServer.scala:214)
> at java.lang.Thread.run(Thread.java:722)
> [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because
> of error (kafka.network.Processor)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> at kafka.log.Log.append(Log.scala:205)
> at
>
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> at
>
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> at kafka.network.Processor.handle(SocketServer.scala:296)
> at kafka.network.Processor.read(SocketServer.scala:319)
> at kafka.network.Processor.run(SocketServer.scala:214)
> at java.lang.Thread.run(Thread.java:722)
>
> here is the stack trace in the Kafka producer:
> ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in
> 60000 ms (kafka.producer.SyncProducer)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> at
> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> at
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
>
> The Kafka producer is a multi-threaded program.
>
> Thanks!
>
> Best Regards!
>
>
> 2012/7/13 Neha Narkhede <ne...@gmail.com>
>
> > In addition to Jun's question,
> >
> > which version are you using ? Do you have a reproducible test case ?
> >
> > Thanks,
> > Neha
> >
> > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> > > What's the stack trace?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xi...@gmail.com>
> > wrote:
> > >
> > >> HI:
> > >>
> > >> Guys, I test kafka in our test high concurrent environment, I always
> get
> > the
> > >> error message as follows:
> > >>
> > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > >> (kafka.server.KafkaRequestHandlers)
> > >> kafka.message.InvalidMessageException: message is invalid, compression
> > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
> > >>
> > >> Can anyone help? Thanks!
> > >>
> > >> Best Regards
> > >>
> > >> Jian Fan
> > >>
> >
>

Re: error with kafka

Posted by jjian fan <xi...@gmail.com>.
HI:
I use Kafka 0.7.1; here is the stack trace in the Kafka server:

 ERROR Error processing MultiProducerRequest on bxx:2
(kafka.server.KafkaRequestHandlers)
kafka.message.InvalidMessageException: message is invalid, compression
codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at kafka.log.Log.append(Log.scala:205)
at
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:722)
[2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13 because
of error (kafka.network.Processor)
kafka.message.InvalidMessageException: message is invalid, compression
codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at kafka.log.Log.append(Log.scala:205)
at
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:722)
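
The InvalidMessageException in the traces above is raised when a message fails the broker's checksum/size validation while appending to the log. Conceptually the check recomputes a CRC32 over the payload and compares it with the checksum stored in the message header, as in this self-contained sketch (the class and method names are illustrative; Kafka 0.7's actual wire format with its magic and attributes bytes is not reproduced here):

```java
import java.util.zip.CRC32;

public class MessageCrcCheck {
    // Compute a CRC32 checksum over a payload, as a producer would
    // before framing the message onto the wire.
    public static long crc(byte[] payload) {
        CRC32 c = new CRC32();
        c.update(payload, 0, payload.length);
        return c.getValue();
    }

    // Broker-side validity check: recompute the checksum and compare it
    // with the one stored in the message header. A mismatch (e.g. from a
    // buffer being mutated while another thread serializes it) would
    // surface as an invalid-message error.
    public static boolean isValid(byte[] payload, long storedCrc) {
        return crc(payload) == storedCrc;
    }

    public static void main(String[] args) {
        byte[] msg = "XXXX".getBytes();
        long stored = crc(msg);
        System.out.println(isValid(msg, stored)); // intact message passes
        msg[0] = 'Y';                             // corrupt one byte in flight
        System.out.println(isValid(msg, stored)); // corrupted message fails
    }
}
```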

here is the stack trace in the Kafka producer:
ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in
60000 ms (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
at kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)

The kafka producer is a multi-threaded program.
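One pattern worth checking (a sketch of a common cause, not a fix confirmed in this thread): in Kafka 0.7 a SyncProducer wraps a single socket channel, so if several application threads share one producer instance without synchronization, their writes can interleave and the broker then sees exactly this kind of corrupt message set. A defensive structure is to give each worker thread its own producer, for example via ThreadLocal. The `FakeProducer` class below is a hypothetical stand-in for the real producer class, used only to show the threading structure without the Kafka jars:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadProducer {
    // Hypothetical stand-in for a non-thread-safe producer (e.g. a
    // 0.7-era SyncProducer). It records its owning thread and refuses
    // calls from any other thread, documenting the invariant we want.
    static class FakeProducer {
        final long ownerThread = Thread.currentThread().getId();
        void send(String topic, String msg) {
            if (Thread.currentThread().getId() != ownerThread)
                throw new IllegalStateException("producer shared across threads");
        }
    }

    // Counts how many producer instances were actually created.
    static final AtomicInteger created = new AtomicInteger();

    // One producer per thread: each worker lazily creates its own.
    static final ThreadLocal<FakeProducer> PRODUCER =
        ThreadLocal.withInitial(() -> {
            created.incrementAndGet();
            return new FakeProducer();
        });

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 100; i++) {
            final int n = i;
            pool.submit(() -> PRODUCER.get().send("test-topic", "message-" + n));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // No more producers than worker threads were created.
        System.out.println("at most one producer per thread: " + (created.get() <= 4));
    }
}
```

The alternative, if producer instances are expensive, is to route all sends through a single thread (which is what the async ProducerSendThread in the trace above is for) rather than calling send() concurrently.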

Thanks!

Best Regards!


2012/7/13 Neha Narkhede <ne...@gmail.com>

> In addition to Jun's question,
>
> which version are you using ? Do you have a reproducible test case ?
>
> Thanks,
> Neha
>
> On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> > What's the stack trace?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xi...@gmail.com>
> wrote:
> >
> >> HI:
> >>
> >> Guys, I test kafka in our high-concurrency test environment, and I always
> >> get the error message as follows:
> >>
> >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> >> (kafka.server.KafkaRequestHandlers)
> >> kafka.message.InvalidMessageException: message is invalid, compression
> >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
> >>
> >> Can anyone help? Thanks!
> >>
> >> Best Regards
> >>
> >> Jian Fan
> >>
>

Re: error with kafka

Posted by Neha Narkhede <ne...@gmail.com>.
In addition to Jun's question,

which version are you using ? Do you have a reproducible test case ?

Thanks,
Neha

On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <ju...@gmail.com> wrote:
> What's the stack trace?
>
> Thanks,
>
> Jun
>
> On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xi...@gmail.com> wrote:
>
>> HI:
>>
>> Guys, I test kafka in our high-concurrency test environment, and I always get the
>> error message as follows:
>>
>> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
>> (kafka.server.KafkaRequestHandlers)
>> kafka.message.InvalidMessageException: message is invalid, compression
>> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
>>
>> Can anyone help? Thanks!
>>
>> Best Regards
>>
>> Jian Fan
>>

Re: error with kafka

Posted by Jun Rao <ju...@gmail.com>.
What's the stack trace?

Thanks,

Jun

On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xi...@gmail.com> wrote:

> HI:
>
> Guys, I test kafka in our high-concurrency test environment, and I always get the
> error message as follows:
>
> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
>
> Can anyone help? Thanks!
>
> Best Regards
>
> Jian Fan
>