Posted to users@kafka.apache.org by Asaf Mesika <as...@gmail.com> on 2019/01/05 08:10:38 UTC

Getting "Illegal batch type" exception on consumers

Hi,

We've recently started encountering the following exception, which appears
to happen a lot on the consumer side. We're using the old consumer (ZK
based) and not the new one (Camel based, unfortunately).

*The exception*
kafka.common.KafkaException: Error processing data for partition acmetopic-7 offset 2204558563
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
        at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
        at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
        at scala.collection.Iterator.toStream(Iterator.scala:1403)
        at scala.collection.Iterator.toStream$(Iterator.scala:1402)
        at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
        at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
        at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
        at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
        ... 15 common frames omitted

We're using Kafka v1.1.0 both on server and client.

Unfortunately I'm not up to speed on the exact protocol details between
client and server, but I presume the client tells the server that it's an
old client, the server "remembers" that for the session, and it returns
record batches using magic v0 or v1.
The exception stack trace shows something odd. It seems that the magic
number sent was v2, so the MemoryRecords class creates an Iterator of
DefaultRecordBatch, but a bit later it reaches a point where it tries
to convert it to MessageAndOffset and fails, since for some odd reason it
is only able to do so for AbstractLegacyRecordBatch.
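
For what it's worth, a small diagnostic sketch like the one below (my own
code, not part of Kafka; dumpBatchMagic is just a made-up helper name) could
print the magic value of each batch, assuming you can capture the raw
records ByteBuffer returned for a partition:

import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.kafka.common.record.MemoryRecords

// Hypothetical helper: given the raw records buffer returned for a partition,
// print the base offset and magic value of each batch the broker sent.
// magic 0/1 = legacy format, magic 2 = the new DefaultRecordBatch format.
def dumpBatchMagic(fetchedBuffer: ByteBuffer): Unit = {
  val records = MemoryRecords.readableRecords(fetchedBuffer.duplicate())
  records.batches.asScala.foreach { batch =>
    println(s"baseOffset=${batch.baseOffset} magic=${batch.magic}")
  }
}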

These are the parts I saw:

*PartitionTopicInfo.scala*

/**
 * Enqueue a message set for processing.
 */
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if(size > 0) {
    val next = messages.shallowIterator.toSeq.last.nextOffset

*ByteBufferMessageSet*

/** iterator over compressed messages without decompressing */
def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)

/** When flag isShallow is set to be true, we do a shallow iteration:
    just traverse the first level of messages. **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
  if (isShallow)
    asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)

override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())


*MemoryRecords*

public static MemoryRecords readableRecords(ByteBuffer buffer) {
    return new MemoryRecords(buffer);
}

private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
    @Override
    public Iterator<MutableRecordBatch> iterator() {
        return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
    }
};

*ByteBufferLogInputStream*

    public MutableRecordBatch nextBatch() throws IOException {
        int remaining = buffer.remaining();
        if (remaining < LOG_OVERHEAD)
            return null;

        int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
        // V0 has the smallest overhead, stricter checking is done later
        if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)",
                LegacyRecord.RECORD_OVERHEAD_V0));
        if (recordSize > maxMessageSize)
            throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).",
                maxMessageSize));

        int batchSize = recordSize + LOG_OVERHEAD;
        if (remaining < batchSize)
            return null;

        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);

        ByteBuffer batchSlice = buffer.slice();
        batchSlice.limit(batchSize);
        buffer.position(buffer.position() + batchSize);

        if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
            throw new CorruptRecordException("Invalid magic found in record: " + magic);

        if (magic > RecordBatch.MAGIC_VALUE_V1)
            return new DefaultRecordBatch(batchSlice);
        else
            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
    }
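
One thing that helped me follow this: the magic byte sits at the same
position in both formats (right after the 12-byte offset+size header and
the 4 bytes holding the CRC in v0/v1 or the partition leader epoch in v2),
which is why nextBatch() can read it before knowing which format it has.
A tiny sketch of that check (my own code; the offset 16 is my assumption
from the layout above, not Kafka's MAGIC_OFFSET constant):

import java.nio.ByteBuffer

// Sketch only: peek at the magic byte of the first batch in a raw records buffer.
// Offset 16 is assumed from the layout: 8-byte (base) offset + 4-byte size,
// then 4 bytes of CRC (v0/v1) or partition leader epoch (v2), then the magic byte.
def firstBatchMagic(buffer: ByteBuffer): Byte =
  buffer.get(buffer.position() + 16)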


So the stream constructs DefaultRecordBatch instances, which later fail because
it tries to map them to MessageAndOffset but can't do that for
DefaultRecordBatch - I can't figure out why.
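
For context, the conversion that throws seems to look roughly like this
(paraphrased from the exception message and my reading of
MessageAndOffset.scala, so it may not match the source exactly):

import kafka.message.{Message, MessageAndOffset}
import org.apache.kafka.common.record.{AbstractLegacyRecordBatch, RecordBatch}

// Rough sketch of MessageAndOffset.fromRecordBatch - not a verbatim copy of the source:
def fromRecordBatch(batch: RecordBatch): MessageAndOffset = batch match {
  case legacy: AbstractLegacyRecordBatch =>
    // magic v0/v1 batches can be wrapped in the old Message/MessageAndOffset types
    MessageAndOffset(Message.fromRecord(legacy.outerRecord), legacy.lastOffset)
  case _ =>
    // anything else (e.g. a magic v2 DefaultRecordBatch) is rejected outright,
    // which is exactly the IllegalArgumentException in the stack trace above
    throw new IllegalArgumentException(s"Illegal batch type ${batch.getClass}. " +
      "The older message format classes only support conversion from class " +
      "org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1")
}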

To me it seems like a bug. I've posted a JIRA ticket
<https://issues.apache.org/jira/browse/KAFKA-7769>, but there have been no
comments since Dec 26th, so I thought I'd ping here as well and get some
pointers from the community.

Our current work-around is to restart either the server or the client, which
solves it.

Thanks!

Asaf Mesika

Re: Getting "Illegal batch type" exception on consumers

Posted by Ismael Juma <is...@juma.me.uk>.
Hi,

Best to keep the discussion in the JIRA. I asked a few questions.

Ismael
