Posted to jira@kafka.apache.org by "Bob Barrett (JIRA)" <ji...@apache.org> on 2018/10/02 00:51:00 UTC

[jira] [Assigned] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

     [ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bob Barrett reassigned KAFKA-7467:
----------------------------------

    Assignee: Bob Barrett

> NoSuchElementException is raised because controlBatch is empty
> --------------------------------------------------------------
>
>                 Key: KAFKA-7467
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7467
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Badai Aqrandista
>            Assignee: Bob Barrett
>            Priority: Minor
>
> The log cleaner thread died with a NoSuchElementException thrown from onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
>     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
>     val controlRecord = controlBatch.iterator.next()
>     val controlType = ControlRecordType.parse(controlRecord.key)
>     val producerId = controlBatch.producerId
> {noformat}
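> When MemoryRecords.filterTo retains an empty batch (BatchRetention.RETAIN_EMPTY), it copies the original batch's control attribute into the empty header, so a control batch whose records were all removed is still marked as a control batch. Reading the control type from such an empty batch causes the error.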
> {noformat}
> else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>     if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>         throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
>     bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>     DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
>             batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
>             batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
>             batch.isTransactional(), batch.isControlBatch());
>     filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}
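
The failure mode described above can be reproduced outside Kafka. The following is a minimal sketch, not the actual Kafka code and not necessarily how this issue will be fixed. `Batch` is a hypothetical stand-in for a record batch (records are simplified to strings), and `firstRecordUnguarded` / `firstRecordGuarded` are illustrative names. It shows that calling `next()` on the iterator of an empty batch throws NoSuchElementException, and that checking `hasNext()` first is one possible way to avoid it.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

class EmptyControlBatchSketch {

    // Hypothetical stand-in for a record batch; this is NOT Kafka's RecordBatch.
    static final class Batch implements Iterable<String> {
        private final List<String> records;
        private final boolean controlBatch;

        Batch(List<String> records, boolean controlBatch) {
            this.records = records;
            this.controlBatch = controlBatch;
        }

        boolean isControlBatch() {
            return controlBatch;
        }

        @Override
        public Iterator<String> iterator() {
            return records.iterator();
        }
    }

    // Same pattern as the quoted onControlBatchRead: assumes at least one record.
    static String firstRecordUnguarded(Batch batch) {
        return batch.iterator().next();
    }

    // One possible guard (an assumption, not the committed fix):
    // return an empty Optional when the batch has no records.
    static Optional<String> firstRecordGuarded(Batch batch) {
        Iterator<String> it = batch.iterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }

    public static void main(String[] args) {
        // An empty batch that still carries the control flag,
        // like the header written for a retained empty batch.
        Batch emptyControl = new Batch(Collections.<String>emptyList(), true);

        System.out.println("guarded: " + firstRecordGuarded(emptyControl));

        try {
            firstRecordUnguarded(emptyControl);
        } catch (NoSuchElementException e) {
            System.out.println("unguarded: NoSuchElementException");
        }
    }
}
```

With a guard of this kind, the caller still has to decide what an empty control batch means for retention (for example, whether it can be discarded); that decision is outside the scope of this sketch.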



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)