You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Bob Barrett (JIRA)" <ji...@apache.org> on 2018/10/02 00:51:00 UTC
[jira] [Assigned] (KAFKA-7467) NoSuchElementException is raised
because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bob Barrett reassigned KAFKA-7467:
----------------------------------
Assignee: Bob Barrett
> NoSuchElementException is raised because controlBatch is empty
> --------------------------------------------------------------
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.0
> Reporter: Badai Aqrandista
> Assignee: Bob Barrett
> Priority: Minor
>
> Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
> java.util.NoSuchElementException
> at java.util.Collections$EmptyIterator.next(Collections.java:4189)
> at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
> at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
> at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
> at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
> at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
> at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
> at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
> at kafka.log.Cleaner.clean(LogCleaner.scala:438)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
> def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty batches, which results in empty control batches. Trying to read the control type of an empty batch causes the error.
> {noformat}
> else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
> if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
> throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
> bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
> DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
> batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
> batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
> batch.isTransactional(), batch.isControlBatch());
> filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)