Posted to dev@kafka.apache.org by Shuai Lin <li...@gmail.com> on 2017/04/04 11:29:26 UTC

Re: log cleaner thread crashes with different log.cleaner.io.buffer.size

Filed a JIRA for this. https://issues.apache.org/jira/browse/KAFKA-5010

(also forwarded to the dev list.)

On Tue, Apr 4, 2017 at 7:16 PM, Shuai Lin <li...@gmail.com> wrote:

> After digging a bit into the source code of the LogCleaner class, I
> realized that maybe I should not try to fix the crash by increasing
> log.cleaner.io.buffer.size: the cleaner automatically grows the buffer
> so that it can hold at least one record.
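>
> To illustrate that growing behaviour (this is just my rough sketch, not
> the actual LogCleaner code; the doubling and the upper bound are my
> assumptions):
>
>     import java.nio.ByteBuffer
>
>     // Hypothetical sketch: grow a buffer until it can hold at least
>     // `needed` bytes, doubling each time but never beyond `maxSize`
>     // (something like the max message size).
>     def growBuffer(current: ByteBuffer, needed: Int, maxSize: Int): ByteBuffer = {
>       require(needed <= maxSize, s"a single record of $needed bytes exceeds the $maxSize limit")
>       var size = current.capacity
>       while (size < needed)
>         size = math.min(size.toLong * 2, maxSize.toLong).toInt
>       if (size > current.capacity) ByteBuffer.allocate(size) else current
>     }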
>
> Also, the last error I pasted above, i.e. "largest offset in message
> set can not be safely converted to relative offset", should be caused by
> the overly large io buffer size (256MB) I set: the buffer can hold so
> many records that for some of them (largest offset - base offset of the
> new cleaned segment) exceeds Integer.MAX_VALUE, which fails the
> assertion
> <https://github.com/apache/kafka/blob/0.10.2.0/core/src/main/scala/kafka/log/LogSegment.scala#L86>
> in LogSegment.append.
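>
> In other words, the check that fails is roughly the following (my
> simplified reading, not the exact code in LogSegment.append):
>
>     // I believe offsets inside a segment are tracked relative to the
>     // segment's base offset as a 4-byte value, so the delta must fit
>     // into an Int for the append to be safe.
>     def canConvertToRelative(largestOffset: Long, baseOffset: Long): Boolean = {
>       val delta = largestOffset - baseOffset
>       delta >= 0 && delta <= Int.MaxValue
>     }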
>
> From the source code, I see the workflow of cleaning a segment is like
> this (a rough sketch follows the list):
>
> - read as many records as possible into the readBuffer
> - construct a MemoryRecords object and filter the records using the
> OffsetMap
> - write the filtered records into the writeBuffer
> - repeat until the whole segment is processed
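>
> In pseudo-Scala, my mental model of that loop is something like this (the
> names and signatures are mine, not the real API):
>
>     // Rough sketch of the per-segment loop (simplified):
>     //  - read() fills a batch of (offset, record) pairs from the segment
>     //  - shouldRetain() consults the offset map built earlier
>     //  - write() appends the surviving records to the cleaned segment
>     def cleanSegmentSketch(read: () => Seq[(Long, Array[Byte])],
>                            shouldRetain: Long => Boolean,
>                            write: Seq[(Long, Array[Byte])] => Unit): Unit = {
>       var batch = read()
>       while (batch.nonEmpty) {
>         write(batch.filter { case (offset, _) => shouldRetain(offset) })
>         batch = read()
>       }
>     }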
>
> So the question is clear: given that the readBuffer and the writeBuffer
> have exactly the same size
> <https://github.com/apache/kafka/blob/0.10.2.0/core/src/main/scala/kafka/log/LogCleaner.scala#L320-L324>
> (half of log.cleaner.io.buffer.size), why would the cleaner throw a
> BufferOverflowException when writing the filtered records into the
> writeBuffer? That should never happen, because the size of the filtered
> records should be no greater than the size of the readBuffer, and thus
> no greater than the size of the writeBuffer.
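>
> For reference, my reading of that allocation is roughly the following
> (simplified, not the exact constructor code):
>
>     import java.nio.ByteBuffer
>
>     val ioBufferSize = 512 * 1024  // log.cleaner.io.buffer.size (default)
>     val readBuffer   = ByteBuffer.allocate(ioBufferSize / 2)
>     val writeBuffer  = ByteBuffer.allocate(ioBufferSize / 2)
>     // Filtering can only drop records, so whatever fits into readBuffer
>     // should also fit into a writeBuffer of the same capacity.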
>
> [2017-03-24 10:41:07,372] ERROR [kafka-log-cleaner-thread-0], Error due to
>>  (kafka.log.LogCleaner)
>> java.nio.BufferOverflowException
>>         at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>>         at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:98)
>>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:158)
>>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:111)
>>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>
>
> It's also worth mentioning that we are still using
> log.message.format.version = 0.9.0.0 because there are still some old
> consumers. Could this be related to the problem?
>
>
> On Sun, Apr 2, 2017 at 11:46 PM, Shuai Lin <li...@gmail.com> wrote:
>
>> Hi,
>>
>> Recently we upgraded from Kafka 0.10.0.1 to 0.10.2.1, and found that the
>> log cleaner thread crashed with this error:
>>
>> [2017-03-24 10:41:03,926] INFO [kafka-log-cleaner-thread-0], Starting
>>>  (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:04,177] INFO Cleaner 0: Beginning cleaning of log
>>> app-topic-20170317-20. (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:04,177] INFO Cleaner 0: Building offset map for
>>> app-topic-20170317-20... (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:04,387] INFO Cleaner 0: Building offset map for log
>>> app-topic-20170317-20 for 1 segments in offset range [9737795, 9887707).
>>> (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,101] INFO Cleaner 0: Offset map for log
>>> app-topic-20170317-20 complete. (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,106] INFO Cleaner 0: Cleaning log
>>> app-topic-20170317-20 (cleaning prior to Fri Mar 24 10:36:06 GMT 2017,
>>> discarding tombstones prior to Thu Mar 23 10:18:02 GMT 2017)...
>>> (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,110] INFO Cleaner 0: Cleaning segment 0 in log
>>> app-topic-20170317-20 (largest timestamp Fri Mar 24 09:58:25 GMT 2017) into
>>> 0, retaining deletes. (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,372] ERROR [kafka-log-cleaner-thread-0], Error due
>>> to  (kafka.log.LogCleaner)
>>> java.nio.BufferOverflowException
>>>         at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>>>         at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:98)
>>>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:158)
>>>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:111)
>>>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1(LogCleaner.scala:405)
>>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1$adapted(LogCleaner.scala:401)
>>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>>>         at kafka.log.Cleaner.$anonfun$clean$6(LogCleaner.scala:363)
>>>         at kafka.log.Cleaner.$anonfun$clean$6$adapted(LogCleaner.scala:362)
>>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>>         at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>>>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>>>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>>> [2017-03-24 10:41:07,375] INFO [kafka-log-cleaner-thread-0], Stopped
>>>  (kafka.log.LogCleaner)
>>
>>
>>
>> For completeness, here are some other related settings, which are the same
>> before & after the upgrade to 0.10.2 (a server.properties view follows the
>> list):
>>
>> - log.cleaner.enable = 'true'
>> - log.cleaner.min.cleanable.ratio = '0.1'
>> - log.cleaner.threads = '1'
>> - log.cleaner.io.buffer.load.factor = '0.98'
>> - log.roll.hours = '24'
>> - log.cleaner.dedupe.buffer.size = 2GB
>> - log.segment.bytes = 512MB
>> - message.max.bytes = 10MB
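>>
>> For reference, expressed as server.properties entries these would be
>> something like the following (the byte values are my conversion of the
>> sizes above):
>>
>>     log.cleaner.enable=true
>>     log.cleaner.min.cleanable.ratio=0.1
>>     log.cleaner.threads=1
>>     log.cleaner.io.buffer.load.factor=0.98
>>     log.roll.hours=24
>>     # 2GB
>>     log.cleaner.dedupe.buffer.size=2147483648
>>     # 512MB
>>     log.segment.bytes=536870912
>>     # 10MB
>>     message.max.bytes=10485760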
>>
>>
>> Before the upgrade, we used the default value of
>> *log.cleaner.io.buffer.size*, i.e. 512K. Since the above error said
>> "buffer overflow", I updated it to 2M, but the log cleaner thread crashed
>> immediately (with the same error) when the broker restarted.
>>
>> Then I tried 10M and then 128M, but with no luck. Finally, with 256MB, the
>> above error didn't happen anymore.
>>
>> However, after a few days, the log cleaner thread crashed again, with
>> another error:
>>
>> [2017-03-27 12:33:51,323] INFO Cleaner 0: Cleaning segment 8590943933
>>> in log __consumer_offsets-49 (largest timestamp Mon
>>> Mar 27 12:27:12 GMT 2017) into 6443803053, retaining deletes.
>>> (kafka.log.LogCleaner)
>>> [2017-03-27 12:33:51,377] ERROR [kafka-log-cleaner-thread-0], Error due
>>> to  (kafka.log.LogCleaner)
>>> java.lang.IllegalArgumentException: requirement failed: largest offset
>>> in message set can not be safely converted to relative offset.
>>>         at scala.Predef$.require(Predef.scala:277)
>>>         at kafka.log.LogSegment.append(LogSegment.scala:109)
>>>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:482)
>>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1(LogCleaner.scala:405)
>>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1$adapted(LogCleaner.scala:401)
>>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>>>         at kafka.log.Cleaner.$anonfun$clean$6(LogCleaner.scala:363)
>>>         at kafka.log.Cleaner.$anonfun$clean$6$adapted(LogCleaner.scala:362)
>>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>>         at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>>>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>>>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>>> [2017-03-27 12:33:51,377] INFO [kafka-log-cleaner-thread-0], Stopped
>>>  (kafka.log.LogCleaner)
>>
>>
>> Can someone explain what might have caused the above errors, or suggest a
>> possible fix? Thanks!
>>
>> Regards,
>> Shuai
>>
>
>