Posted to users@kafka.apache.org by Steve Miller <st...@idrathernotsay.com> on 2014/08/13 02:42:32 UTC

Strange topic-corruption issue?

[ "Aha!", you say, "now I know why this guy's been doing so much tshark stuff!" (-: ] 

   Hi.  I'm running into a strange situation in which more or less all of the topics on our Kafka server behave exactly as expected... but one family of applications is producing data that causes fairly frequent topic corruption.

   When this happens, the results on the client side are all over the map: sometimes you get a ConsumerFetchSizeTooSmall exception, sometimes an exception for an unknown error type, sometimes an invalid-offset error.

   On the server side, I think something like this is the first sign of badness:

[2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing ProducerRequest with correlation id 6750 from client test-producer on partition [mytopic,9] (kafka.server.KafkaApis)
java.lang.ArrayIndexOutOfBoundsException
[2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0 (kafka.server.KafkaApis)

Shortly thereafter, you begin to see oddness facing the clients:

[2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch request for partition [mytopic,9] offset 1327 from consumer with correlation id 87204 (kafka.server.KafkaApis)
java.lang.IllegalStateException: Invalid message size: 0
        at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
        at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
        at kafka.log.LogSegment.read(LogSegment.scala:137)
        at kafka.log.Log.read(Log.scala:386)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Unknown Source)

If I go run the DumpLogSegments tool on the particular topic and partition that's generating the errors, I can see there's corruption in the log:

Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
  1327 is followed by 1327
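
For anyone who wants to run the same check against saved dump output, here is a minimal Python sketch. It assumes each record in the .log dump starts with "offset: N position: P" on one line (as DumpLogSegments prints it before any mail wrapping); the function name is made up for illustration and is not part of Kafka. It is only meant for .log dumps, since .index dumps are sparse by design and would be flagged on every line.

```python
import re

# Start of a DumpLogSegments record line, e.g.
# "offset: 1327 position: 346034 isvalid: true payloadsize: 161 ..."
RECORD = re.compile(r"^offset:\s*(\d+)\s+position:\s*(\d+)")


def find_offset_gaps(lines):
    """Return (previous_offset, offset, position) for each non-consecutive pair."""
    problems = []
    prev = None
    for line in lines:
        match = RECORD.match(line.strip())
        if match is None:
            continue  # headers, blank lines, wrapped continuation lines
        offset, position = int(match.group(1)), int(match.group(2))
        if prev is not None and offset != prev + 1:
            problems.append((prev, offset, position))
        prev = offset
    return problems


# Sample records taken from the dump output posted in this thread.
sample = [
    "offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 compresscodec: NoCompressionCodec crc: 1448343333",
    "offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 compresscodec: NoCompressionCodec crc: 3486482767",
    "offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 3322604516",
    "offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3181460980",
]

for prev, offset, position in find_offset_gaps(sample):
    print(f"{prev} is followed by {offset} (at position {position})")
```

The reported position is the byte position of the second record of the pair, which is a reasonable place to start looking in a hex dump of the segment.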

The only producer writing to the corrupted topics was also the only one with snappy compression turned on in the Java producer API (it's a Storm topology; we've had the same issue with one written in Scala and with one that produces very similar data but is written in Java).  We turned compression off, published to a different topic name (so it was created fresh), and had a couple of happy days where all was well.  Then, having decided that all was well, we tried to go back to the original topic -- after we'd verified that all data had aged out of the logs for that topic -- and we started seeing errors again.  So we switched to a different topic again, let it be created, and also started seeing errors on that topic.

We have other producers, written in C and Java and python, which are working flawlessly, even though the size of the data they produce and the rate at which they produce it is much larger than what we're seeing with this one problematic producer.  We also have producers written in other languages that produce at very low rates, so it's (probably) not the sort of thing where the issue is masked by more frequent data production.

But in any case it looks like there's something the client can send that will corrupt the topic, which seems like something that shouldn't be able to happen.  I know there's at least some error checking for bad protocol requests, as I hacked a python client to produce some corrupt messages and saw an error response from the server.

I'm happy to supply more data but I'm not sure what would be useful.  I'm also fine with continuing to dig into this on my own, but I'd reached a point where it'd be useful to know if anyone had seen something like this before.  I have a ton o' tcpdumps running and some tail -F greps running on the logs, so that if we see that producer error again we can go find the corresponding tcpdump file and hopefully find the smoking gun.  (It turns out that the real-time tshark processing invocations I sent out earlier can get quite far behind: I had that running when the corruption occurred today, but the output processing was a full hour behind the current time, because the packet-writing part of tshark was far ahead of the packet-analyzing part!)
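
One way to do the log-to-capture matching described above is to pull the timestamp and correlation id out of each ProducerRequest error line, then look for that correlation id in the capture file covering that time. A minimal Python sketch, assuming the broker log format shown in the error above; the function name and the dictionary keys are made up for illustration:

```python
import re
from datetime import datetime

# Matches broker lines such as:
# [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing ProducerRequest
# with correlation id 6750 from client test-producer on partition [mytopic,9] ...
ERROR_RE = re.compile(
    r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] ERROR "
    r"\[KafkaApi-(?P<broker>\d+)\] Error processing ProducerRequest "
    r"with correlation id (?P<corr>\d+) from client (?P<client>\S+) "
    r"on partition \[(?P<topic>[^,\]]+),(?P<partition>\d+)\]"
)


def parse_producer_error(line):
    """Return the interesting fields of a ProducerRequest error line, or None."""
    match = ERROR_RE.match(line)
    if match is None:
        return None
    return {
        "time": datetime.strptime(match.group("ts"), "%Y-%m-%d %H:%M:%S,%f"),
        "broker": int(match.group("broker")),
        "correlation_id": int(match.group("corr")),
        "client": match.group("client"),
        "topic": match.group("topic"),
        "partition": int(match.group("partition")),
    }


line = (
    "[2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing "
    "ProducerRequest with correlation id 6750 from client test-producer "
    "on partition [mytopic,9] (kafka.server.KafkaApis)"
)
print(parse_producer_error(line))
```

The timestamp is the broker's local log time, so it needs to be compared against capture times in the same timezone.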

Are there any particular log4j options I should turn on?  Is there a way to just enable trace logging for a specific topic?  Does trace logging print the contents of the message somewhere, not as something all nice and interpreted but as, say, a bag of hex digits?  I might end up rebuilding kafka and adding some very specialized logging just for this.

Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of importance. (-:  Also, here's the topic description:

Topic:mytopic	PartitionCount:10	ReplicationFactor:1	Configs:
	Topic: mytopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: mytopic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: mytopic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
	Topic: mytopic	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: mytopic	Partition: 4	Leader: 0	Replicas: 0	Isr: 0
	Topic: mytopic	Partition: 5	Leader: 1	Replicas: 1	Isr: 1
	Topic: mytopic	Partition: 6	Leader: 0	Replicas: 0	Isr: 0
	Topic: mytopic	Partition: 7	Leader: 1	Replicas: 1	Isr: 1
	Topic: mytopic	Partition: 8	Leader: 0	Replicas: 0	Isr: 0
	Topic: mytopic	Partition: 9	Leader: 1	Replicas: 1	Isr: 1

(2 brokers, 1 ZK server, no obvious issues with delays or process restarts or disk errors or any of the other usual suspects.  One partition per filesystem.  But my gut says none of that's pertinent; it's a matter of which partition the producer happens to be publishing to when it sends garbage.)


	-Steve

Re: Strange topic-corruption issue?

Posted by Steve Miller <st...@idrathernotsay.com>.
   Odd -- I copied and pasted what you'd asked me to run:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log

and I just re-ran it and the output looks the same as what I'd put up for people to grab.

   I also ran:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files 00000000000000000000.log

and the output is identical (I diffed it to be sure).

   The publisher is publishing with compression turned off, though; when we had it turned on, I remember seeing some references to Snappy in there.  I'd turned compression off on the theory that (at the time) we had one thing producing in a way that caused corruption and one thing that was producing using compression, and maybe they were linked (but turning compression off didn't help).

	-Steve

On Fri, Aug 15, 2014 at 04:00:33PM -0700, Jun Rao wrote:
> What's in there seems to be still the output for deep iteration. For
> shallow iteration, the compression codec for each message should be Snappy.
> 
> Thanks,
> 
> Jun

Re: Strange topic-corruption issue?

Posted by Jun Rao <ju...@gmail.com>.
What's in there seems to be still the output for deep iteration. For
shallow iteration, the compression codec for each message should be Snappy.

Thanks,

Jun


On Fri, Aug 15, 2014 at 5:27 AM, Steve Miller <st...@idrathernotsay.com>
wrote:

>    Oh, yeah, sorry about that.  I threw a copy of that up at:
>
>         https://newbie.idrathernotsay.com/full.txt.gz
>
> (you'll get a cert error, for the four times I put something on the home
> website each year, I didn't feel like getting a real cert (-: ).
>
>    If that doesn't work I'm sure I can figure something else out.
>
>         -Steve
>
> On Thu, Aug 14, 2014 at 05:04:29PM -0700, Neha Narkhede wrote:
> > Apache doesn't allow attachments. Could you send maybe a pastebin or
> > something?
>

Re: Strange topic-corruption issue?

Posted by Steve Miller <st...@idrathernotsay.com>.
   Oh, yeah, sorry about that.  I threw a copy of that up at:

	https://newbie.idrathernotsay.com/full.txt.gz

(you'll get a cert error; for the four times a year I put something on the home website, I didn't feel like getting a real cert (-: ).

   If that doesn't work I'm sure I can figure something else out.

	-Steve

On Thu, Aug 14, 2014 at 05:04:29PM -0700, Neha Narkhede wrote:
> Apache doesn't allow attachments. Could you send maybe a pastebin or
> something?

Re: Strange topic-corruption issue?

Posted by Neha Narkhede <ne...@gmail.com>.
Apache doesn't allow attachments. Could you send maybe a pastebin or
something?


On Thu, Aug 14, 2014 at 2:11 PM, Steve Miller <st...@idrathernotsay.com>
wrote:

>    I've attached the full output.  The only other thing it produced was
> our old favorite:
>
> Non-secutive offsets in :/home/steve/mytopic-9/00000000000000000000.log
>   1327 is followed by 1327
>
>    For the first time, earlier today, we've seen this happen from one of
> our other producers; offhand I'm thinking that there's a race of some sort
> somewhere and the other producers aren't immune, they're just much much
> less likely to run into the issue.  The other possibility is that since
> those are all much higher-volume producers, maybe this has been happening
> with them before, but given the size of the log segments relative to the
> size of the data stream, the bad segment is rotated out in a few minutes --
> so there's less of a window for us to notice.
>
>     I changed the one producer who was consistently having issues so that
> it's now not publishing lots of small messages, each in its own
> single-message message set.  Instead it's batching, which seems like it
> might help if it's message-arrival-rate related or message-size related.
>  It hasn't failed since then but then again sometimes this runs OK for,
> well, just long enough to make me think I have it figured out.  Then it
> breaks again. (-:
>
>    Given that other kafka users don't seem to be having this sort of
> issue, and given that I'm out of ideas that aren't either "race condition
> in kafka that no one but us sees" or "Java versionitis", I'm thinking we
> should try to eliminate Java versionitis as a cause.  We were already
> planning on moving from Java 6 to Java 7 so we're dragging that forward,
> and hope to get that taken care of over the next few days.  If you have
> another idea, that's awesome, but if it is versionitis I'd hate to have
> wasted anyone's time but my own on it.
>
>    So we'll let you know if we see a change over the next few days,
> particularly once we get the new Java setup, uh, setup.
>
>    Thanks!
>
>         -Steve
>
> On Thu, Aug 14, 2014 at 11:22:08AM -0700, Jun Rao wrote:
> > What's the output of the following command?
> >
> >         /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments
> > --files 00000000000000000000.log
>

Re: Strange topic-corruption issue?

Posted by Steve Miller <st...@idrathernotsay.com>.
   I've attached the full output.  The only other thing it produced was our old favorite:

Non-secutive offsets in :/home/steve/mytopic-9/00000000000000000000.log
  1327 is followed by 1327

   For the first time, earlier today, we've seen this happen from one of our other producers; offhand I'm thinking that there's a race of some sort somewhere and the other producers aren't immune, they're just much much less likely to run into the issue.  The other possibility is that since those are all much higher-volume producers, maybe this has been happening with them before, but given the size of the log segments relative to the size of the data stream, the bad segment is rotated out in a few minutes -- so there's less of a window for us to notice.

    I changed the one producer who was consistently having issues so that it's now not publishing lots of small messages, each in its own single-message message set.  Instead it's batching, which seems like it might help if it's message-arrival-rate related or message-size related.  It hasn't failed since then but then again sometimes this runs OK for, well, just long enough to make me think I have it figured out.  Then it breaks again. (-:

   Given that other kafka users don't seem to be having this sort of issue, and given that I'm out of ideas that aren't either "race condition in kafka that no one but us sees" or "Java versionitis", I'm thinking we should try to eliminate Java versionitis as a cause.  We were already planning on moving from Java 6 to Java 7 so we're dragging that forward, and hope to get that taken care of over the next few days.  If you have another idea, that's awesome, but if it is versionitis I'd hate to have wasted anyone's time but my own on it.

   So we'll let you know if we see a change over the next few days, particularly once we get the new Java setup, uh, setup.

   Thanks!

	-Steve

On Thu, Aug 14, 2014 at 11:22:08AM -0700, Jun Rao wrote:
> What's the output of the following command?
> 
>         /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log

Re: Strange topic-corruption issue?

Posted by Jun Rao <ju...@gmail.com>.
What's the output of the following command?

        /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log

Thanks,

Jun


On Wed, Aug 13, 2014 at 11:40 AM, Steve Miller <st...@idrathernotsay.com>
wrote:

>    Sure.  I ran:
>
>         /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --deep-iteration
>
> and got (in addition to the same non-secutive offsets error):
>
> [ ... ]
>
> offset: 1320 position: 344293 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 1038804751
> offset: 1321 position: 344527 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 1211626571
> offset: 1322 position: 344747 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 228214666
> offset: 1323 position: 344968 isvalid: true payloadsize: 285 magic: 0 compresscodec: NoCompressionCodec crc: 2412118642
> offset: 1324 position: 345279 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 814469229
> offset: 1325 position: 345572 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 874964779
> offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 compresscodec: NoCompressionCodec crc: 1448343333
> offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 compresscodec: NoCompressionCodec crc: 3486482767
> offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 3322604516
> offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3181460980
> offset: 1329 position: 346674 isvalid: true payloadsize: 164 magic: 0 compresscodec: NoCompressionCodec crc: 77979807
> offset: 1330 position: 346864 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 3051442612
> offset: 1331 position: 347098 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 1906163219
> offset: 1332 position: 347320 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3849763639
> offset: 1333 position: 347542 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3724257965
> offset: 1334 position: 347775 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 510173020
> offset: 1335 position: 347995 isvalid: true payloadsize: 357 magic: 0 compresscodec: NoCompressionCodec crc: 2043065154
> offset: 1336 position: 348378 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 435251578
> offset: 1337 position: 348599 isvalid: true payloadsize: 169 magic: 0 compresscodec: NoCompressionCodec crc: 1172187172
> offset: 1338 position: 348794 isvalid: true payloadsize: 312 magic: 0 compresscodec: NoCompressionCodec crc: 1324582122
> offset: 1339 position: 349132 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3649742340
> offset: 1340 position: 349354 isvalid: true payloadsize: 288 magic: 0 compresscodec: NoCompressionCodec crc: 581177172
>
> (etc.)
>
> I also ran:
>
>          /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --deep-iteration
>
> At first, I got the following:
>
> Dumping 00000000000000000000.index
> offset: 16 position: 4342
> offset: 32 position: 8555
> offset: 48 position: 12676
> offset: 63 position: 16824
> offset: 79 position: 21256
> offset: 96 position: 25599
> offset: 112 position: 29740
> offset: 126 position: 33981
> offset: 143 position: 38122
> offset: 160 position: 42364
> offset: 176 position: 46589
> offset: 192 position: 50755
> offset: 208 position: 54969
> offset: 223 position: 59207
> offset: 239 position: 63317
> offset: 255 position: 67547
> offset: 272 position: 71771
> offset: 289 position: 76012
> offset: 306 position: 80476
> offset: 323 position: 84602
> offset: 337 position: 88876
> offset: 354 position: 93153
> offset: 371 position: 97329
> offset: 387 position: 101496
> offset: 403 position: 105657
> offset: 419 position: 109848
> offset: 434 position: 113950
> offset: 451 position: 118223
> offset: 465 position: 122366
> offset: 482 position: 126463
> offset: 499 position: 130707
> offset: 517 position: 135044
> offset: 533 position: 139505
> offset: 549 position: 143637
> offset: 566 position: 147916
> offset: 582 position: 152223
> offset: 599 position: 156528
> offset: 613 position: 160694
> offset: 629 position: 164807
> offset: 644 position: 169020
> offset: 662 position: 173449
> offset: 679 position: 177721
> offset: 695 position: 182003
> offset: 711 position: 186374
> offset: 728 position: 190644
> offset: 746 position: 195036
> offset: 762 position: 199231
> offset: 778 position: 203581
> offset: 794 position: 208024
> offset: 810 position: 212192
> offset: 825 position: 216446
> offset: 841 position: 220564
> offset: 858 position: 224718
> offset: 875 position: 228823
> offset: 890 position: 232983
> offset: 907 position: 237116
> offset: 920 position: 241229
> offset: 936 position: 245504
> offset: 951 position: 249601
> offset: 969 position: 253908
> offset: 986 position: 258074
> offset: 1002 position: 262228
> offset: 1018 position: 266385
> offset: 1035 position: 270699
> offset: 1051 position: 274843
> offset: 1067 position: 278954
> offset: 1085 position: 283283
> offset: 1102 position: 287632
> offset: 1118 position: 291971
> offset: 1135 position: 296271
> offset: 1152 position: 300722
> offset: 1168 position: 304924
> offset: 1184 position: 309051
> offset: 1201 position: 313349
> offset: 1217 position: 317543
> offset: 1233 position: 321727
> offset: 1249 position: 325877
> offset: 1266 position: 330122
> offset: 1282 position: 334413
> offset: 1298 position: 338579
> offset: 1313 position: 342717
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(Unknown Source)
>         at java.nio.ByteBuffer.allocate(Unknown Source)
>         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
>         at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
>         at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>         at kafka.message.MessageSet.head(MessageSet.scala:67)
>         at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> If I fiddled with KAFKA_HEAP_OPTS, I could change the error to:
>
> Exception in thread "main" java.util.NoSuchElementException
>         at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
>         at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>         at kafka.message.MessageSet.head(MessageSet.scala:67)
>         at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> (bumping it to 1024M didn't do the trick, I still got the heap-space
> exception then, but going to 10240M gave it enough heap to fail this way
> (-: ).
>
>    Also BTW: there seems to be a bug in dumpIndex() in
> DumpLogSegments.scala, as it is unhappy if the topic name has a dot in it.
> For example, if your topic is named X.Y, you're in the directory X.Y-9
> (and it's in the "real" log directory, of course), and you use
> DumpLogSegments to look at 00000000000000000000.index, you'll get an
> error like:
>
> Exception in thread "main" java.io.FileNotFoundException: /home/whatever/X.log (No such file or directory)
>         at java.io.FileInputStream.open(Native Method)
>         at java.io.FileInputStream.<init>(Unknown Source)
>         at kafka.utils.Utils$.openChannel(Utils.scala:157)
>         at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>         at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:97)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
>         at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>         at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
>         at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
>
> because these both assume that there's only one dot on which to split:
>
>     val startOffset = file.getName().split("\\.")(0).toLong
>     val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
>
> (it's not bad to work around, copy or move the files to a directory
> without dots in it, and you're OK, but I did want to point it out).
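
To make the failure mode above concrete, here is a small Python sketch (not the Kafka code itself) that mirrors what splitting the absolute path on its first dot does, next to one possible alternative that strips only the final extension. The function names are made up for illustration, and ".log" is assumed to be the value of Log.LogFileSuffix.

```python
import os


def log_path_naive(index_path):
    # Mirrors the quoted Scala: keep everything before the FIRST dot
    # anywhere in the absolute path, then append the log suffix.
    return index_path.split(".")[0] + ".log"


def log_path_last_extension(index_path):
    # Alternative: strip only the final extension of the file name,
    # so dots in directory (topic) names are left alone.
    root, _extension = os.path.splitext(index_path)
    return root + ".log"


path = "/home/whatever/X.Y-9/00000000000000000000.index"
print(log_path_naive(path))           # /home/whatever/X.log
print(log_path_last_extension(path))  # /home/whatever/X.Y-9/00000000000000000000.log
```

The first result is the same nonexistent /home/whatever/X.log that shows up in the FileNotFoundException quoted above.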
>
>    Overnight, I did a ton of pcap captures, while having something watch
> the logging output looking for the same sort of ProducerRequest error I'd
> described below.  It's a lot of data to sort through and I'm still poking
> at it, but at first glance at least I could see the ProduceRequest that
> produced a similar error... and either it wasn't messed up or, if it's
> messed up, it is messed up in such a way that the Kafka protocol
> decoder doesn't see the issue.
>
>    For example, here's an error:
>
> [2014-08-13 00:28:11,232] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 3484669 from client test-producer on partition [mytopic,4] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-13 00:28:11,233] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 3484669, topicAndPartition = [mytopic,4]] with Ack=0 (kafka.server.KafkaApis)
>
> and below is the ProduceRequest before the one that produced the error,
> then the one that produced the error, then the next one (note that the
> topic name has been changed but it is 55 bytes long).
>
>    One other interesting thing I see, though: for one of the times we saw
> this corruption, it looks like there was also some leader-election flappage
> going on (again, topic name changed to something innocuous).  Here's the
> error:
>
> [2014-08-13 04:45:41,750] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 5016862 from client test-producer on partition [mytopic,8] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-13 04:45:41,750] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 5016862, topicAndPartition = [mytopic,8]] with Ack=0 (kafka.server.KafkaApis)
>
> and here's what I saw in state-change.log that was related to the same
> topic and partition:
>
> [2014-08-13 04:45:28,018] TRACE Controller 0 epoch 3 changed partition [mytopic,8] state from OnlinePartition to OfflinePartition (state.change.logger)
> [2014-08-13 04:45:28,021] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:28,026] ERROR Controller 0 epoch 3 initiated state change for partition [mytopic,8] from OfflinePartition to OnlinePartition failed (state.change.logger)
> [2014-08-13 04:45:28,933] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OnlineReplica to OfflineReplica (state.change.logger)
> [2014-08-13 04:45:29,150] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 276 to broker 0 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:29,155] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 276 (state.change.logger)
> [2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:1,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,949] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OfflineReplica to OnlineReplica (state.change.logger)
> [2014-08-13 04:45:33,954] TRACE Controller 0 epoch 3 sending become-follower LeaderAndIsr request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,956] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,959] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 0 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,961] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,963] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 278 (state.change.logger)
> [2014-08-13 04:45:33,968] TRACE Controller 0 epoch 3 elected leader 1 for Offline partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:33,969] TRACE Controller 0 epoch 3 changed partition [mytopic,8] from OfflinePartition to OnlinePartition with leader 1 (state.change.logger)
> [2014-08-13 04:45:39,245] TRACE Controller 0 epoch 3 sending become-leader LeaderAndIsr request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:39,248] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:39,251] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 0 for partition [mytopic,8] (state.change.logger)
> [2014-08-13 04:45:39,255] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 279 (state.change.logger)
>
>    Strangely, though, I see another ProducerRequest error at around
> 09:08:11, but I don't see any obvious controller-related flappage at that
> time.  So maybe correlation (leader election at nearly the same time we see
> the error) doesn't imply causation.
>
>    I don't know.  This is the only producer that I can think of here that
> produces any significant number of messages (maybe a burst of 600ish, every
> 30sec or so) and that does so one at a time rather than by batching them.
>  Maybe there's an element here that's related to publishing lots of message
> sets with only one (small) message in them?
>
>    If there's anything else you want me to look at, and/or if you want me
> to reformat this somehow so it's more readable (I know it's a lot of stuff
> inline), please let me know.  Thanks!
>
>         -Steve
>
> =========== sample transaction data for one case of this sort of error
> Frame 8451: 516 bytes on wire (4128 bits), 516 bytes captured (4128 bits)
>     Encapsulation type: Ethernet (1)
>     Arrival Time: Aug 13, 2014 00:28:11.232408000 UTC
>     [Time shift for this packet: 0.000000000 seconds]
>     Epoch Time: 1407889691.232408000 seconds
>     [Time delta from previous captured frame: 0.000479000 seconds]
>     [Time delta from previous displayed frame: 0.000479000 seconds]
>     [Time since reference or first frame: 191.232312000 seconds]
>     Frame Number: 8451
>     Frame Length: 516 bytes (4128 bits)
>     Capture Length: 516 bytes (4128 bits)
>     [Frame is marked: False]
>     [Frame is ignored: False]
>     [Protocols in frame: eth:ethertype:ip:tcp:kafka]
>     [Number of per-protocol-data: 1]
>     [Kafka, key 0]
> Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst:
> 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>     Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Type: IP (0x0800)
> Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst:
> 10.163.193.121 (10.163.193.121)
>     Version: 4
>     Header Length: 20 bytes
>     Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00:
> Not-ECT (Not ECN-Capable Transport))
>         0000 00.. = Differentiated Services Codepoint: Default (0x00)
>         .... ..00 = Explicit Congestion Notification: Not-ECT (Not
> ECN-Capable Transport) (0x00)
>     Total Length: 502
>     Identification: 0x2b7a (11130)
>     Flags: 0x02 (Don't Fragment)
>         0... .... = Reserved bit: Not set
>         .1.. .... = Don't fragment: Set
>         ..0. .... = More fragments: Not set
>     Fragment offset: 0
>     Time to live: 64
>     Protocol: TCP (6)
>     Header checksum: 0x754b [validation disabled]
>         [Good: False]
>         [Bad: False]
>     Source: 10.163.193.125 (10.163.193.125)
>     Destination: 10.163.193.121 (10.163.193.121)
>     [Source GeoIP: Unknown]
>     [Destination GeoIP: Unknown]
> Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092
> (9092), Seq: 3091465, Ack: 1, Len: 450
>     Source Port: 33857 (33857)
>     Destination Port: 9092 (9092)
>     [Stream index: 14]
>     [TCP Segment Len: 450]
>     Sequence number: 3091465    (relative sequence number)
>     [Next sequence number: 3091915    (relative sequence number)]
>     Acknowledgment number: 1    (relative ack number)
>     Header Length: 32 bytes
>     .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
>         000. .... .... = Reserved: Not set
>         ...0 .... .... = Nonce: Not set
>         .... 0... .... = Congestion Window Reduced (CWR): Not set
>         .... .0.. .... = ECN-Echo: Not set
>         .... ..0. .... = Urgent: Not set
>         .... ...1 .... = Acknowledgment: Set
>         .... .... 1... = Push: Set
>         .... .... .0.. = Reset: Not set
>         .... .... ..0. = Syn: Not set
>         .... .... ...0 = Fin: Not set
>     Window size value: 107
>     [Calculated window size: 13696]
>     [Window size scaling factor: 128]
>     Checksum: 0x10a1 [validation disabled]
>         [Good Checksum: False]
>         [Bad Checksum: False]
>     Urgent pointer: 0
>     Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         Timestamps: TSval 2762073799, TSecr 2668832110
>             Kind: Time Stamp Option (8)
>             Length: 10
>             Timestamp value: 2762073799
>             Timestamp echo reply: 2668832110
>     [SEQ/ACK analysis]
>         [iRTT: 0.000199000 seconds]
>         [Bytes in flight: 450]
> Kafka
>     Length: 446
>     API Key: Produce (0)
>     API Version: 0
>     Correlation ID: 3484667
>     String Length: 13
>     Client ID: test-producer
>     Required Acks: 0
>     Timeout: 10000
>     Array Count: 1
>     Produce Request Topic
>         String Length: 55
>         Topic Name: mytopic
>         Array Count: 1
>         Produce Request Partition
>             Partition ID: 4
>             Message Set Size: 344
>             Message Set
>                 Offset: 0
>                 Message Size: 332
>                 Message
>                     CRC32: 0x81b88c04
>                     Magic Byte: 0
>                     .... ..00 = Compression Codec: None (0)
>                     Bytes Length: -1
>                     Key: <MISSING>
>                     Bytes Length: 318
>                     Value:
> 00b6c4d5be0a000a73646e73320008746b6f320012786e2d...
>
> Frame 8452: 465 bytes on wire (3720 bits), 465 bytes captured (3720 bits)
>     Encapsulation type: Ethernet (1)
>     Arrival Time: Aug 13, 2014 00:28:11.232797000 UTC
>     [Time shift for this packet: 0.000000000 seconds]
>     Epoch Time: 1407889691.232797000 seconds
>     [Time delta from previous captured frame: 0.000389000 seconds]
>     [Time delta from previous displayed frame: 0.000389000 seconds]
>     [Time since reference or first frame: 191.232701000 seconds]
>     Frame Number: 8452
>     Frame Length: 465 bytes (3720 bits)
>     Capture Length: 465 bytes (3720 bits)
>     [Frame is marked: False]
>     [Frame is ignored: False]
>     [Protocols in frame: eth:ethertype:ip:tcp:kafka]
>     [Number of per-protocol-data: 1]
>     [Kafka, key 0]
> Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst:
> 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>     Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Type: IP (0x0800)
> Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst:
> 10.163.193.121 (10.163.193.121)
>     Version: 4
>     Header Length: 20 bytes
>     Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00:
> Not-ECT (Not ECN-Capable Transport))
>         0000 00.. = Differentiated Services Codepoint: Default (0x00)
>         .... ..00 = Explicit Congestion Notification: Not-ECT (Not
> ECN-Capable Transport) (0x00)
>     Total Length: 451
>     Identification: 0x2b7b (11131)
>     Flags: 0x02 (Don't Fragment)
>         0... .... = Reserved bit: Not set
>         .1.. .... = Don't fragment: Set
>         ..0. .... = More fragments: Not set
>     Fragment offset: 0
>     Time to live: 64
>     Protocol: TCP (6)
>     Header checksum: 0x757d [validation disabled]
>         [Good: False]
>         [Bad: False]
>     Source: 10.163.193.125 (10.163.193.125)
>     Destination: 10.163.193.121 (10.163.193.121)
>     [Source GeoIP: Unknown]
>     [Destination GeoIP: Unknown]
> Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092
> (9092), Seq: 3091915, Ack: 1, Len: 399
>     Source Port: 33857 (33857)
>     Destination Port: 9092 (9092)
>     [Stream index: 14]
>     [TCP Segment Len: 399]
>     Sequence number: 3091915    (relative sequence number)
>     [Next sequence number: 3092314    (relative sequence number)]
>     Acknowledgment number: 1    (relative ack number)
>     Header Length: 32 bytes
>     .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
>         000. .... .... = Reserved: Not set
>         ...0 .... .... = Nonce: Not set
>         .... 0... .... = Congestion Window Reduced (CWR): Not set
>         .... .0.. .... = ECN-Echo: Not set
>         .... ..0. .... = Urgent: Not set
>         .... ...1 .... = Acknowledgment: Set
>         .... .... 1... = Push: Set
>         .... .... .0.. = Reset: Not set
>         .... .... ..0. = Syn: Not set
>         .... .... ...0 = Fin: Not set
>     Window size value: 107
>     [Calculated window size: 13696]
>     [Window size scaling factor: 128]
>     Checksum: 0x6f34 [validation disabled]
>         [Good Checksum: False]
>         [Bad Checksum: False]
>     Urgent pointer: 0
>     Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         Timestamps: TSval 2762073799, TSecr 2668832111
>             Kind: Time Stamp Option (8)
>             Length: 10
>             Timestamp value: 2762073799
>             Timestamp echo reply: 2668832111
>     [SEQ/ACK analysis]
>         [iRTT: 0.000199000 seconds]
>         [Bytes in flight: 399]
> Kafka
>     Length: 395
>     API Key: Produce (0)
>     API Version: 0
>     Correlation ID: 3484669
>     String Length: 13
>     Client ID: test-producer
>     Required Acks: 0
>     Timeout: 10000
>     Array Count: 1
>     Produce Request Topic
>         String Length: 55
>         Topic Name: mytopic
>         Array Count: 1
>         Produce Request Partition
>             Partition ID: 4
>             Message Set Size: 293
>             Message Set
>                 Offset: 0
>                 Message Size: 281
>                 Message
>                     CRC32: 0x80ce403c
>                     Magic Byte: 0
>                     .... ..00 = Compression Codec: None (0)
>                     Bytes Length: -1
>                     Key: <MISSING>
>                     Bytes Length: 267
>                     Value:
> 00b6c4d5be0a001061646e7363746c640008706172320006...
>
> Frame 8453: 354 bytes on wire (2832 bits), 354 bytes captured (2832 bits)
>     Encapsulation type: Ethernet (1)
>     Arrival Time: Aug 13, 2014 00:28:11.233223000 UTC
>     [Time shift for this packet: 0.000000000 seconds]
>     Epoch Time: 1407889691.233223000 seconds
>     [Time delta from previous captured frame: 0.000426000 seconds]
>     [Time delta from previous displayed frame: 0.000426000 seconds]
>     [Time since reference or first frame: 191.233127000 seconds]
>     Frame Number: 8453
>     Frame Length: 354 bytes (2832 bits)
>     Capture Length: 354 bytes (2832 bits)
>     [Frame is marked: False]
>     [Frame is ignored: False]
>     [Protocols in frame: eth:ethertype:ip:tcp:kafka]
>     [Number of per-protocol-data: 1]
>     [Kafka, key 0]
> Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst:
> 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>     Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
>         .... ..0. .... .... .... .... = LG bit: Globally unique address
> (factory default)
>         .... ...0 .... .... .... .... = IG bit: Individual address
> (unicast)
>     Type: IP (0x0800)
> Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst:
> 10.163.193.121 (10.163.193.121)
>     Version: 4
>     Header Length: 20 bytes
>     Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00:
> Not-ECT (Not ECN-Capable Transport))
>         0000 00.. = Differentiated Services Codepoint: Default (0x00)
>         .... ..00 = Explicit Congestion Notification: Not-ECT (Not
> ECN-Capable Transport) (0x00)
>     Total Length: 340
>     Identification: 0x2b7c (11132)
>     Flags: 0x02 (Don't Fragment)
>         0... .... = Reserved bit: Not set
>         .1.. .... = Don't fragment: Set
>         ..0. .... = More fragments: Not set
>     Fragment offset: 0
>     Time to live: 64
>     Protocol: TCP (6)
>     Header checksum: 0x75eb [validation disabled]
>         [Good: False]
>         [Bad: False]
>     Source: 10.163.193.125 (10.163.193.125)
>     Destination: 10.163.193.121 (10.163.193.121)
>     [Source GeoIP: Unknown]
>     [Destination GeoIP: Unknown]
> Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092
> (9092), Seq: 3092314, Ack: 1, Len: 288
>     Source Port: 33857 (33857)
>     Destination Port: 9092 (9092)
>     [Stream index: 14]
>     [TCP Segment Len: 288]
>     Sequence number: 3092314    (relative sequence number)
>     [Next sequence number: 3092602    (relative sequence number)]
>     Acknowledgment number: 1    (relative ack number)
>     Header Length: 32 bytes
>     .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
>         000. .... .... = Reserved: Not set
>         ...0 .... .... = Nonce: Not set
>         .... 0... .... = Congestion Window Reduced (CWR): Not set
>         .... .0.. .... = ECN-Echo: Not set
>         .... ..0. .... = Urgent: Not set
>         .... ...1 .... = Acknowledgment: Set
>         .... .... 1... = Push: Set
>         .... .... .0.. = Reset: Not set
>         .... .... ..0. = Syn: Not set
>         .... .... ...0 = Fin: Not set
>     Window size value: 107
>     [Calculated window size: 13696]
>     [Window size scaling factor: 128]
>     Checksum: 0xea49 [validation disabled]
>         [Good Checksum: False]
>         [Bad Checksum: False]
>     Urgent pointer: 0
>     Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         No-Operation (NOP)
>             Type: 1
>                 0... .... = Copy on fragmentation: No
>                 .00. .... = Class: Control (0)
>                 ...0 0001 = Number: No-Operation (NOP) (1)
>         Timestamps: TSval 2762073800, TSecr 2668832112
>             Kind: Time Stamp Option (8)
>             Length: 10
>             Timestamp value: 2762073800
>             Timestamp echo reply: 2668832112
>     [SEQ/ACK analysis]
>         [iRTT: 0.000199000 seconds]
>         [Bytes in flight: 288]
> Kafka
>     Length: 284
>     API Key: Produce (0)
>     API Version: 0
>     Correlation ID: 3484671
>     String Length: 13
>     Client ID: test-producer
>     Required Acks: 0
>     Timeout: 10000
>     Array Count: 1
>     Produce Request Topic
>         String Length: 55
>         Topic Name: mytopic
>         Array Count: 1
>         Produce Request Partition
>             Partition ID: 4
>             Message Set Size: 182
>             Message Set
>                 Offset: 0
>                 Message Size: 170
>                 Message
>                     CRC32: 0x0ece510e
>                     Magic Byte: 0
>                     .... ..00 = Compression Codec: None (0)
>                     Bytes Length: -1
>                     Key: <MISSING>
>                     Bytes Length: 156
>                     Value:
> 00b6c4d5be0a0008726f6f740008686b6735000c246a726f...
>
>
> On Wed, Aug 13, 2014 at 08:15:21AM -0700, Jun Rao wrote:
> > Interesting, could you run DumpLogSegments with and w/o deep-iteration
> > and send the output around offset 1327?
> >
> > Thanks,
> >
> > Jun
> >
> >

Re: Strange topic-corruption issue?

Posted by Steve Miller <st...@idrathernotsay.com>.
   Sure.  I ran:

	/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --deep-iteration

and got (in addition to the same non-secutive offsets error):

[ ... ]

offset: 1320 position: 344293 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 1038804751
offset: 1321 position: 344527 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 1211626571
offset: 1322 position: 344747 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 228214666
offset: 1323 position: 344968 isvalid: true payloadsize: 285 magic: 0 compresscodec: NoCompressionCodec crc: 2412118642
offset: 1324 position: 345279 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 814469229
offset: 1325 position: 345572 isvalid: true payloadsize: 267 magic: 0 compresscodec: NoCompressionCodec crc: 874964779
offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 compresscodec: NoCompressionCodec crc: 1448343333
offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 compresscodec: NoCompressionCodec crc: 3486482767
offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 3322604516
offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3181460980
offset: 1329 position: 346674 isvalid: true payloadsize: 164 magic: 0 compresscodec: NoCompressionCodec crc: 77979807
offset: 1330 position: 346864 isvalid: true payloadsize: 208 magic: 0 compresscodec: NoCompressionCodec crc: 3051442612
offset: 1331 position: 347098 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 1906163219
offset: 1332 position: 347320 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3849763639
offset: 1333 position: 347542 isvalid: true payloadsize: 207 magic: 0 compresscodec: NoCompressionCodec crc: 3724257965
offset: 1334 position: 347775 isvalid: true payloadsize: 194 magic: 0 compresscodec: NoCompressionCodec crc: 510173020
offset: 1335 position: 347995 isvalid: true payloadsize: 357 magic: 0 compresscodec: NoCompressionCodec crc: 2043065154
offset: 1336 position: 348378 isvalid: true payloadsize: 195 magic: 0 compresscodec: NoCompressionCodec crc: 435251578
offset: 1337 position: 348599 isvalid: true payloadsize: 169 magic: 0 compresscodec: NoCompressionCodec crc: 1172187172
offset: 1338 position: 348794 isvalid: true payloadsize: 312 magic: 0 compresscodec: NoCompressionCodec crc: 1324582122
offset: 1339 position: 349132 isvalid: true payloadsize: 196 magic: 0 compresscodec: NoCompressionCodec crc: 3649742340
offset: 1340 position: 349354 isvalid: true payloadsize: 288 magic: 0 compresscodec: NoCompressionCodec crc: 581177172

(etc.)
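
As a rough cross-check of the listing above, the following sketch scans DumpLogSegments output for two things: offsets that do not increase by exactly one, and byte positions that do not line up with the previous entry. The 26-byte per-message overhead is an assumption based on the magic-0 layout with a null key (8-byte offset, 4-byte size, 4-byte CRC, 1-byte magic, 1-byte attributes, 4-byte key length, 4-byte value length). The function name and the regular expression are illustrative, not part of any Kafka tool.

```python
import re

# Assumed per-message overhead for magic-0 messages with a null key:
# 8 (offset) + 4 (size) + 4 (CRC) + 1 (magic) + 1 (attributes)
# + 4 (key length) + 4 (value length) = 26 bytes.
OVERHEAD = 26

# Matches the leading fields of a DumpLogSegments line as shown above.
LINE = re.compile(r"offset: (\d+) position: (\d+) isvalid: (\w+) payloadsize: (\d+)")

def check_dump(lines):
    """Return (offset_problems, position_problems) as lists of (prev, cur) offsets."""
    bad_offsets, bad_positions = [], []
    prev = None
    for line in lines:
        m = LINE.search(line)
        if not m:
            continue  # skip headers, "[ ... ]", and other non-entry lines
        offset, position, payload = int(m.group(1)), int(m.group(2)), int(m.group(4))
        if prev is not None:
            p_off, p_pos, p_payload = prev
            if offset != p_off + 1:
                bad_offsets.append((p_off, offset))
            if position != p_pos + p_payload + OVERHEAD:
                bad_positions.append((p_off, offset))
        prev = (offset, position, payload)
    return bad_offsets, bad_positions

# Four entries copied from the listing above (trailing fields elided).
sample = [
    "offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 ...",
    "offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 ...",
    "offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 ...",
    "offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 ...",
]
print(check_dump(sample))  # ([(1327, 1327)], [])
```

On these four entries the positions are contiguous and only the offset repeats, which would suggest that two distinct, well-formed messages were both assigned offset 1327 rather than that bytes on disk were overwritten.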

I also ran:

	 /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --deep-iteration

At first, I got the following:

Dumping 00000000000000000000.index
offset: 16 position: 4342
offset: 32 position: 8555
offset: 48 position: 12676
offset: 63 position: 16824
offset: 79 position: 21256
offset: 96 position: 25599
offset: 112 position: 29740
offset: 126 position: 33981
offset: 143 position: 38122
offset: 160 position: 42364
offset: 176 position: 46589
offset: 192 position: 50755
offset: 208 position: 54969
offset: 223 position: 59207
offset: 239 position: 63317
offset: 255 position: 67547
offset: 272 position: 71771
offset: 289 position: 76012
offset: 306 position: 80476
offset: 323 position: 84602
offset: 337 position: 88876
offset: 354 position: 93153
offset: 371 position: 97329
offset: 387 position: 101496
offset: 403 position: 105657
offset: 419 position: 109848
offset: 434 position: 113950
offset: 451 position: 118223
offset: 465 position: 122366
offset: 482 position: 126463
offset: 499 position: 130707
offset: 517 position: 135044
offset: 533 position: 139505
offset: 549 position: 143637
offset: 566 position: 147916
offset: 582 position: 152223
offset: 599 position: 156528
offset: 613 position: 160694
offset: 629 position: 164807
offset: 644 position: 169020
offset: 662 position: 173449
offset: 679 position: 177721
offset: 695 position: 182003
offset: 711 position: 186374
offset: 728 position: 190644
offset: 746 position: 195036
offset: 762 position: 199231
offset: 778 position: 203581
offset: 794 position: 208024
offset: 810 position: 212192
offset: 825 position: 216446
offset: 841 position: 220564
offset: 858 position: 224718
offset: 875 position: 228823
offset: 890 position: 232983
offset: 907 position: 237116
offset: 920 position: 241229
offset: 936 position: 245504
offset: 951 position: 249601
offset: 969 position: 253908
offset: 986 position: 258074
offset: 1002 position: 262228
offset: 1018 position: 266385
offset: 1035 position: 270699
offset: 1051 position: 274843
offset: 1067 position: 278954
offset: 1085 position: 283283
offset: 1102 position: 287632
offset: 1118 position: 291971
offset: 1135 position: 296271
offset: 1152 position: 300722
offset: 1168 position: 304924
offset: 1184 position: 309051
offset: 1201 position: 313349
offset: 1217 position: 317543
offset: 1233 position: 321727
offset: 1249 position: 325877
offset: 1266 position: 330122
offset: 1282 position: 334413
offset: 1298 position: 338579
offset: 1313 position: 342717
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(Unknown Source)
	at java.nio.ByteBuffer.allocate(Unknown Source)
	at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
	at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
	at scala.collection.IterableLike$class.head(IterableLike.scala:91)
	at kafka.message.MessageSet.head(MessageSet.scala:67)
	at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
	at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
	at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
	at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
	at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
	at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)

If I fiddled with KAFKA_HEAP_OPTS, I could change the error to:

Exception in thread "main" java.util.NoSuchElementException
	at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
	at scala.collection.IterableLike$class.head(IterableLike.scala:91)
	at kafka.message.MessageSet.head(MessageSet.scala:67)
	at kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpIndex$1.apply$mcVI$sp(DumpLogSegments.scala:102)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
	at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:99)
	at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
	at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
	at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
	at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)

(Bumping it to 1024M didn't do the trick; I still got the heap-space exception.  Going to 10240M gave it enough heap to fail this way instead (-: .)
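
One possible reading of those two traces (an assumption, not something confirmed in this thread): the iterator in FileMessageSet sizes its buffer from a length field it reads out of the log, so a corrupt length asks for a huge allocation, and with enough heap the read comes up short and the iterator ends empty.  A minimal Python sketch of a sanity check along those lines follows.  It assumes each log entry is an 8-byte offset and a 4-byte size followed by the message, and that the smallest valid message is 14 bytes (CRC, magic byte, attributes, key length, value length, the same fields the tshark decode shows).  scan_entries and its problem strings are hypothetical names, not part of any Kafka tool.

```python
import struct

# Assumed on-disk entry header: 8-byte offset, 4-byte message size, big-endian.
ENTRY_HEADER = struct.Struct(">qi")
# crc (4) + magic (1) + attributes (1) + key length (4) + value length (4)
MIN_MESSAGE_SIZE = 4 + 1 + 1 + 4 + 4


def scan_entries(data):
    """Yield (position, offset, size, problem) for each entry in data.

    problem is None for a plausible entry.  Scanning stops at the first
    implausible size, since nothing after it can be located reliably.
    """
    pos = 0
    while pos + ENTRY_HEADER.size <= len(data):
        offset, size = ENTRY_HEADER.unpack_from(data, pos)
        remaining = len(data) - pos - ENTRY_HEADER.size
        if size < MIN_MESSAGE_SIZE:
            yield pos, offset, size, "size below minimum header"
            return
        if size > remaining:
            yield pos, offset, size, "size runs past end of data"
            return
        yield pos, offset, size, None
        pos += ENTRY_HEADER.size + size
```

Reading a copy of the segment with open(path, "rb").read() and passing the bytes to scan_entries would show the position of the first entry whose size cannot be right, without ever allocating a buffer based on that size.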

   Also BTW: there seems to be a bug in dumpIndex() in DumpLogSegments.scala: it is unhappy if the topic name has a dot in it.  If you use DumpLogSegments on an index file that's still in the "real" log directory for such a topic (say the topic is named X.Y, you're in the directory X.Y-9, and you're looking at 00000000000000000000.index), you'll get an error like:

Exception in thread "main" java.io.FileNotFoundException: /home/whatever/X.log (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(Unknown Source)
	at kafka.utils.Utils$.openChannel(Utils.scala:157)
	at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
	at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:97)
	at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:68)
	at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:61)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
	at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:61)
	at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)


because these both assume that there's only one dot on which to split:

    val startOffset = file.getName().split("\\.")(0).toLong
    val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix

(It's not hard to work around: copy or move the files to a directory without dots in its path and you're OK.  But I did want to point it out.)
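
To make the failure concrete, here is a small Python stand-in for the path handling (the real code is the Scala shown above; first_dot_split and strip_extension are hypothetical names used only for illustration).  Cutting the absolute path at the first dot reproduces the X.log path from the exception, while dropping only the final extension keeps the dotted directory intact.  Only the absolute-path split is exercised here.

```python
import os


def first_dot_split(path):
    # Mirrors split("\\.")(0): keep everything before the FIRST dot.
    return path.split(".")[0]


def strip_extension(path):
    # Drops only the last extension, so dots in directory names survive.
    return os.path.splitext(path)[0]


index_path = "/home/whatever/X.Y-9/00000000000000000000.index"

print(first_dot_split(index_path) + ".log")
# prints /home/whatever/X.log
print(strip_extension(index_path) + ".log")
# prints /home/whatever/X.Y-9/00000000000000000000.log
```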

   Overnight, I did a ton of pcap captures, while having something watch the logging output for the same sort of ProducerRequest error I'd described below.  It's a lot of data to sort through and I'm still poking at it, but at first glance at least I could see the ProduceRequest that produced a similar error... and either it wasn't messed up, or it's messed up in such a way that the Kafka protocol decoder doesn't see the issue.

   For example, here's an error:

[2014-08-13 00:28:11,232] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 3484669 from client test-producer on partition [mytopic,4] (kafka.server.KafkaApis)
java.lang.ArrayIndexOutOfBoundsException
[2014-08-13 00:28:11,233] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 3484669, topicAndPartition = [mytopic,4]] with Ack=0 (kafka.server.KafkaApis)

and below (under "sample transaction data") is the ProduceRequest before the one that produced the error, then the one that produced the error, then the next one.  (Note that the topic name has been changed in the output, but the real one is 55 bytes long, which is why String Length shows 55.)

   One other interesting thing I see, though: for one of the times we saw this corruption, it looks like there was also some leader-election flappage going on (again, topic name changed to something innocuous).  Here's the error:

[2014-08-13 04:45:41,750] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 5016862 from client test-producer on partition [mytopic,8] (kafka.server.KafkaApis)
java.lang.ArrayIndexOutOfBoundsException
[2014-08-13 04:45:41,750] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 5016862, topicAndPartition = [mytopic,8]] with Ack=0 (kafka.server.KafkaApis)

and here's what I saw in state-change.log that was related to the same topic and partition:

[2014-08-13 04:45:28,018] TRACE Controller 0 epoch 3 changed partition [mytopic,8] state from OnlinePartition to OfflinePartition (state.change.logger)
[2014-08-13 04:45:28,021] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:28,026] ERROR Controller 0 epoch 3 initiated state change for partition [mytopic,8] from OfflinePartition to OnlinePartition failed (state.change.logger)
[2014-08-13 04:45:28,933] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OnlineReplica to OfflineReplica (state.change.logger)
[2014-08-13 04:45:29,150] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 276 to broker 0 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:29,155] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 276 (state.change.logger)
[2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,940] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:1,ControllerEpoch:3) with correlationId 277 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,949] TRACE Controller 0 epoch 3 changed state of replica 1 for partition [mytopic,8] from OfflineReplica to OnlineReplica (state.change.logger)
[2014-08-13 04:45:33,954] TRACE Controller 0 epoch 3 sending become-follower LeaderAndIsr request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,956] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,959] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3) with correlationId 278 to broker 0 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,961] TRACE Controller 0 epoch 3 started leader election for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,963] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:89,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 278 (state.change.logger)
[2014-08-13 04:45:33,968] TRACE Controller 0 epoch 3 elected leader 1 for Offline partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:33,969] TRACE Controller 0 epoch 3 changed partition [mytopic,8] from OfflinePartition to OnlinePartition with leader 1 (state.change.logger)
[2014-08-13 04:45:39,245] TRACE Controller 0 epoch 3 sending become-leader LeaderAndIsr request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:39,248] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 1 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:39,251] TRACE Controller 0 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3) with correlationId 279 to broker 0 for partition [mytopic,8] (state.change.logger)
[2014-08-13 04:45:39,255] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:3),ReplicationFactor:1),AllReplicas:1) for partition [mytopic,8] in response to UpdateMetadata request sent by controller 0 epoch 3 with correlation id 279 (state.change.logger)

   Strangely, though, I see another ProducerRequest error at around 09:08:11, but I don't see any obvious controller-related flappage at that time.  So maybe correlation (leader election at nearly the same time we see the error) doesn't imply causation.
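
Checking that sort of correlation by eye gets tedious, so here is a rough Python sketch that lists the state-change lines for a partition that fall within some window of an error line.  It assumes only the "[YYYY-MM-DD HH:MM:SS,mmm]" timestamp prefix seen in the logs above; parse_stamp, events_near and the 30-second default window are hypothetical choices, not anything Kafka provides.

```python
import re
from datetime import datetime, timedelta

# Matches the "[2014-08-13 04:45:41,750]" prefix used by the broker logs.
STAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})\]")


def parse_stamp(line):
    """Return the line's timestamp as a datetime, or None if it has none."""
    m = STAMP.match(line)
    if m is None:
        return None
    base = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
    return base + timedelta(milliseconds=int(m.group(2)))


def events_near(error_line, state_lines, partition, window_seconds=30):
    """Return state-change lines mentioning partition within the window."""
    when = parse_stamp(error_line)
    if when is None:
        raise ValueError("error line has no timestamp prefix")
    window = timedelta(seconds=window_seconds)
    hits = []
    for line in state_lines:
        stamp = parse_stamp(line)
        if stamp is None or partition not in line:
            continue
        if abs(stamp - when) <= window:
            hits.append(line)
    return hits
```

Called with the 04:45:41,750 error, the lines of state-change.log and the string "[mytopic,8]", this would return the election activity shown above; an empty list for the 09:08:11 error would match what I saw by hand.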

   I don't know.  This is the only producer that I can think of here that produces any significant number of messages (maybe a burst of 600ish, every 30sec or so) and that does so one at a time rather than by batching them.  Maybe there's an element here that's related to publishing lots of message sets with only one (small) message in them?

   If there's anything else you want me to look at, and/or if you want me to reformat this somehow so it's more readable (I know it's a lot of stuff inline), please let me know.  Thanks!

	-Steve

=========== sample transaction data for one case of this sort of error
Frame 8451: 516 bytes on wire (4128 bits), 516 bytes captured (4128 bits)
    Encapsulation type: Ethernet (1)
    Arrival Time: Aug 13, 2014 00:28:11.232408000 UTC
    [Time shift for this packet: 0.000000000 seconds]
    Epoch Time: 1407889691.232408000 seconds
    [Time delta from previous captured frame: 0.000479000 seconds]
    [Time delta from previous displayed frame: 0.000479000 seconds]
    [Time since reference or first frame: 191.232312000 seconds]
    Frame Number: 8451
    Frame Length: 516 bytes (4128 bits)
    Capture Length: 516 bytes (4128 bits)
    [Frame is marked: False]
    [Frame is ignored: False]
    [Protocols in frame: eth:ethertype:ip:tcp:kafka]
    [Number of per-protocol-data: 1]
    [Kafka, key 0]
Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
    Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
        Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
        Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Type: IP (0x0800)
Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: 10.163.193.121 (10.163.193.121)
    Version: 4
    Header Length: 20 bytes
    Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: Not-ECT (Not ECN-Capable Transport))
        0000 00.. = Differentiated Services Codepoint: Default (0x00)
        .... ..00 = Explicit Congestion Notification: Not-ECT (Not ECN-Capable Transport) (0x00)
    Total Length: 502
    Identification: 0x2b7a (11130)
    Flags: 0x02 (Don't Fragment)
        0... .... = Reserved bit: Not set
        .1.. .... = Don't fragment: Set
        ..0. .... = More fragments: Not set
    Fragment offset: 0
    Time to live: 64
    Protocol: TCP (6)
    Header checksum: 0x754b [validation disabled]
        [Good: False]
        [Bad: False]
    Source: 10.163.193.125 (10.163.193.125)
    Destination: 10.163.193.121 (10.163.193.121)
    [Source GeoIP: Unknown]
    [Destination GeoIP: Unknown]
Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 (9092), Seq: 3091465, Ack: 1, Len: 450
    Source Port: 33857 (33857)
    Destination Port: 9092 (9092)
    [Stream index: 14]
    [TCP Segment Len: 450]
    Sequence number: 3091465    (relative sequence number)
    [Next sequence number: 3091915    (relative sequence number)]
    Acknowledgment number: 1    (relative ack number)
    Header Length: 32 bytes
    .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
        000. .... .... = Reserved: Not set
        ...0 .... .... = Nonce: Not set
        .... 0... .... = Congestion Window Reduced (CWR): Not set
        .... .0.. .... = ECN-Echo: Not set
        .... ..0. .... = Urgent: Not set
        .... ...1 .... = Acknowledgment: Set
        .... .... 1... = Push: Set
        .... .... .0.. = Reset: Not set
        .... .... ..0. = Syn: Not set
        .... .... ...0 = Fin: Not set
    Window size value: 107
    [Calculated window size: 13696]
    [Window size scaling factor: 128]
    Checksum: 0x10a1 [validation disabled]
        [Good Checksum: False]
        [Bad Checksum: False]
    Urgent pointer: 0
    Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
        No-Operation (NOP)
            Type: 1
                0... .... = Copy on fragmentation: No
                .00. .... = Class: Control (0)
                ...0 0001 = Number: No-Operation (NOP) (1)
        No-Operation (NOP)
            Type: 1
                0... .... = Copy on fragmentation: No
                .00. .... = Class: Control (0)
                ...0 0001 = Number: No-Operation (NOP) (1)
        Timestamps: TSval 2762073799, TSecr 2668832110
            Kind: Time Stamp Option (8)
            Length: 10
            Timestamp value: 2762073799
            Timestamp echo reply: 2668832110
    [SEQ/ACK analysis]
        [iRTT: 0.000199000 seconds]
        [Bytes in flight: 450]
Kafka
    Length: 446
    API Key: Produce (0)
    API Version: 0
    Correlation ID: 3484667
    String Length: 13
    Client ID: test-producer
    Required Acks: 0
    Timeout: 10000
    Array Count: 1
    Produce Request Topic
        String Length: 55
        Topic Name: mytopic
        Array Count: 1
        Produce Request Partition
            Partition ID: 4
            Message Set Size: 344
            Message Set
                Offset: 0
                Message Size: 332
                Message
                    CRC32: 0x81b88c04
                    Magic Byte: 0
                    .... ..00 = Compression Codec: None (0)
                    Bytes Length: -1
                    Key: <MISSING>
                    Bytes Length: 318
                    Value: 00b6c4d5be0a000a73646e73320008746b6f320012786e2d...

Frame 8452: 465 bytes on wire (3720 bits), 465 bytes captured (3720 bits)
    Encapsulation type: Ethernet (1)
    Arrival Time: Aug 13, 2014 00:28:11.232797000 UTC
    [Time shift for this packet: 0.000000000 seconds]
    Epoch Time: 1407889691.232797000 seconds
    [Time delta from previous captured frame: 0.000389000 seconds]
    [Time delta from previous displayed frame: 0.000389000 seconds]
    [Time since reference or first frame: 191.232701000 seconds]
    Frame Number: 8452
    Frame Length: 465 bytes (3720 bits)
    Capture Length: 465 bytes (3720 bits)
    [Frame is marked: False]
    [Frame is ignored: False]
    [Protocols in frame: eth:ethertype:ip:tcp:kafka]
    [Number of per-protocol-data: 1]
    [Kafka, key 0]
Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
    Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
        Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
        Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Type: IP (0x0800)
Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: 10.163.193.121 (10.163.193.121)
    Version: 4
    Header Length: 20 bytes
    Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: Not-ECT (Not ECN-Capable Transport))
        0000 00.. = Differentiated Services Codepoint: Default (0x00)
        .... ..00 = Explicit Congestion Notification: Not-ECT (Not ECN-Capable Transport) (0x00)
    Total Length: 451
    Identification: 0x2b7b (11131)
    Flags: 0x02 (Don't Fragment)
        0... .... = Reserved bit: Not set
        .1.. .... = Don't fragment: Set
        ..0. .... = More fragments: Not set
    Fragment offset: 0
    Time to live: 64
    Protocol: TCP (6)
    Header checksum: 0x757d [validation disabled]
        [Good: False]
        [Bad: False]
    Source: 10.163.193.125 (10.163.193.125)
    Destination: 10.163.193.121 (10.163.193.121)
    [Source GeoIP: Unknown]
    [Destination GeoIP: Unknown]
Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 (9092), Seq: 3091915, Ack: 1, Len: 399
    Source Port: 33857 (33857)
    Destination Port: 9092 (9092)
    [Stream index: 14]
    [TCP Segment Len: 399]
    Sequence number: 3091915    (relative sequence number)
    [Next sequence number: 3092314    (relative sequence number)]
    Acknowledgment number: 1    (relative ack number)
    Header Length: 32 bytes
    .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
        000. .... .... = Reserved: Not set
        ...0 .... .... = Nonce: Not set
        .... 0... .... = Congestion Window Reduced (CWR): Not set
        .... .0.. .... = ECN-Echo: Not set
        .... ..0. .... = Urgent: Not set
        .... ...1 .... = Acknowledgment: Set
        .... .... 1... = Push: Set
        .... .... .0.. = Reset: Not set
        .... .... ..0. = Syn: Not set
        .... .... ...0 = Fin: Not set
    Window size value: 107
    [Calculated window size: 13696]
    [Window size scaling factor: 128]
    Checksum: 0x6f34 [validation disabled]
        [Good Checksum: False]
        [Bad Checksum: False]
    Urgent pointer: 0
    Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
        No-Operation (NOP)
            Type: 1
                0... .... = Copy on fragmentation: No
                .00. .... = Class: Control (0)
                ...0 0001 = Number: No-Operation (NOP) (1)
        No-Operation (NOP)
            Type: 1
                0... .... = Copy on fragmentation: No
                .00. .... = Class: Control (0)
                ...0 0001 = Number: No-Operation (NOP) (1)
        Timestamps: TSval 2762073799, TSecr 2668832111
            Kind: Time Stamp Option (8)
            Length: 10
            Timestamp value: 2762073799
            Timestamp echo reply: 2668832111
    [SEQ/ACK analysis]
        [iRTT: 0.000199000 seconds]
        [Bytes in flight: 399]
Kafka
    Length: 395
    API Key: Produce (0)
    API Version: 0
    Correlation ID: 3484669
    String Length: 13
    Client ID: test-producer
    Required Acks: 0
    Timeout: 10000
    Array Count: 1
    Produce Request Topic
        String Length: 55
        Topic Name: mytopic
        Array Count: 1
        Produce Request Partition
            Partition ID: 4
            Message Set Size: 293
            Message Set
                Offset: 0
                Message Size: 281
                Message
                    CRC32: 0x80ce403c
                    Magic Byte: 0
                    .... ..00 = Compression Codec: None (0)
                    Bytes Length: -1
                    Key: <MISSING>
                    Bytes Length: 267
                    Value: 00b6c4d5be0a001061646e7363746c640008706172320006...

Frame 8453: 354 bytes on wire (2832 bits), 354 bytes captured (2832 bits)
    Encapsulation type: Ethernet (1)
    Arrival Time: Aug 13, 2014 00:28:11.233223000 UTC
    [Time shift for this packet: 0.000000000 seconds]
    Epoch Time: 1407889691.233223000 seconds
    [Time delta from previous captured frame: 0.000426000 seconds]
    [Time delta from previous displayed frame: 0.000426000 seconds]
    [Time since reference or first frame: 191.233127000 seconds]
    Frame Number: 8453
    Frame Length: 354 bytes (2832 bits)
    Capture Length: 354 bytes (2832 bits)
    [Frame is marked: False]
    [Frame is ignored: False]
    [Protocols in frame: eth:ethertype:ip:tcp:kafka]
    [Number of per-protocol-data: 1]
    [Kafka, key 0]
Ethernet II, Src: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0), Dst: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
    Destination: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
        Address: 84:2b:2b:4e:7b:a1 (84:2b:2b:4e:7b:a1)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Source: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
        Address: 84:2b:2b:4e:47:b0 (84:2b:2b:4e:47:b0)
        .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default)
        .... ...0 .... .... .... .... = IG bit: Individual address (unicast)
    Type: IP (0x0800)
Internet Protocol Version 4, Src: 10.163.193.125 (10.163.193.125), Dst: 10.163.193.121 (10.163.193.121)
    Version: 4
    Header Length: 20 bytes
    Differentiated Services Field: 0x00 (DSCP 0x00: Default; ECN: 0x00: Not-ECT (Not ECN-Capable Transport))
        0000 00.. = Differentiated Services Codepoint: Default (0x00)
        .... ..00 = Explicit Congestion Notification: Not-ECT (Not ECN-Capable Transport) (0x00)
    Total Length: 340
    Identification: 0x2b7c (11132)
    Flags: 0x02 (Don't Fragment)
        0... .... = Reserved bit: Not set
        .1.. .... = Don't fragment: Set
        ..0. .... = More fragments: Not set
    Fragment offset: 0
    Time to live: 64
    Protocol: TCP (6)
    Header checksum: 0x75eb [validation disabled]
        [Good: False]
        [Bad: False]
    Source: 10.163.193.125 (10.163.193.125)
    Destination: 10.163.193.121 (10.163.193.121)
    [Source GeoIP: Unknown]
    [Destination GeoIP: Unknown]
Transmission Control Protocol, Src Port: 33857 (33857), Dst Port: 9092 (9092), Seq: 3092314, Ack: 1, Len: 288
    Source Port: 33857 (33857)
    Destination Port: 9092 (9092)
    [Stream index: 14]
    [TCP Segment Len: 288]
    Sequence number: 3092314    (relative sequence number)
    [Next sequence number: 3092602    (relative sequence number)]
    Acknowledgment number: 1    (relative ack number)
    Header Length: 32 bytes
    .... 0000 0001 1000 = Flags: 0x018 (PSH, ACK)
        000. .... .... = Reserved: Not set
        ...0 .... .... = Nonce: Not set
        .... 0... .... = Congestion Window Reduced (CWR): Not set
        .... .0.. .... = ECN-Echo: Not set
        .... ..0. .... = Urgent: Not set
        .... ...1 .... = Acknowledgment: Set
        .... .... 1... = Push: Set
        .... .... .0.. = Reset: Not set
        .... .... ..0. = Syn: Not set
        .... .... ...0 = Fin: Not set
    Window size value: 107
    [Calculated window size: 13696]
    [Window size scaling factor: 128]
    Checksum: 0xea49 [validation disabled]
        [Good Checksum: False]
        [Bad Checksum: False]
    Urgent pointer: 0
    Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
        No-Operation (NOP)
            Type: 1
                0... .... = Copy on fragmentation: No
                .00. .... = Class: Control (0)
                ...0 0001 = Number: No-Operation (NOP) (1)
        No-Operation (NOP)
            Type: 1
                0... .... = Copy on fragmentation: No
                .00. .... = Class: Control (0)
                ...0 0001 = Number: No-Operation (NOP) (1)
        Timestamps: TSval 2762073800, TSecr 2668832112
            Kind: Time Stamp Option (8)
            Length: 10
            Timestamp value: 2762073800
            Timestamp echo reply: 2668832112
    [SEQ/ACK analysis]
        [iRTT: 0.000199000 seconds]
        [Bytes in flight: 288]
Kafka
    Length: 284
    API Key: Produce (0)
    API Version: 0
    Correlation ID: 3484671
    String Length: 13
    Client ID: test-producer
    Required Acks: 0
    Timeout: 10000
    Array Count: 1
    Produce Request Topic
        String Length: 55
        Topic Name: mytopic
        Array Count: 1
        Produce Request Partition
            Partition ID: 4
            Message Set Size: 182
            Message Set
                Offset: 0
                Message Size: 170
                Message
                    CRC32: 0x0ece510e
                    Magic Byte: 0
                    .... ..00 = Compression Codec: None (0)
                    Bytes Length: -1
                    Key: <MISSING>
                    Bytes Length: 156
                    Value: 00b6c4d5be0a0008726f6f740008686b6735000c246a726f...
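
As a cross-check that the three requests above are at least internally consistent, the length fields can be recomputed from the field sizes the decoder shows.  This is a minimal Python sketch, assuming the v0 ProduceRequest layout exactly as tshark decoded it here (2-byte string lengths, 4-byte array counts, a null key encoded as length -1 with no key bytes); expected_lengths is a hypothetical helper name.

```python
def expected_lengths(client_id_len, topic_len, value_len, key_len=0):
    """Return (message size, message set size, request length) in bytes."""
    # crc + magic + attributes + key length + key + value length + value
    message_size = 4 + 1 + 1 + 4 + key_len + 4 + value_len
    # 8-byte offset + 4-byte message size + the message itself
    message_set_size = 8 + 4 + message_size
    request_length = (
        2 + 2 + 4            # api key, api version, correlation id
        + 2 + client_id_len  # client id string
        + 2 + 4              # required acks, timeout
        + 4                  # topic array count
        + 2 + topic_len      # topic name string
        + 4                  # partition array count
        + 4 + 4              # partition id, message set size field
        + message_set_size
    )
    return message_size, message_set_size, request_length


# Value lengths taken from frames 8451, 8452 and 8453 above.
for value_len in (318, 267, 156):
    print(expected_lengths(13, 55, value_len))
# prints (332, 344, 446), then (281, 293, 395), then (170, 182, 284)
```

Those match the Message Size, Message Set Size and Length fields in all three frames, including the one with correlation ID 3484669 that triggered the error, so the framing itself does not look damaged.  It says nothing about whether the CRC or the value bytes are sane.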


On Wed, Aug 13, 2014 at 08:15:21AM -0700, Jun Rao wrote:
> Interesting, could you run DumpLogSegments with and w/o deep-iteration and
> send the output around offset 1327?
> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller <st...@idrathernotsay.com>
> wrote:
> 
> > [ "Aha!", you say, "now I know why this guy's been doing so much tshark
> > stuff!" (-: ]
> >
> >    Hi.  I'm running into a strange situation, in which more or less all of
> > the topics on our Kafka server behave exactly as expected... but the data
> > produced by one family of applications is producing fairly frequent topic
> > corruption.
> >
> >    When this happens, on the client side, the results are all over the
> > place: sometimes you get a ConsumerFetchSizeTooSmall exception, or an
> > exception for an unknown error type, or an invalid-offset error, it's all
> > over the map.
> >
> >    On the server side, I think something like this is the first sign of
> > badness:
> >
> > [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing
> > ProducerRequest with correlation id 6750 from client test-producer on
> > partition [mytopic,9] (kafka.server.KafkaApis)
> > java.lang.ArrayIndexOutOfBoundsException
> > [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection
> > response due to error handling produce request [clientId = test-producer,
> > correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0
> > (kafka.server.KafkaApis)
> >
> > shortly thereafter, you begin to see oddness facing the clients:
> >
> > [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch
> > request for partition [mytopic,9] offset 1327 from consumer with
> > correlation id 87204 (kafka.server.KafkaApis)
> > java.lang.IllegalStateException: Invalid message size: 0
> >         at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
> >         at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
> >         at kafka.log.LogSegment.read(LogSegment.scala:137)
> >         at kafka.log.Log.read(Log.scala:386)
> >         at
> > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> >         at
> > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> >         at
> > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> >         at
> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >         at
> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >         at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >         at
> > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> >         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> >         at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> >         at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >         at java.lang.Thread.run(Unknown Source)
> >
> > If I go run the DumpLogSegments tool on the particular topic and partition
> > that's generating the errors, I can see there's corruption in the log:
> >
> > Non-secutive offsets in
> > :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
> >   1327 is followed by 1327
> >
> > The only thing producing data to corrupted topics was also the only thing
> > where snappy compression was turned on in the Java API being used by the
> > producer (it's a Storm topology; we've had the same issue with one in Scala
> > and with one that produces very similar data, but that was written in
> > Java).  We turned that off, published to a different topic name (so it was
> > created fresh), and had a couple of happy days where all was well.  Then we
> > decided that all was well so we tried to go back to the original topic --
> > after we'd verified that all data had aged out of the logs for that topic.
> >  And we started seeing errors again.  So we switched to a different topic
> > again, let it be created, and also started seeing errors on that topic.
> >
> > We have other producers, written in C and Java and python, which are
> > working flawlessly, even though the size of the data they produce and the
> > rate at which they produce it is much larger than what we're seeing with
> > this one problematic producer.  We also have producers written in other
> > languages that produce at very low rates, so it's (probably) not the sort
> > of thing where the issue is masked by more frequent data production.
> >
> > But in any case it looks like there's something the client can send that
> > will corrupt the topic, which seems like something that shouldn't be able
> > to happen.  I know there's at least some error checking for bad protocol
> > requests, as I hacked a python client to produce some corrupt messages and
> > saw an error response from the server.
> >
> > I'm happy to supply more data but I'm not sure what would be useful.  I'm
> > also fine with continuing to dig into this on my own but I'd reached a
> > point where it'd be useful to know if anyone had seen something like this
> > before.  I have a ton o' tcpdumps running and some tail -F greps running on
> > the logs so that if we see that producer error again we can go find the
> > corresponding tcpdump file and hopefully find the smoking gun.  (It turns
> > out that the real-time tshark processing invocations I sent out earlier can
> > get quite far behind; I had that running when the corruption occurred
> > today, but the output processing was a full hour behind the current time,
> > the packet-writing part of tshark was far ahead of the packet-analyzing
> > part!)
> >
> > Are there any particular log4j options I should turn on?  Is there a way
> > to just enable trace logging for a specific topic?  Does trace logging
> > print the contents of the message somewhere, not as something all nice and
> > interpreted but as, say, a bag of hex digits?  I might end up rebuilding
> > kafka and adding some very specialized logging just for this.
> >
> > Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of
> > importance. (-:  Also, here's the topic description:
> >
> > Topic:mytopic   PartitionCount:10       ReplicationFactor:1     Configs:
> >         Topic: mytopic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
> >         Topic: mytopic  Partition: 1    Leader: 1       Replicas: 1     Isr: 1
> >         Topic: mytopic  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
> >         Topic: mytopic  Partition: 3    Leader: 1       Replicas: 1     Isr: 1
> >         Topic: mytopic  Partition: 4    Leader: 0       Replicas: 0     Isr: 0
> >         Topic: mytopic  Partition: 5    Leader: 1       Replicas: 1     Isr: 1
> >         Topic: mytopic  Partition: 6    Leader: 0       Replicas: 0     Isr: 0
> >         Topic: mytopic  Partition: 7    Leader: 1       Replicas: 1     Isr: 1
> >         Topic: mytopic  Partition: 8    Leader: 0       Replicas: 0     Isr: 0
> >         Topic: mytopic  Partition: 9    Leader: 1       Replicas: 1     Isr: 1
> >
> > (2 brokers, 1 ZK server, no obvious issues with delays or process restarts
> > or disk errors or any of the other usual suspects.  One partition per
> > filesystem.  But my gut says none of that's pertinent, it's a matter of
> > which partition the producer happens to be publishing to when it sends
> > garbage.)
> >
> >
> >         -Steve
> >


Re: Strange topic-corruption issue?

Posted by Jun Rao <ju...@gmail.com>.
Interesting. Could you run DumpLogSegments both with and without
--deep-iteration and send the output around offset 1327?
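
If it helps to look at the raw bytes alongside that output, here is a rough
Python sketch of a shallow walk over a segment file. It is not a replacement
for DumpLogSegments. It assumes the 0.8 on-disk layout as I understand it
(an 8-byte big-endian offset, a 4-byte size, then the message, whose fixed
header is 14 bytes), it does not decompress snappy wrapper messages, and the
names scan_segment and MIN_MESSAGE_SIZE are made up for this example.

```python
import binascii
import struct

# Assumed 0.8 log entry header: 8-byte offset, then 4-byte message size.
ENTRY_HEADER = struct.Struct(">qi")
# Assumed minimum message: crc(4) + magic(1) + attributes(1)
# + key length(4) + value length(4).
MIN_MESSAGE_SIZE = 14


def scan_segment(data, preview=16):
    """Walk the shallow entries in `data` (the bytes of one .log file).

    Returns a list of (position, offset, size, problem, hex_preview) tuples,
    where problem is None for entries that look sane.
    """
    findings = []
    pos = 0
    prev = None
    while pos + ENTRY_HEADER.size <= len(data):
        offset, size = ENTRY_HEADER.unpack_from(data, pos)
        start = pos + ENTRY_HEADER.size
        body = data[start:start + max(size, 0)]
        problem = None
        if size < MIN_MESSAGE_SIZE:
            problem = "invalid message size: %d" % size
        elif len(body) < size:
            problem = "truncated entry"
        elif prev is not None and offset <= prev:
            # Gaps are normal for compressed wrapper messages, so only
            # offsets that fail to increase are flagged.
            problem = "%d is followed by %d" % (prev, offset)
        hex_preview = binascii.hexlify(body[:preview]).decode("ascii")
        findings.append((pos, offset, size, problem, hex_preview))
        if size < MIN_MESSAGE_SIZE or len(body) < size:
            break  # the next entry boundary cannot be trusted past this point
        prev = offset
        pos = start + size
    return findings
```

Feeding it the bytes of the segment in question (for example the result of
open(path, "rb").read() on a copy of the file, which reads the whole segment
into memory) gives the file position and the first few bytes of each entry
as hex, so the entries on either side of offset 1327 can be compared by hand.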

Thanks,

Jun


On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller <st...@idrathernotsay.com>
wrote:

> [ "Aha!", you say, "now I know why this guy's been doing so much tshark
> stuff!" (-: ]
>
>    Hi.  I'm running into a strange situation, in which more or less all of
> the topics on our Kafka server behave exactly as expected... but the data
> produced by one family of applications is producing fairly frequent topic
> corruption.
>
>    When this happens, on the client side, the results are all over the
> place: sometimes you get a ConsumerFetchSizeTooSmall exception, or an
> exception for an unknown error type, or an invalid-offset error, it's all
> over the map.
>
>    On the server side, I think something like this is the first sign of
> badness:
>
> [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing ProducerRequest with correlation id 6750 from client test-producer on partition [mytopic,9] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection response due to error handling produce request [clientId = test-producer, correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0 (kafka.server.KafkaApis)
>
> shortly thereafter, you begin to see oddness facing the clients:
>
> [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch request for partition [mytopic,9] offset 1327 from consumer with correlation id 87204 (kafka.server.KafkaApis)
> java.lang.IllegalStateException: Invalid message size: 0
>         at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
>         at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
>         at kafka.log.LogSegment.read(LogSegment.scala:137)
>         at kafka.log.Log.read(Log.scala:386)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>         at java.lang.Thread.run(Unknown Source)
>
> If I go run the DumpLogSegments tool on the particular topic and partition
> that's generating the errors, I can see there's corruption in the log:
>
> Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/00000000000000000000.log
>   1327 is followed by 1327
>
> The only thing producing data to corrupted topics was also the only thing
> where snappy compression was turned on in the Java API being used by the
> producer (it's a Storm topology; we've had the same issue with one in Scala
> and with one that produces very similar data, but that was written in
> Java).  We turned that off, published to a different topic name (so it was
> created fresh), and had a couple of happy days where all was well.  Then we
> decided that all was well so we tried to go back to the original topic --
> after we'd verified that all data had aged out of the logs for that topic.
>  And we started seeing errors again.  So we switched to a different topic
> again, let it be created, and also started seeing errors on that topic.
>
> We have other producers, written in C and Java and python, which are
> working flawlessly, even though the size of the data they produce and the
> rate at which they produce it is much larger than what we're seeing with
> this one problematic producer.  We also have producers written in other
> languages that produce at very low rates, so it's (probably) not the sort
> of thing where the issue is masked by more frequent data production.
>
> But in any case it looks like there's something the client can send that
> will corrupt the topic, which seems like something that shouldn't be able
> to happen.  I know there's at least some error checking for bad protocol
> requests, as I hacked a python client to produce some corrupt messages and
> saw an error response from the server.
>
> I'm happy to supply more data but I'm not sure what would be useful.  I'm
> also fine with continuing to dig into this on my own but I'd reached a
> point where it'd be useful to know if anyone had seen something like this
> before.  I have a ton o' tcpdumps running and some tail -F greps running on
> the logs so that if we see that producer error again we can go find the
> corresponding tcpdump file and hopefully find the smoking gun.  (It turns
> out that the real-time tshark processing invocations I sent out earlier can
> get quite far behind; I had that running when the corruption occurred
> today, but the output processing was a full hour behind the current time,
> the packet-writing part of tshark was far ahead of the packet-analyzing
> part!)
>
> Are there any particular log4j options I should turn on?  Is there a way
> to just enable trace logging for a specific topic?  Does trace logging
> print the contents of the message somewhere, not as something all nice and
> interpreted but as, say, a bag of hex digits?  I might end up rebuilding
> kafka and adding some very specialized logging just for this.
>
> Kafka 0.8.1.1, JRE 1.6.0-71, Storm 0.9.1, RHEL6, in likely order of
> importance. (-:  Also, here's the topic description:
>
> Topic:mytopic   PartitionCount:10       ReplicationFactor:1     Configs:
>         Topic: mytopic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 1    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 3    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 4    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 5    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 6    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 7    Leader: 1       Replicas: 1     Isr: 1
>         Topic: mytopic  Partition: 8    Leader: 0       Replicas: 0     Isr: 0
>         Topic: mytopic  Partition: 9    Leader: 1       Replicas: 1     Isr: 1
>
> (2 brokers, 1 ZK server, no obvious issues with delays or process restarts
> or disk errors or any of the other usual suspects.  One partition per
> filesystem.  But my gut says none of that's pertinent, it's a matter of
> which partition the producer happens to be publishing to when it sends
> garbage.)
>
>
>         -Steve
>