Posted to users@kafka.apache.org by Gaurav Agarwal <ga...@gmail.com> on 2016/08/27 08:41:01 UTC

Kafka bootup exception while recovering log file

Hi All,

We are facing a weird problem where the Kafka broker fails to start due to an
unhandled exception while 'recovering' a log segment. I have been able to
isolate the problem to a single record; the details are below.

During a Kafka restart, if the index files are corrupted or missing, the
broker tries to 'recover' each LogSegment and rebuild its indexes in
LogSegment.recover(). In the main while loop there, which iterates over the
entries in the log (while(iter.hasNext) { val entry = iter.next ... }), I get
an entry whose complete underlying byte buffer is as follows:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
48, 72]

A toString() on this entry yields:

MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251, key
= null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]), 4449011)

It appears that this record is corrupt, and deserializing/decompressing it
causes exceptions which are unhandled. Specifically, in version 0.10.0 this
call fails with a NoSuchElementException:

ByteBufferMessageSet.deepIterator(entry).next().offset
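
For what it's worth, the shallow (v0) header of that buffer decodes cleanly,
which suggests the corruption sits inside the snappy-compressed payload rather
than in the message framing. A minimal sketch of the decode, assuming the v0
message layout crc(4) | magic(1) | attributes(1) | keyLen(4) | valueLen(4) |
value, and using only the first 14 bytes of the dump above:

  import java.nio.ByteBuffer

  // First 14 bytes of the entry above: enough to decode the shallow v0 header.
  val header = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59)
  val bb = ByteBuffer.wrap(header)
  val crc        = bb.getInt() & 0xFFFFFFFFL // 1377740251, matches the toString() above
  val magic      = bb.get()                  // 0
  val attributes = bb.get()                  // 2 -> compression codec bits say "snappy"
  val keyLen     = bb.getInt()               // -1 -> null key
  val valueLen   = bb.getInt()               // 197 -> matches lim=197 in the payload buffer

The compressed value itself then starts with the xerial snappy stream magic
(-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0), so the shallow framing looks intact
and the failure only surfaces once deepIterator tries to decompress and walk
the inner message set.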

Note: This message was written to disk using a Kafka 0.10.0 broker running
snappy jar version 1.1.1.7 (which is known to have some read-time bugs). The
log file itself is 512 MB, and this message appears at around the 4 MB mark
in the file.

We have since upgraded snappy, but should this condition be handled more
gracefully? What is the correct behavior here? Should the exception be caught
and the log file truncated? At the moment it causes Kafka to crash completely,
with no recovery path other than manually deleting the bad data file and then
restarting Kafka.
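
To make the question concrete, here is a rough sketch (not Kafka's actual
recovery code) of the guard we had in mind around the deep iteration in
LogSegment.recover(): if deep-iterating a shallow entry fails, stop and let
the caller truncate the segment after the last good offset instead of letting
the exception escape and kill the broker. recoverDefensively is a hypothetical
helper; ByteBufferMessageSet.deepIterator and MessageAndOffset are the real
0.10.0 classes used above.

  import kafka.message.{ByteBufferMessageSet, MessageAndOffset}

  // Walk the shallow entries; stop at the first one whose deep iteration fails.
  // Returns the last offset that could be read cleanly (-1 if none).
  def recoverDefensively(shallowEntries: Iterator[MessageAndOffset]): Long = {
    var lastGoodOffset = -1L
    var ok = true
    while (ok && shallowEntries.hasNext) {
      val entry = shallowEntries.next()
      try {
        // The call that currently blows up with NoSuchElementException on the corrupt record.
        val deep = ByteBufferMessageSet.deepIterator(entry)
        while (deep.hasNext) lastGoodOffset = deep.next().offset
      } catch {
        case _: Exception =>
          // Treat the rest of the segment as corrupt; the caller would truncate after lastGoodOffset.
          ok = false
      }
    }
    lastGoodOffset
  }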

--

cheers,

gaurav


A test case to repro the crash (imports added here for completeness; the
message classes live in kafka.message):

import java.nio.ByteBuffer

import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset}
import org.junit.Test

@Test
def testCorruptLog() {
  // Raw bytes of the corrupt entry, copied verbatim from the log segment.
  val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
    -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79,
    -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49,
    48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48,
    0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109,
    111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46,
    97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
    -115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97,
    109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99,
    101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115,
    116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50,
    53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52,
    52, 54, 48, 56, 48, 72)

  // Wrap it as a single shallow (compressed wrapper) message at offset 4449011.
  val msg = new Message(ByteBuffer.wrap(buf), None, None)
  val entry = new MessageAndOffset(msg, 4449011L)

  // Deep iteration decompresses the wrapper; this throws NoSuchElementException.
  val deepIterator: Iterator[MessageAndOffset] = ByteBufferMessageSet.deepIterator(entry)
  deepIterator.next().offset
}

Re: Kafka bootup exception while recovering log file

Posted by Tom Crayford <tc...@heroku.com>.
This sounds like Kafka not being entirely robust to disk corruption, which
seems entirely possible and normal. I'd simply delete that log file and let
replica replay catch it up at broker bootup.

Trying to guard against all possible disk corruption bugs sounds very
difficult to me; it seems better to let the operator handle corruption on a
case-by-case basis.

On Tue, Sep 6, 2016 at 7:14 AM, Jaikiran Pai <ja...@gmail.com>
wrote:

> [quoted message trimmed]

Re: Kafka bootup exception while recovering log file

Posted by Jaikiran Pai <ja...@gmail.com>.
I'm not from the Kafka dev team, so I can't comment on whether this is an
expected way to fail or whether it needs to be handled in a cleaner/more
robust manner (at the very least, probably with a better exception message).
Since you have put in the effort to write a test case and narrow it down to
this specific flow, maybe you can send a mail to the dev mailing list and/or
create a JIRA to report this.

-Jaikiran

On Tuesday 30 August 2016 12:07 PM, Gaurav Agarwal wrote:
> [quoted message trimmed]


Re: Kafka bootup exception while recovering log file

Posted by Gaurav Agarwal <ga...@gmail.com>.
Kafka version: 0.10.0

Exception Trace
--------------------
java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
at kafka.log.LogSegment.recover(LogSegment.scala:189)
at kafka.log.Log.recoverLog(Log.scala:268)
at kafka.log.Log.loadSegments(Log.scala:243)
at kafka.log.Log.<init>(Log.scala:101)
at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Test Code (the same exception trace is seen in the broker logs on the prod
machines, with exactly the same log files as used in this mini test)
---------

val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
val config = LogConfig(logProps)
// Partition directory containing the corrupt segment copied from the broker.
val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
var log = new Log(cp, config, 0, time.scheduler, time)


On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai <ja...@gmail.com>
wrote:

> [quoted message trimmed]

Re: Kafka bootup exception while recovering log file

Posted by Jaikiran Pai <ja...@gmail.com>.
Can you paste the entire exception stacktrace please?

-Jaikiran
On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:
> [quoted message trimmed]


Re: Kafka bootup exception while recovering log file

Posted by Gaurav Agarwal <ga...@gmail.com>.
Hi there, just wanted to bump this thread one more time to check if someone
can point us in the right direction... This was quite a serious failure that
took down many of our Kafka brokers.

On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <ga...@gmail.com>
wrote:

> [quoted message trimmed]