Posted to users@kafka.apache.org by jjian fan <xi...@gmail.com> on 2012/07/27 11:16:36 UTC

error in LogManager.createlog()

    In a highly concurrent environment, the TCP server drops some packets
when the TCP buffer overflows. LogManager.createLog then creates logs for
topics that do not exist. One thing is very strange, though: the log
directories should be named a-0, a-1, a-2 and so on, but file.mkdir()
creates a directory named just a. There seems to be a bug in the
file.mkdir() call in LogManager.createLog.
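
One mechanism consistent with this symptom is that a corrupted topic string
ends up containing an embedded NUL byte, and the path handed to the native
mkdir() is then treated as a NUL-terminated C string, truncating the name.
A minimal sketch of that hypothesis (illustration only, not Kafka code;
behavior varies by JVM and OS):

import java.io.File

object MkdirNulDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical corrupted form of the topic "ax": one byte became NUL.
    val topic = "a\u0000x"
    val partition = 0
    val dir = new File("/data/kafka", topic + "-" + partition)
    // If the native layer truncates the path at the NUL, the directory
    // actually created is "a" rather than "a?x-0".
    println("mkdir returned: " + dir.mkdirs())
  }
}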

the exception message is:

[2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
[2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
    at kafka.utils.Utils$.openChannel(Utils.scala:324)
    at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
    at kafka.log.Log.loadSegments(Log.scala:144)
    at kafka.log.Log.<init>(Log.scala:116)
    at kafka.log.LogManager.createLog(LogManager.scala:159)
    at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:679)

Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jun:

    After we replaced the router, the topic name issue hasn't happened again!

Thanks!

Jian Fan


2012/8/9 Jun Rao <ju...@gmail.com>

> Thanks for the update. Do you still see the corrupted topic name issue
> after the router issue is fixed?
>
> Thanks,
>
> Jun
>

On Wed, Aug 8, 2012 at 9:03 PM, jjian fan <xi...@gmail.com> wrote:

> Jun:
>
>     I have located the problem. It was caused by our Cisco router. In a
> high-load scenario, our Cisco router (a 2960S) drops some packets because
> of its limited capacity. So socket.recv() should be fine; we just need to
> handle the log directory being corrupted by a topic name containing a null
> byte in this scenario.
>
> Jian Fan

2012/8/7 Jun Rao <ju...@gmail.com>

> Thanks for the pointer to the paper. However, the socket buffer overflow
> issue mentioned in the paper seems to be a performance issue, not a
> correctness issue. That is, whatever bytes socket.recv() gets should not be
> corrupted. Is this not true?
>
> Jun

On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xi...@gmail.com> wrote:

> The exception may be caused by TCP buffer overflow; please check the paper
> http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf
>
> Thanks!

2012/8/2 jjian fan <xi...@gmail.com>

> Jun:
>
>     How about the processing power of the broker? You could deploy more
> producer clients to increase the pressure on the broker. In my test, we
> send 300 thousand messages per second to the broker, with a message size
> of 1024 bytes. In this scenario, these exceptions are often seen.
>
> Thanks!
> Jian Fan

2012/8/1 Jun Rao <ju...@gmail.com>

> Jian,
>
> The message format is documented in the Message class and has the
> following format.
>
> /**
>  * A message. The format of an N byte message is the following:
>  *
>  * If magic byte is 0
>  *
>  * 1. 1 byte "magic" identifier to allow format changes
>  *
>  * 2. 4 byte CRC32 of the payload
>  *
>  * 3. N - 5 byte payload
>  *
>  * If magic byte is 1
>  *
>  * 1. 1 byte "magic" identifier to allow format changes
>  *
>  * 2. 1 byte "attributes" identifier to allow annotations on the message
>  *    independent of the version (e.g. compression enabled, type of codec used)
>  *
>  * 3. 4 byte CRC32 of the payload
>  *
>  * 4. N - 6 byte payload
>  *
>  */
>
> The flow is the following:
> 1. SyncProducer.send serializes a MultiProducerRequest to bytes and sends
> the bytes to the socket.
> 2. On the server side:
> 2.1 Processor.read reads the bytes off the socket and deserializes the
> bytes into a MultiProducerRequest.
> 2.2 The request is then handled in KafkaRequestHandler.
>
> BTW, I ran your test for a couple of days, but couldn't reproduce the
> exception. In your test, how frequently do you see the exceptions?
>
> Thanks,
>
> Jun
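
For reference, a minimal sketch of walking that layout (hypothetical code
written to the comment above, not the actual Message class; the names are
made up):

import java.nio.ByteBuffer

object MessageHeaderDemo {
  // Walks one N-byte message following the documented layout.
  def describe(buffer: ByteBuffer): String = {
    val magic = buffer.get()                           // 1 byte "magic"
    val attributes =
      if (magic == 1) Some(buffer.get()) else None     // 1 byte, magic 1 only
    val crc = buffer.getInt()                          // 4 byte CRC32 of payload
    val payload = new Array[Byte](buffer.remaining())  // N - 5 or N - 6 bytes
    buffer.get(payload)
    "magic=" + magic + " attributes=" + attributes +
      " crc=" + crc + " payloadBytes=" + payload.length
  }
}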

On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:

> Jun:
>
>    Can you give more detail on the byte buffer structure of the messages,
> and on the process of sending and receiving them?
>
> Thanks
>
> Jian Fan

2012/7/31 Jun Rao <ju...@gmail.com>

> Jian,
>
> Thanks for the patch. It may not be the right fix, though, since it fixes
> the symptom but not the cause. For each produce request, the broker does
> the following: (1) read all bytes of the request into
> a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the
> request are ready, deserialize the bytes into a ProducerRequest
> (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request
> by adding the topic data to the logs.
>
> What you observed is that in step 3, a topic name is corrupted somehow.
> However, this means that the corresponding ProducerRequest is corrupted.
> Assuming there is no corruption at the network layer (very unlikely), the
> corruption must have happened in step 1 or step 2. So, instead of patching
> a corrupted topic name, we should understand why a ProducerRequest can be
> corrupted and fix the cause. BTW, whatever caused the corrupted topic could
> be causing the corrupted messages too.
>
> Thanks,
>
> Jun

On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:

> Jun:
>
>   Hi. I found why the error appears. In a highly concurrent environment,
> the TCP server drops some packets when the TCP buffer overflows. So there
> is some chance that "topic" contains one or more characters that encode to
> bytes that include NULL (0).
>   I have submitted the patch to KAFKA-411, please check it!
>
> Thanks!
> Jian Fan
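
The patch itself is on KAFKA-411; purely as an illustration of the kind of
defensive check being discussed (hypothetical code, not the actual patch),
the broker could refuse to create a log for a topic name carrying control
bytes:

// Hypothetical defensive check, not the KAFKA-411 patch: reject a topic
// name containing NUL (or any other control character) before it is ever
// used to build a log directory path.
def validateTopic(topic: String): Unit = {
  require(topic.nonEmpty, "topic name must not be empty")
  require(!topic.exists(_.isControl), "topic name contains a control character")
}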

2012/7/30 Jun Rao <ju...@gmail.com>

> Jian,
>
> All log directories in kafka are created by LogManager.createLog(). As you
> can see, the directory always has the form topic-partitionId. So, it's not
> clear how a directory of "a" can be created in your case. I will try to
> rerun your test and see if it can be reproduced.
>
> Thanks,
>
> Jun

On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhadoop@gmail.com> wrote:

> Jay:
>
>    You can try to send 600 thousand messages per second to the broker; you
> will find that TCP drops packets, so sometimes the topic ax becomes a. I
> don't mean to solve the TCP problem at the application level; I just found
> that there is maybe a bug in the file.mkdir() call in LogManager.createLog.
> It will affect Kafka usage.
>
> Thanks
> Jian Fan

2012/7/29 Jay Kreps <ja...@gmail.com>

> Hmm, that is not my understanding of TCP. TCP is a reliable protocol, so it
> is supposed to either deliver packets in order or time out retrying. In the
> case of the topic name, that is a size-delimited string; there should be no
> way for it to drop a single byte in the middle of the request like that. If
> that is in fact happening, I don't think it is something we can hope to
> recover from at the application level...
>
> -Jay
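
To make that concrete: a size-delimited string is read as a length prefix
followed by exactly that many bytes, so a lost byte cannot quietly shorten
one string in isolation; it misaligns every read after it. A minimal sketch
(hypothetical, not the actual Kafka wire-reading code, which I believe uses
a 2-byte length for the topic):

import java.nio.ByteBuffer

// Hypothetical reader for a size-delimited string such as the topic name:
// a 2-byte length followed by that many UTF-8 bytes. If an earlier byte
// were lost, the length prefix itself would be read from the wrong offset,
// and every later field would decode as garbage.
def readShortString(buffer: ByteBuffer): String = {
  val size = buffer.getShort()
  val bytes = new Array[Byte](size)
  buffer.get(bytes)
  new String(bytes, "UTF-8")
}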

On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhadoop@gmail.com> wrote:

> Jun:
>    Dropping packets in TCP is an issue of the OS/JVM, but it can also cause
> some Kafka issues!
>    For example, the topic of a message is ax, but it can change to a in the
> broker because some packets are dropped, so the log directory should be
> named a-0, a-1, a-2 and so on, but file.mkdir() creates a directory named
> a. There seem to be some bugs in the file.mkdir() call in
> LogManager.createLog.
>    If you shut down the broker and restart it, the broker will report an
> exception like this:
>
> [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
> startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>     at java.lang.String.substring(String.java:1949)
>     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
>     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
>     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at kafka.log.LogManager.<init>(LogManager.scala:65)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>     at kafka.Kafka$.main(Kafka.scala:50)
>     at kafka.Kafka.main(Kafka.scala)
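
The trace is consistent with the directory name being split at its last '-'
character. A sketch of that failure mode (reconstructed from the stack trace
above, not the actual Utils.scala source):

// For "axx-0" this yields ("axx", 0). For the bogus directory "a" there is
// no '-', lastIndexOf returns -1, and substring(0, -1) throws the
// StringIndexOutOfBoundsException: -1 seen during startup.
def topicPartition(dirName: String): (String, Int) = {
  val index = dirName.lastIndexOf('-')
  (dirName.substring(0, index), dirName.substring(index + 1).toInt)
}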

2012/7/28 Jun Rao <ju...@gmail.com>

> Jian,
>
> I am not sure if I understand this completely. Dropping packets in TCP
> shouldn't cause corruption in the TCP buffer, right? Is this an issue in
> Kafka or in the OS/JVM?
>
> Thanks,
>
> Jun

On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhadoop@gmail.com> wrote:

> Jun:
> Yes, if the socket server can't handle the packets quickly enough, the TCP
> protocol will drop some network packets when the buffer overflows; the
> corrupted messages also appear in this situation! I ran a systemtap script
> to find the packet dropping; you can also type "cat /proc/net/sockstat" to
> see the TCP memory increase. I debugged the whole Kafka source code to find
> the bug in the file.mkdir() call in LogManager.createLog.
>
> Jian Fan

2012/7/27 Jun Rao <ju...@gmail.com>

> Thanks for the finding. Are you saying that this problem is caused by the
> buffering in the Kafka socket server? How did you figure that out? Is this
> problem exposed by the same test that caused the corrupted messages in the
> broker?
>
> Thanks,
>
> Jun

Re: error in LogManager.createlog()

Posted by Jun Rao <ju...@gmail.com>.
Thanks for the update. Do you still see the corrupted topic name issue
after the router issue is fixed?

Thanks,

Jun

On Wed, Aug 8, 2012 at 9:03 PM, jjian fan <xi...@gmail.com> wrote:

> Jun:
>
>     I have locate the problem. It was cause by cisio router. In high load
> scenario, our cisio router(2960s) will drop some packages by its low
> ability. So socket.recv() should be fine,  we just need to solve the log
> directory corrupted by topic name with null byte in this scenario.
>
> Jian Fan
>
> 2012/8/7 Jun Rao <ju...@gmail.com>
>
> > Thanks for the pointer to the paper. However, the socket buffer overflow
> > issue mentioned in the paper seems to be a performance issue, not a
> > correctness issue. That is, whatever bytes socket.recv() get should not
> be
> > corrupted. Is this not true?
> >
> > Jun
> >
> > On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xi...@gmail.com>
> wrote:
> >
> > > The exception reason may be tcp buffer overflow, pls check the paper
> > >
> >
> http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf
> > >
> > > Thanks!
> > >
> > > 2012/8/2 jjian fan <xi...@gmail.com>
> > >
> > > > Jun:
> > > >
> > > >     How about the server power of the broker, you can deploy more
> > > producer
> > > > clients to increase the borker pressure. In my test, we send 300
> > thousand
> > > > messages per second to the broker, the message size is 1024. In this
> > > > scenario, these exceptions are often be seen.
> > > >
> > > > Thanks!
> > > > Jian Fan
> > > >
> > > > 2012/8/1 Jun Rao <ju...@gmail.com>
> > > >
> > > >> Jian,
> > > >>
> > > >> The message format is documented in the Message class and has the
> > > >> following
> > > >> format.
> > > >>
> > > >> /**
> > > >>  * A message. The format of an N byte message is the following:
> > > >>  *
> > > >>  * If magic byte is 0
> > > >>  *
> > > >>  * 1. 1 byte "magic" identifier to allow format changes
> > > >>  *
> > > >>  * 2. 4 byte CRC32 of the payload
> > > >>  *
> > > >>  * 3. N - 5 byte payload
> > > >>  *
> > > >>  * If magic byte is 1
> > > >>  *
> > > >>  * 1. 1 byte "magic" identifier to allow format changes
> > > >>  *
> > > >>  * 2. 1 byte "attributes" identifier to allow annotations on the
> > message
> > > >> independent of the version (e.g. compression enabled, type of codec
> > > used)
> > > >>  *
> > > >>  * 3. 4 byte CRC32 of the payload
> > > >>  *
> > > >>  * 4. N - 6 byte payload
> > > >>  *
> > > >>  */
> > > >>
> > > >> The flow is the following:
> > > >> 1. SyncProducer.send serializes a MultiProduceRequest to bytes and
> > sends
> > > >> the bytes to socket.
> > > >> 2. On the server side:
> > > >> 2.1 Processor.read reads the bytes off socket and deserializes the
> > bytes
> > > >> into a MultiProduceRequest
> > > >> 2.2 The request is then handled in KafkaRequestHandler
> > > >>
> > > >> BTW, I ran your test for a couple of days, but couldn't reproduce
> the
> > > >> exception. In your test, how frequently do you see the exceptions?
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >> On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xi...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Jun:
> > > >> >
> > > >> >    Can you give more detail of the bytebuffer structure of
> messages,
> > > and
> > > >> > the process of sending and receiving the messages?
> > > >> >
> > > >> > Thanks
> > > >> >
> > > >> > Jian Fan
> > > >> >
> > > >> >
> > > >> > 2012/7/31 Jun Rao <ju...@gmail.com>
> > > >> >
> > > >> > > Jian,
> > > >> > >
> > > >> > > Thanks for the patch. It may not be the right fix though since
> it
> > > >> fixes
> > > >> > > the symptom, but not the cause. For each produce request, the
> > broker
> > > >> does
> > > >> > > the following: (1) read all bytes of the request into
> > > >> > > a BoundedByteBufferReceive (SocketServer.read); (2) after all
> > bytes
> > > of
> > > >> > the
> > > >> > > request are ready, deserialize the bytes into a ProducerRequest
> > > >> > > (KafkaRequestHandler.handleProducerRequest); (3) finally, serve
> > the
> > > >> > request
> > > >> > > by adding topic data to logs.
> > > >> > >
> > > >> > > What you observed is that in step 3, a topic name is corrupted
> > > >> somehow.
> > > >> > > However, this means that the corresponding ProducerRequest is
> > > >> corrupted.
> > > >> > > Assuming there is no corruption at the network layer (very
> > > unlikely),
> > > >> the
> > > >> > > corruption much have happened in step 1 or step 2. So, instead
> of
> > > >> > patching
> > > >> > > a corrupted topic name, we should understand why a
> ProducerRequest
> > > >> can be
> > > >> > > corrupted and fix the cause. BTW, what's caused the corrupted
> > topic
> > > >> could
> > > >> > > be causing the corrupted messages too.
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Jun
> > > >> > >
> > > >> > > On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <
> > xiaofanhadoop@gmail.com
> > > >
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Jun:
> > > >> > > >
> > > >> > > >   Hi. I find why the error appear. In high cocurrent
> > environment,
> > > >> the
> > > >> > tcp
> > > >> > > > server will drop some package when the tcp buffer is over. So
> > > there
> > > >> are
> > > >> > > > some chances that "topic" contains one or more characters that
> > > >> encode
> > > >> > to
> > > >> > > > bytes that include NULL (0).
> > > >> > > >   I have submit the patch to kafka-411, pls check that!
> > > >> > > >
> > > >> > > > Thanks!
> > > >> > > > Jian Fan
> > > >> > > >
> > > >> > > > 2012/7/30 Jun Rao <ju...@gmail.com>
> > > >> > > >
> > > >> > > > > Jian,
> > > >> > > > >
> > > >> > > > > All log directories in kafka are created by
> > > >> LogManager.createLog().
> > > >> > As
> > > >> > > > you
> > > >> > > > > can see, the directory always has the form of
> > topic-partitionId.
> > > >> So,
> > > >> > > it's
> > > >> > > > > not clear how a directory of "a" can be created in your
> case.
> > I
> > > >> will
> > > >> > > try
> > > >> > > > to
> > > >> > > > > rerun your test and see if it can be reproduced.
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > >
> > > >> > > > > Jun
> > > >> > > > >
> > > >> > > > > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <
> > > >> xiaofanhadoop@gmail.com>
> > > >> > > > > wrote:
> > > >> > > > >
> > > >> > > > > > Jay:
> > > >> > > > > >
> > > >> > > > > >    You can try to send 600 thousand message per second to
> > the
> > > >> > broker,
> > > >> > > > you
> > > >> > > > > > can find the tcp will drop packages, so sometimes the
> topic
> > of
> > > >> ax
> > > >> > > will
> > > >> > > > be
> > > >> > > > > > a. I don't mean to slove the tcp problem from application
> > > >> level, I
> > > >> > > just
> > > >> > > > > > find there are myabe a bug in file.mkdir() of
> > > >> LogManager.createlog.
> > > >> > > It
> > > >> > > > > will
> > > >> > > > > > infect the kafka useage.
> > > >> > > > > >
> > > >> > > > > > Thanks
> > > >> > > > > > Jian Fan
> > > >> > > > > >
> > > >> > > > > > 2012/7/29 Jay Kreps <ja...@gmail.com>
> > > >> > > > > >
> > > >> > > > > > > Hmm, that is not my understanding of TCP. TCP is a
> > reliable
> > > >> > > protocol
> > > >> > > > so
> > > >> > > > > > it
> > > >> > > > > > > is supposed to either deliver packets in order or
> timeout
> > > >> > retrying.
> > > >> > > > In
> > > >> > > > > > the
> > > >> > > > > > > case of the topic name, that is a size-delimited string,
> > > there
> > > >> > > should
> > > >> > > > > be
> > > >> > > > > > no
> > > >> > > > > > > way for it to drop a single byte in the middle of the
> > > request
> > > >> > like
> > > >> > > > > that.
> > > >> > > > > > If
> > > >> > > > > > > that is in fact happening, I don't think it is something
> > we
> > > >> can
> > > >> > > hope
> > > >> > > > to
> > > >> > > > > > > recover from at the application level...
> > > >> > > > > > >
> > > >> > > > > > > -Jay
> > > >> > > > > > >
> > > >> > > > > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <
> > > >> > > xiaofanhadoop@gmail.com>
> > > >> > > > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Jun:
> > > >> > > > > > > >    Dropping packages in TCP is an issue of OS/JVM, but
> > it
> > > >> can
> > > >> > > also
> > > >> > > > > > cause
> > > >> > > > > > > > some kafka issue!
> > > >> > > > > > > >    For example, the topic of the message is ax, but it
> > can
> > > >> > change
> > > >> > > > to
> > > >> > > > > a
> > > >> > > > > > in
> > > >> > > > > > > > broker because the some packages is drop, so the log
> > > >> directory
> > > >> > > > > > > >    should be like a-0,a-1, a-2 and so on ,but
> > file.mkdir()
> > > >> > create
> > > >> > > > log
> > > >> > > > > > > > directory like a. Seems some bugs in file.mkdir() of
> > > >> > > > > > > LogManager.createlog.
> > > >> > > > > > > >    If you shutdown the broker and restart it. The the
> > > broker
> > > >> > will
> > > >> > > > > > report
> > > >> > > > > > > > the exception like this:
> > > >> > > > > > > >
> > > >> > > > > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a'
> > > >> > > > (kafka.log.LogManager)
> > > >> > > > > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during
> > > >> > > > KafkaServerStable
> > > >> > > > > > > > startup. Prepare to shutdown
> > > >> > (kafka.server.KafkaServerStartable)
> > > >> > > > > > > > java.lang.StringIndexOutOfBoundsException: String
> index
> > > out
> > > >> of
> > > >> > > > range:
> > > >> > > > > > -1
> > > >> > > > > > > >     at java.lang.String.substring(String.java:1949)
> > > >> > > > > > > >     at
> > > kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > > >> > > > > > > >     at
> > > >> > kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > > >> > > > > > > >     at
> > > >> > kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > > >> > > > > > > >     at
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > >> > > > > > > >     at
> > > >> > > scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > >> > > > > > > >     at
> kafka.log.LogManager.<init>(LogManager.scala:65)
> > > >> > > > > > > >     at
> > > >> kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > > >> > > > > > > >     at
> > > >> > > > > > > >
> > > >> > > > > >
> > > >> > > >
> > > >> >
> > >
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > > >> > > > > > > >     at kafka.Kafka$.main(Kafka.scala:50)
> > > >> > > > > > > >     at kafka.Kafka.main(Kafka.scala)
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > 2012/7/28 Jun Rao <ju...@gmail.com>
> > > >> > > > > > > >
> > > >> > > > > > > > > Jian,
> > > >> > > > > > > > >
> > > >> > > > > > > > > I am not sure if I understand this completely.
> > Dropping
> > > >> > > packages
> > > >> > > > in
> > > >> > > > > > TCP
> > > >> > > > > > > > > shouldn't cause corruption in the TCP buffer, right?
> > Is
> > > >> this
> > > >> > an
> > > >> > > > > issue
> > > >> > > > > > > in
> > > >> > > > > > > > > Kafka or OS/JVM?
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks,
> > > >> > > > > > > > >
> > > >> > > > > > > > > Jun
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <
> > > >> > > > > xiaofanhadoop@gmail.com>
> > > >> > > > > > > > > wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Jun:
> > > >> > > > > > > > > > Yes, if the socket server can't handle the package
> > > >> quickly,
> > > >> > > tcp
> > > >> > > > > > > > protocol
> > > >> > > > > > > > > > will drop some network package until the buffer is
> > > >> > overflow,
> > > >> > > >  the
> > > >> > > > > > > > > corrupted
> > > >> > > > > > > > > > messages is also appear on this situtation!  I
> run a
> > > >> > > systemtap
> > > >> > > > > > script
> > > >> > > > > > > > to
> > > >> > > > > > > > > > find the package droping ,also you can type " cat
> > > >> > > > > > /proc/net/sockstat"
> > > >> > > > > > > > to
> > > >> > > > > > > > > > see the tcp memory increase.  I debug the whole
> > kafka
> > > >> > source
> > > >> > > > code
> > > >> > > > > > to
> > > >> > > > > > > > find
> > > >> > > > > > > > > > the bug in file.mkdir() of LogManager.createlog.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > JIan Fan
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > 2012/7/27 Jun Rao <ju...@gmail.com>
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > Thanks for the finding. Are you saying that this
> > > >> problem
> > > >> > is
> > > >> > > > > > caused
> > > >> > > > > > > by
> > > >> > > > > > > > > the
> > > >> > > > > > > > > > > buffering in Kafka socket server? How did you
> > figure
> > > >> that
> > > >> > > > out?
> > > >> > > > > Is
> > > >> > > > > > > > this
> > > >> > > > > > > > > > > problem exposed by the same test that caused the
> > > >> > corrupted
> > > >> > > > > > messages
> > > >> > > > > > > > in
> > > >> > > > > > > > > > the
> > > >> > > > > > > > > > > broker?
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Jun
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <
> > > >> > > > > > > xiaofanhadoop@gmail.com>
> > > >> > > > > > > > > > > wrote:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > >     In high cocurrent environment, the tcp
> > server
> > > >> will
> > > >> > > drop
> > > >> > > > > > some
> > > >> > > > > > > > > > package
> > > >> > > > > > > > > > > > when the tcp buffer is over. Then
> > > >> LogManager.createlog
> > > >> > > will
> > > >> > > > > > > create
> > > >> > > > > > > > > some
> > > >> > > > > > > > > > > > no-exists topic log. But one thing is very
> > > strange,
> > > >> the
> > > >> > > log
> > > >> > > > > > > > directory
> > > >> > > > > > > > > > > > should be like a-0,a-1, a-2 and so on ,but
> > > >> file.mkdir()
> > > >> > > > > create
> > > >> > > > > > > log
> > > >> > > > > > > > > > > > directory like a. Seems some bug in
> file.mkdir()
> > > of
> > > >> > > > > > > > > > LogManager.createlog.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > the exception message is
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > [2012-07-27 17:08:00,559] INFO create
> directory
> > > >> > > > > > /data/kafka/axx-0
> > > >> > > > > > > > > > > > (kafka.log.LogManager)
> > > >> > > > > > > > > > > > [2012-07-27 17:08:00,561] ERROR Error
> processing
> > > >> > > > > > > > MultiProducerRequest
> > > >> > > > > > > > > > on
> > > >> > > > > > > > > > > > axx:0 (kafka.server.KafkaRequestHandlers)
> > > >> > > > > > > > > > > > java.io.FileNotFoundException:
> > > >> > > > > > > > > > > /data/kafka/axx-0/00000000000000000000.kafka
> > > >> > > > > > > > > > > > (Is a directory)
> > > >> > > > > > > > > > > > at java.io.RandomAccessFile.open(Native
> Method)
> > > >> > > > > > > > > > > > at
> > > >> > > > java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > >> > > > > > > > > > > > at
> > kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > >> > > > > > > > > > > > at
> > > >> > > > > kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > >> > > > > > > > > > > > at kafka.log.Log.loadSegments(Log.scala:144)
> > > >> > > > > > > > > > > > at kafka.log.Log.<init>(Log.scala:116)
> > > >> > > > > > > > > > > > at
> > > >> kafka.log.LogManager.createLog(LogManager.scala:159)
> > > >> > > > > > > > > > > > at
> > > >> > > > kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > >> > > > > > > > > > > > at
> > > >> > > > > scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > >
> > > >> > > > > >
> > > >> >
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > >> > > > > > > > > > > > at
> > > >> > > scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > >> > > > > > > > > > > > at
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > >> > > > > > > > > > > > at
> > > >> > kafka.network.Processor.handle(SocketServer.scala:296)
> > > >> > > > > > > > > > > > at
> > > >> kafka.network.Processor.read(SocketServer.scala:319)
> > > >> > > > > > > > > > > > at
> > > >> kafka.network.Processor.run(SocketServer.scala:214)
> > > >> > > > > > > > > > > > at java.lang.Thread.run(Thread.java:679)
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jun:

    I have locate the problem. It was cause by cisio router. In high load
scenario, our cisio router(2960s) will drop some packages by its low
ability. So socket.recv() should be fine,  we just need to solve the log
directory corrupted by topic name with null byte in this scenario.

Jian Fan

2012/8/7 Jun Rao <ju...@gmail.com>

> Thanks for the pointer to the paper. However, the socket buffer overflow
> issue mentioned in the paper seems to be a performance issue, not a
> correctness issue. That is, whatever bytes socket.recv() get should not be
> corrupted. Is this not true?
>
> Jun
>
> On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xi...@gmail.com> wrote:
>
> > The exception reason may be tcp buffer overflow, pls check the paper
> >
> http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf
> >
> > Thanks!
> >
> > 2012/8/2 jjian fan <xi...@gmail.com>
> >
> > > Jun:
> > >
> > >     How about the server power of the broker, you can deploy more
> > producer
> > > clients to increase the borker pressure. In my test, we send 300
> thousand
> > > messages per second to the broker, the message size is 1024. In this
> > > scenario, these exceptions are often be seen.
> > >
> > > Thanks!
> > > Jian Fan
> > >
> > > 2012/8/1 Jun Rao <ju...@gmail.com>
> > >
> > >> Jian,
> > >>
> > >> The message format is documented in the Message class and has the
> > >> following
> > >> format.
> > >>
> > >> /**
> > >>  * A message. The format of an N byte message is the following:
> > >>  *
> > >>  * If magic byte is 0
> > >>  *
> > >>  * 1. 1 byte "magic" identifier to allow format changes
> > >>  *
> > >>  * 2. 4 byte CRC32 of the payload
> > >>  *
> > >>  * 3. N - 5 byte payload
> > >>  *
> > >>  * If magic byte is 1
> > >>  *
> > >>  * 1. 1 byte "magic" identifier to allow format changes
> > >>  *
> > >>  * 2. 1 byte "attributes" identifier to allow annotations on the
> message
> > >> independent of the version (e.g. compression enabled, type of codec
> > used)
> > >>  *
> > >>  * 3. 4 byte CRC32 of the payload
> > >>  *
> > >>  * 4. N - 6 byte payload
> > >>  *
> > >>  */
> > >>
> > >> The flow is the following:
> > >> 1. SyncProducer.send serializes a MultiProduceRequest to bytes and
> sends
> > >> the bytes to socket.
> > >> 2. On the server side:
> > >> 2.1 Processor.read reads the bytes off socket and deserializes the
> bytes
> > >> into a MultiProduceRequest
> > >> 2.2 The request is then handled in KafkaRequestHandler
> > >>
> > >> BTW, I ran your test for a couple of days, but couldn't reproduce the
> > >> exception. In your test, how frequently do you see the exceptions?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xi...@gmail.com>
> > >> wrote:
> > >>
> > >> > Jun:
> > >> >
> > >> >    Can you give more detail of the bytebuffer structure of messages,
> > and
> > >> > the process of sending and receiving the messages?
> > >> >
> > >> > Thanks
> > >> >
> > >> > Jian Fan
> > >> >
> > >> >
> > >> > 2012/7/31 Jun Rao <ju...@gmail.com>
> > >> >
> > >> > > Jian,
> > >> > >
> > >> > > Thanks for the patch. It may not be the right fix though since it
> > >> fixes
> > >> > > the symptom, but not the cause. For each produce request, the
> broker
> > >> does
> > >> > > the following: (1) read all bytes of the request into
> > >> > > a BoundedByteBufferReceive (SocketServer.read); (2) after all
> bytes
> > of
> > >> > the
> > >> > > request are ready, deserialize the bytes into a ProducerRequest
> > >> > > (KafkaRequestHandler.handleProducerRequest); (3) finally, serve
> the
> > >> > request
> > >> > > by adding topic data to logs.
> > >> > >
> > >> > > What you observed is that in step 3, a topic name is corrupted
> > >> somehow.
> > >> > > However, this means that the corresponding ProducerRequest is
> > >> corrupted.
> > >> > > Assuming there is no corruption at the network layer (very
> > unlikely),
> > >> the
> > >> > > corruption much have happened in step 1 or step 2. So, instead of
> > >> > patching
> > >> > > a corrupted topic name, we should understand why a ProducerRequest
> > >> can be
> > >> > > corrupted and fix the cause. BTW, what's caused the corrupted
> topic
> > >> could
> > >> > > be causing the corrupted messages too.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > > On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <
> xiaofanhadoop@gmail.com
> > >
> > >> > > wrote:
> > >> > >
> > >> > > > Jun:
> > >> > > >
> > >> > > >   Hi. I find why the error appear. In high cocurrent
> environment,
> > >> the
> > >> > tcp
> > >> > > > server will drop some package when the tcp buffer is over. So
> > there
> > >> are
> > >> > > > some chances that "topic" contains one or more characters that
> > >> encode
> > >> > to
> > >> > > > bytes that include NULL (0).
> > >> > > >   I have submit the patch to kafka-411, pls check that!
> > >> > > >
> > >> > > > Thanks!
> > >> > > > Jian Fan
> > >> > > >
> > >> > > > 2012/7/30 Jun Rao <ju...@gmail.com>
> > >> > > >
> > >> > > > > Jian,
> > >> > > > >
> > >> > > > > All log directories in kafka are created by
> > >> LogManager.createLog().
> > >> > As
> > >> > > > you
> > >> > > > > can see, the directory always has the form of
> topic-partitionId.
> > >> So,
> > >> > > it's
> > >> > > > > not clear how a directory of "a" can be created in your case.
> I
> > >> will
> > >> > > try
> > >> > > > to
> > >> > > > > rerun your test and see if it can be reproduced.
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > >
> > >> > > > > Jun
> > >> > > > >
> > >> > > > > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <
> > >> xiaofanhadoop@gmail.com>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Jay:
> > >> > > > > >
> > >> > > > > >    You can try to send 600 thousand message per second to
> the
> > >> > broker,
> > >> > > > you
> > >> > > > > > can find the tcp will drop packages, so sometimes the topic
> of
> > >> ax
> > >> > > will
> > >> > > > be
> > >> > > > > > a. I don't mean to slove the tcp problem from application
> > >> level, I
> > >> > > just
> > >> > > > > > find there are myabe a bug in file.mkdir() of
> > >> LogManager.createlog.
> > >> > > It
> > >> > > > > will
> > >> > > > > > infect the kafka useage.
> > >> > > > > >
> > >> > > > > > Thanks
> > >> > > > > > Jian Fan
> > >> > > > > >
> > >> > > > > > 2012/7/29 Jay Kreps <ja...@gmail.com>
> > >> > > > > >
> > >> > > > > > > Hmm, that is not my understanding of TCP. TCP is a
> reliable
> > >> > > protocol
> > >> > > > so
> > >> > > > > > it
> > >> > > > > > > is supposed to either deliver packets in order or timeout
> > >> > retrying.
> > >> > > > In
> > >> > > > > > the
> > >> > > > > > > case of the topic name, that is a size-delimited string,
> > there
> > >> > > should
> > >> > > > > be
> > >> > > > > > no
> > >> > > > > > > way for it to drop a single byte in the middle of the
> > request
> > >> > like
> > >> > > > > that.
> > >> > > > > > If
> > >> > > > > > > that is in fact happening, I don't think it is something
> we
> > >> can
> > >> > > hope
> > >> > > > to
> > >> > > > > > > recover from at the application level...
> > >> > > > > > >
> > >> > > > > > > -Jay
> > >> > > > > > >
> > >> > > > > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <
> > >> > > xiaofanhadoop@gmail.com>
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Jun:
> > >> > > > > > > >    Dropping packages in TCP is an issue of OS/JVM, but
> it
> > >> can
> > >> > > also
> > >> > > > > > cause
> > >> > > > > > > > some kafka issue!
> > >> > > > > > > >    For example, the topic of the message is ax, but it
> can
> > >> > change
> > >> > > > to
> > >> > > > > a
> > >> > > > > > in
> > >> > > > > > > > broker because the some packages is drop, so the log
> > >> directory
> > >> > > > > > > >    should be like a-0,a-1, a-2 and so on ,but
> file.mkdir()
> > >> > create
> > >> > > > log
> > >> > > > > > > > directory like a. Seems some bugs in file.mkdir() of
> > >> > > > > > > LogManager.createlog.
> > >> > > > > > > >    If you shutdown the broker and restart it. The the
> > broker
> > >> > will
> > >> > > > > > report
> > >> > > > > > > > the exception like this:
> > >> > > > > > > >
> > >> > > > > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a'
> > >> > > > (kafka.log.LogManager)
> > >> > > > > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during
> > >> > > > KafkaServerStable
> > >> > > > > > > > startup. Prepare to shutdown
> > >> > (kafka.server.KafkaServerStartable)
> > >> > > > > > > > java.lang.StringIndexOutOfBoundsException: String index
> > out
> > >> of
> > >> > > > range:
> > >> > > > > > -1
> > >> > > > > > > >     at java.lang.String.substring(String.java:1949)
> > >> > > > > > > >     at
> > kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > >> > > > > > > >     at
> > >> > kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > >> > > > > > > >     at
> > >> > kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > >> > > > > > > >     at
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >> > > > > > > >     at
> > >> > > scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >> > > > > > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> > >> > > > > > > >     at
> > >> kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > >> > > > > > > >     at
> > >> > > > > > > >
> > >> > > > > >
> > >> > > >
> > >> >
> > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > >> > > > > > > >     at kafka.Kafka$.main(Kafka.scala:50)
> > >> > > > > > > >     at kafka.Kafka.main(Kafka.scala)

Re: error in LogManager.createlog()

Posted by Jun Rao <ju...@gmail.com>.
Thanks for the pointer to the paper. However, the socket buffer overflow
issue mentioned in the paper seems to be a performance issue, not a
correctness issue. That is, whatever bytes socket.recv() returns should not
be corrupted. Is this not true?
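
If corruption did slip through, the per-message CRC32 described in the
Message class format should flag it on the broker side. A minimal sketch of
that kind of check (illustrative only, not the actual Kafka validation
code):

import java.util.zip.CRC32

object CrcCheck {
  // Recompute the payload checksum and compare it with the CRC carried in
  // the message header; a mismatch means the bytes were altered in transit.
  def isValid(expectedCrc: Long, payload: Array[Byte]): Boolean = {
    val crc = new CRC32()
    crc.update(payload)
    crc.getValue == expectedCrc
  }
}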

Jun

On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xi...@gmail.com> wrote:

> The exception may be caused by TCP buffer overflow; please check the paper
> http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf
>
> Thanks!

Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
The exception may be caused by TCP buffer overflow; please check the paper
http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf

Thanks!

2012/8/2 jjian fan <xi...@gmail.com>

> Jun:
>
>     How powerful is the broker machine in your test? You can deploy more
> producer clients to increase the pressure on the broker. In my test, we
> send 300 thousand messages per second to the broker, and the message size
> is 1024 bytes. In this scenario, these exceptions are seen often.
>
> Thanks!
> Jian Fan

Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jun:

    How powerful is the broker machine in your test? You can deploy more
producer clients to increase the pressure on the broker. In my test, we send
300 thousand messages per second to the broker, and the message size is 1024
bytes. In this scenario, these exceptions are seen often.
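
For reference, a load generator of this shape, written against the 0.7-era
producer API, looks roughly like the following; the ZooKeeper address is a
placeholder, and a single such loop would need to run in many threads or
processes to reach that rate:

import java.util.Properties
import kafka.producer.{Producer, ProducerConfig, ProducerData}

object LoadTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zk.connect", "localhost:2181")  // placeholder ZooKeeper address
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    val payload = "x" * 1024                   // 1024-byte message body
    while (true)                               // run many of these in parallel
      producer.send(new ProducerData[String, String]("axx", payload))
  }
}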

Thanks!
Jian Fan

2012/8/1 Jun Rao <ju...@gmail.com>

> BTW, I ran your test for a couple of days, but couldn't reproduce the
> exception. In your test, how frequently do you see the exceptions?
>
> Thanks,
>
> Jun

Re: error in LogManager.createlog()

Posted by Jun Rao <ju...@gmail.com>.
Jian,

The message format is documented in the Message class and has the following
format.

/**
 * A message. The format of an N byte message is the following:
 *
 * If magic byte is 0
 *
 * 1. 1 byte "magic" identifier to allow format changes
 *
 * 2. 4 byte CRC32 of the payload
 *
 * 3. N - 5 byte payload
 *
 * If magic byte is 1
 *
 * 1. 1 byte "magic" identifier to allow format changes
 *
 * 2. 1 byte "attributes" identifier to allow annotations on the message
independent of the version (e.g. compression enabled, type of codec used)
 *
 * 3. 4 byte CRC32 of the payload
 *
 * 4. N - 6 byte payload
 *
 */
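
To make that layout concrete, here is a minimal sketch of walking the header
for both magic values (illustrative only, not the actual
kafka.message.Message implementation):

import java.nio.ByteBuffer

object MessageHeader {
  // Returns (magic, attributes, crc, payload) for one N-byte message.
  // attributes is 0 for magic 0, which has no attributes byte.
  def parse(buffer: ByteBuffer): (Byte, Byte, Long, ByteBuffer) = {
    val magic = buffer.get()                  // 1 byte "magic" identifier
    val attributes: Byte =
      if (magic == 1) buffer.get() else 0     // magic 1 adds an "attributes" byte
    val crc = buffer.getInt() & 0xFFFFFFFFL   // 4 byte CRC32, read as unsigned
    val payload = buffer.slice()              // remaining N - 5 or N - 6 bytes
    (magic, attributes, crc, payload)
  }
}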

The flow is the following:
1. SyncProducer.send serializes a MultiProducerRequest to bytes and sends
the bytes to the socket.
2. On the server side:
2.1 Processor.read reads the bytes off the socket and deserializes them
into a MultiProducerRequest.
2.2 The request is then handled in KafkaRequestHandler.
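
The important property in step 2.1 is that nothing is deserialized until
every byte of the request has been buffered. A rough sketch of that
size-prefixed read loop, assuming a blocking channel for simplicity (the
real server is non-blocking, and this is not the actual
BoundedByteBufferReceive code):

import java.io.EOFException
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel

object Framing {
  // Short reads are normal with TCP: bytes arrive in order and uncorrupted,
  // just possibly in pieces, so loop until the buffer is full before parsing.
  def readCompleteRequest(channel: SocketChannel): ByteBuffer = {
    val sizeBuffer = ByteBuffer.allocate(4)
    while (sizeBuffer.hasRemaining)
      if (channel.read(sizeBuffer) < 0) throw new EOFException()
    sizeBuffer.flip()
    val content = ByteBuffer.allocate(sizeBuffer.getInt())
    while (content.hasRemaining)
      if (channel.read(content) < 0) throw new EOFException()
    content.flip()
    content  // only a complete request reaches the deserializer
  }
}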

BTW, I ran your test for a couple of days, but couldn't reproduce the
exception. In your test, how frequently do you see the exceptions?

Thanks,

Jun

Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jun:

   Can you give more detail on the ByteBuffer structure of the messages,
and on the process of sending and receiving them?

Thanks

Jian Fan


Re: error in LogManager.createlog()

Posted by Jun Rao <ju...@gmail.com>.
Jian,

Thanks for the patch. It may not be the right fix, though, since it fixes
the symptom but not the cause. For each produce request, the broker does
the following: (1) read all bytes of the request into
a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the
request are ready, deserialize the bytes into a ProducerRequest
(KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request
by adding topic data to logs.

What you observed is that in step 3, a topic name is corrupted somehow.
However, this means that the corresponding ProducerRequest is corrupted.
Assuming there is no corruption at the network layer (very unlikely), the
corruption must have happened in step 1 or step 2. So, instead of patching
a corrupted topic name, we should understand why a ProducerRequest can be
corrupted and fix the cause. BTW, whatever caused the corrupted topic could
be causing the corrupted messages too.
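
To make the distinction concrete, the symptom-level check we are talking
about (purely illustrative, not the actual KAFKA-411 patch) would be
something like:

def isSaneTopic(topic: String): Boolean =
  topic.nonEmpty && !topic.exists(_ == '\u0000')

Rejecting such a topic only hides the problem; the interesting question is
how the NULL bytes got into the deserialized request in the first place.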

Thanks,

Jun

Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jun:

  Hi. I found why the error appears. In a high-concurrency environment, the
TCP stack will drop some packets when the TCP buffer overflows. So there is
a chance that "topic" contains one or more characters that encode to bytes
that include NULL (0).
  I have submitted the patch to KAFKA-411, please check it!
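
As an illustration of why a NULL byte can produce a directory named just
"a" (all names below are hypothetical, and the exact behavior depends on
the JVM):

import java.io.File

object NulTopicDemo extends App {
  // "ax" with a NULL byte in the middle, as a corrupted topic might look
  val topic = "a\u0000x"
  val dir = new File("/tmp/kafka-nul-demo", topic + "-0")
  // The native layer treats NUL as a C string terminator, so some JVMs
  // create "/tmp/kafka-nul-demo/a"; others reject the path and return false.
  println("mkdirs returned: " + dir.mkdirs())
}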

Thanks!
Jian Fan

Re: error in LogManager.createlog()

Posted by Jun Rao <ju...@gmail.com>.
Jian,

All log directories in Kafka are created by LogManager.createLog(). As you
can see, the directory always has the form topic-partitionId. So, it's not
clear how a directory named "a" could be created in your case. I will try
to rerun your test and see if it can be reproduced.
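
A sketch of that convention and its inverse (the helper names below are
hypothetical, not the LogManager code):

def logDirName(topic: String, partition: Int): String = topic + "-" + partition

def topicAndPartition(dirName: String): (String, Int) = {
  val i = dirName.lastIndexOf('-')
  require(i >= 0, "malformed log directory name: " + dirName)
  (dirName.substring(0, i), dirName.substring(i + 1).toInt)
}

A directory named just "a" has no '-', so a lastIndexOf-based split returns
-1 and substring throws the StringIndexOutOfBoundsException reported earlier
in this thread.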

Thanks,

Jun

Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jay:

   If you try to send 600 thousand messages per second to the broker, you
will find that TCP drops packets, so sometimes the topic ax will become
a. I don't mean to solve the TCP problem at the application level; I just
find there is maybe a bug in file.mkdir() of LogManager.createlog. It will
affect Kafka usage.

Thanks
Jian Fan

Re: error in LogManager.createlog()

Posted by Jay Kreps <ja...@gmail.com>.
Hmm, that is not my understanding of TCP. TCP is a reliable protocol, so it
is supposed to either deliver packets in order or time out retrying. As for
the topic name: it is a size-delimited string, so there should be no way for
a single byte to be dropped from the middle of the request like that. If
that is in fact happening, I don't think it is something we can hope to
recover from at the application level...
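
To make that concrete, here is a minimal sketch (not the exact broker code)
of reading a size-delimited string off the wire, in the spirit of
kafka.utils.Utils.readShortString in 0.7.x: a 2-byte length prefix followed
by exactly that many bytes. A reader either gets the whole topic name or
fails outright; it cannot silently lose one character:

import java.nio.ByteBuffer

// A sketch, not the actual broker code: read a length-prefixed
// ("size-delimited") string, assuming the 0.7.x convention of a signed
// 2-byte length prefix followed by exactly that many bytes.
object WireFormat {
  def readShortString(buffer: ByteBuffer): String = {
    val size = buffer.getShort().toInt   // 2-byte length prefix
    if (size < 0)
      null                               // treated as an absent string
    else {
      val bytes = new Array[Byte](size)
      buffer.get(bytes)                  // BufferUnderflowException if any byte is missing
      new String(bytes, "UTF-8")
    }
  }
}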

-Jay


Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jun:
   Dropping packets in TCP is an OS/JVM issue, but it can also cause
problems in Kafka.
   For example, a message's topic is ax, but it can change to a on the
broker because some packets are dropped. The log directories should be
named a-0, a-1, a-2 and so on, but file.mkdir() creates a directory named
just a. There seem to be some bugs around file.mkdir() in
LogManager.createLog.
   If you shut down the broker and restart it, the broker will report an
exception like this:

[2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
[2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.lang.StringIndexOutOfBoundsException: String index out of range: -1
    at java.lang.String.substring(String.java:1949)
    at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
    at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
    at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
    at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at kafka.log.LogManager.<init>(LogManager.scala:65)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
    at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
    at kafka.Kafka$.main(Kafka.scala:50)
    at kafka.Kafka.main(Kafka.scala)
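
The crash happens when the broker parses the directory name back into a
(topic, partition) pair: it splits on the last '-', and a corrupted
directory named just "a" has none, so lastIndexOf returns -1 and
substring(0, -1) throws. Below is a rough sketch of the failing logic
(simplified from kafka.utils.Utils.getTopicPartition) plus a hypothetical
defensive variant, so that one bad directory cannot keep the whole broker
from starting:

object TopicDirs {
  // Simplified sketch of the parse that fails at startup: split the
  // directory name on the last '-' into (topic, partition).
  def getTopicPartition(dirName: String): (String, Int) = {
    val index = dirName.lastIndexOf('-')   // -1 for a corrupted name like "a"
    (dirName.substring(0, index),          // substring(0, -1) throws here
     dirName.substring(index + 1).toInt)
  }

  // Hypothetical defensive variant, not the actual fix: validate the name
  // first and return None, so one bad directory can be logged and skipped
  // instead of aborting the whole broker startup.
  def getTopicPartitionSafe(dirName: String): Option[(String, Int)] = {
    val index = dirName.lastIndexOf('-')
    if (index <= 0 || index == dirName.length - 1)
      None
    else
      try {
        Some((dirName.substring(0, index), dirName.substring(index + 1).toInt))
      } catch {
        case _: NumberFormatException => None
      }
  }
}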


2012/7/28 Jun Rao <ju...@gmail.com>

> Jian,
>
> I am not sure if I understand this completely. Dropping packages in TCP
> shouldn't cause corruption in the TCP buffer, right? Is this an issue in
> Kafka or OS/JVM?
>
> Thanks,
>
> Jun
>
> On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xi...@gmail.com>
> wrote:
>
> > Jun:
> > Yes, if the socket server can't handle the package quickly, tcp protocol
> > will drop some network package until the buffer is overflow,  the
> corrupted
> > messages is also appear on this situtation!  I run a systemtap script to
> > find the package droping ,also you can type " cat /proc/net/sockstat" to
> > see the tcp memory increase.  I debug the whole kafka source code to find
> > the bug in file.mkdir() of LogManager.createlog.
> >
> > JIan Fan
> >
> > 2012/7/27 Jun Rao <ju...@gmail.com>
> >
> > > Thanks for the finding. Are you saying that this problem is caused by
> the
> > > buffering in Kafka socket server? How did you figure that out? Is this
> > > problem exposed by the same test that caused the corrupted messages in
> > the
> > > broker?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xi...@gmail.com>
> > > wrote:
> > >
> > > >     In high cocurrent environment, the tcp server will drop some
> > package
> > > > when the tcp buffer is over. Then LogManager.createlog will create
> some
> > > > no-exists topic log. But one thing is very strange, the log directory
> > > > should be like a-0,a-1, a-2 and so on ,but file.mkdir() create log
> > > > directory like a. Seems some bug in file.mkdir() of
> > LogManager.createlog.
> > > >
> > > > the exception message is
> > > >
> > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0
> > > > (kafka.log.LogManager)
> > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest
> > on
> > > > axx:0 (kafka.server.KafkaRequestHandlers)
> > > > java.io.FileNotFoundException:
> > > /data/kafka/axx-0/00000000000000000000.kafka
> > > > (Is a directory)
> > > > at java.io.RandomAccessFile.open(Native Method)
> > > > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > > at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > > at kafka.log.Log.loadSegments(Log.scala:144)
> > > > at kafka.log.Log.<init>(Log.scala:116)
> > > > at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:679)
> > > >
> > >
> >
>

Re: error in LogManager.createlog()

Posted by Jun Rao <ju...@gmail.com>.
Jian,

I am not sure I understand this completely. Dropping packets in TCP
shouldn't cause corruption in the TCP buffer, right? Is this an issue in
Kafka or in the OS/JVM?

Thanks,

Jun


Re: error in LogManager.createlog()

Posted by jjian fan <xi...@gmail.com>.
Jun:
Yes, if the socket server can't process packets quickly enough, the TCP
stack will drop incoming packets once the buffer overflows, and the
corrupted messages also appear in that situation. I ran a systemtap script
to detect the packet drops; you can also run "cat /proc/net/sockstat" to
watch the TCP memory usage grow. I debugged the whole Kafka source code to
trace the bug to file.mkdir() in LogManager.createLog.
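
For reference, the output of that command looks roughly like this (the
numbers here are made up; the mem field on the TCP line is the one that
keeps growing while the buffers back up):

sockets: used 320
TCP: inuse 28 orphan 0 tw 4 alloc 35 mem 4312
UDP: inuse 9 mem 3
UDPLITE: inuse 0
RAW: inuse 0
FRAG: inuse 0 memory 0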

Jian Fan


Re: error in LogManager.createlog()

Posted by Jun Rao <ju...@gmail.com>.
Thanks for this finding. Are you saying that this problem is caused by the
buffering in the Kafka socket server? How did you figure that out? Is this
problem exposed by the same test that produced the corrupted messages on
the broker?

Thanks,

Jun
