Posted to commits@kafka.apache.org by jg...@apache.org on 2018/10/05 19:00:13 UTC

[kafka] 02/02: KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9e99b6acf9d1d8b330575af2674083a5f8461ccc
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Fri Oct 5 11:58:59 2018 -0700

    KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)
    
    This patch adds checks before reading the first record of a control batch. If the batch is empty, it is treated as having already been cleaned. In the case of LogCleaner this means it is safe to discard. In the case of ProducerStateManager it means it shouldn't cause state to be stored because the relevant transaction has already been cleaned. In the case of Fetcher, it just preempts the check for an abort. In the case of GroupMetadataManager, it doesn't process the offset as a commit [...]
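
In outline, each component gets the same guard: check hasNext() before parsing the first record of a control batch, and fall back to a value that means "this marker was already cleaned" instead of letting next() throw. A minimal standalone sketch of that pattern is below; the class and method names are illustrative and not part of the patch, and it only mirrors the Java-side check made in Fetcher.

    import java.util.Iterator;

    import org.apache.kafka.common.record.ControlRecordType;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;

    public class EmptyControlBatchGuard {
        // Returns true only if this control batch's first record is an ABORT marker.
        // An empty control batch (its marker already removed by the cleaner) yields
        // false rather than raising NoSuchElementException from iterator().next().
        static boolean isBatchAborted(RecordBatch batch) {
            if (!batch.isControlBatch())
                return false;

            Iterator<Record> records = batch.iterator();
            if (!records.hasNext())
                return false; // already cleaned, nothing left to parse

            Record first = records.next();
            return ControlRecordType.parse(first.key()) == ControlRecordType.ABORT;
        }
    }

The Scala call sites (LogCleaner, ProducerStateManager, GroupMetadataManager) apply the same hasNext check, each returning the default that treats the empty batch as already cleaned, as shown in the hunks below.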
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  3 +-
 .../clients/consumer/internals/FetcherTest.java    | 46 ++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.scala   | 23 +++++----
 core/src/main/scala/kafka/log/LogCleaner.scala     | 38 ++++++++-------
 .../scala/kafka/log/ProducerStateManager.scala     | 14 ++++--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 56 ++++++++++++++++++++++
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 31 ++++++++++++
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 32 ++++++++++++-
 9 files changed, 212 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2a6ac0c..6b42d07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1300,8 +1300,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
             Iterator<Record> batchIterator = batch.iterator();
             if (!batchIterator.hasNext())
-                throw new InvalidRecordException("Invalid batch for partition " + partition + " at offset " +
-                        batch.baseOffset() + " with control sequence set, but no records");
+                return false;
 
             Record firstRecord = batchIterator.next();
             return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 75a34cc..42f6beb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2639,6 +2639,52 @@ public class FetcherTest {
         assertEquals(0, future.get());
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 1;
+
+        // Empty control batch should not cause an exception
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 1L,
+                (short) 0, -1, 0, 0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, TimestampType.CREATE_TIME, time.milliseconds(),
+                true, true);
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        subscriptions.assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FetchRequest request = (FetchRequest) body;
+                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+                return true;
+            }
+        }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(2, fetchedRecords.get(tp0).size());
+    }
+
     private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
         for (int i = 0; i < count; i++)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 21f4a3d..31cd361 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,17 +555,20 @@ class GroupMetadataManager(brokerId: Int,
           memRecords.batches.asScala.foreach { batch =>
             val isTxnOffsetCommit = batch.isTransactional
             if (batch.isControlBatch) {
-              val record = batch.iterator.next()
-              val controlRecord = ControlRecordType.parse(record.key)
-              if (controlRecord == ControlRecordType.COMMIT) {
-                pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
-                  .foreach {
-                    case (groupTopicPartition, commitRecordMetadataAndOffset) =>
-                      if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
-                        loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
-                  }
+              val recordIterator = batch.iterator
+              if (recordIterator.hasNext) {
+                val record = recordIterator.next()
+                val controlRecord = ControlRecordType.parse(record.key)
+                if (controlRecord == ControlRecordType.COMMIT) {
+                  pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
+                    .foreach {
+                      case (groupTopicPartition, commitRecordMetadataAndOffset) =>
+                        if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
+                          loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
+                    }
+                }
+                pendingOffsets.remove(batch.producerId)
               }
-              pendingOffsets.remove(batch.producerId)
             } else {
               var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index f1ac1fc..bf4f7e1 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -990,24 +990,30 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
 
-    val controlRecord = controlBatch.iterator.next()
-    val controlType = ControlRecordType.parse(controlRecord.key)
-    val producerId = controlBatch.producerId
-    controlType match {
-      case ControlRecordType.ABORT =>
-        ongoingAbortedTxns.remove(producerId) match {
-          // Retain the marker until all batches from the transaction have been removed
-          case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
-            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
-            false
-          case _ => true
-        }
+    val controlRecordIterator = controlBatch.iterator
+    if (controlRecordIterator.hasNext) {
+      val controlRecord = controlRecordIterator.next()
+      val controlType = ControlRecordType.parse(controlRecord.key)
+      val producerId = controlBatch.producerId
+      controlType match {
+        case ControlRecordType.ABORT =>
+          ongoingAbortedTxns.remove(producerId) match {
+            // Retain the marker until all batches from the transaction have been removed
+            case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
+              transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+              false
+            case _ => true
+          }
 
-      case ControlRecordType.COMMIT =>
-        // This marker is eligible for deletion if we didn't traverse any batches from the transaction
-        !ongoingCommittedTxns.remove(producerId)
+        case ControlRecordType.COMMIT =>
+          // This marker is eligible for deletion if we didn't traverse any batches from the transaction
+          !ongoingCommittedTxns.remove(producerId)
 
-      case _ => false
+        case _ => false
+      }
+    } else {
+      // An empty control batch was already cleaned, so it's safe to discard
+      true
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 2f71123..a5c182c 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -254,10 +254,16 @@ private[log] class ProducerAppendInfo(val producerId: Long,
 
   def append(batch: RecordBatch): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
-      val record = batch.iterator.next()
-      val endTxnMarker = EndTransactionMarker.deserialize(record)
-      val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
-      Some(completedTxn)
+      val recordIterator = batch.iterator
+      if (recordIterator.hasNext) {
+        val record = recordIterator.next()
+        val endTxnMarker = EndTransactionMarker.deserialize(record)
+        val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
+        Some(completedTxn)
+      } else {
+        // An empty control batch means the entire transaction has been cleaned from the log, so no need to append
+        None
+      }
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset,
         batch.isTransactional)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 1792c7b..b5b0e6e 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -424,7 +424,8 @@ object DumpLogSegments {
           print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset + " count: " + batch.countOrNull +
             " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
             " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
-            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional)
+            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
+            " isControl: " + batch.isControlBatch)
         else
           print("offset: " + batch.lastOffset)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b48f297..f648257 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -35,6 +35,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
+import java.util.Collections
 import java.util.Optional
 
 import com.yammer.metrics.Metrics
@@ -1766,6 +1767,61 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testLoadOffsetsWithEmptyControlBatch() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val generation = 15
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    // Prepend empty control batch to valid records
+    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
+    EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
+    EasyMock.expect(mockBatch.isTransactional).andReturn(true)
+    EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
+    EasyMock.replay(mockBatch)
+    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes()
+    EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
+    EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
+    EasyMock.replay(mockRecords)
+
+    val logMock = EasyMock.mock(classOf[Log])
+    EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+      EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18))
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    // Empty control batch should not have caused the load to fail
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertNull(group.leaderOrNull)
+    assertNull(group.protocolOrNull)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 73dfa7e..ff5af61 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -466,6 +466,37 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testCleanEmptyControlBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+
+    // [{Producer1: Commit}, {2}, {3}]
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 0
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 1
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 2
+    log.roll()
+
+    // first time through the control batch is retained as an empty batch
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+
+    // the empty control batch does not cause an exception when cleaned
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index f9f4a23..9afb145 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,14 +21,16 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.StandardOpenOption
+import java.util.Collections
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -746,6 +748,22 @@ class ProducerStateManagerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testAppendEmptyControlBatch(): Unit = {
+    val producerId = 23423L
+    val producerEpoch = 145.toShort
+    val baseOffset = 15
+
+    val batch = EasyMock.createMock(classOf[RecordBatch])
+    EasyMock.expect(batch.isControlBatch).andReturn(true).once
+    EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
+    EasyMock.replay(batch)
+
+    // Appending the empty control batch should not throw and a new transaction shouldn't be started
+    append(stateManager, producerId, producerEpoch, baseOffset, batch, isFromClient = true)
+    assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
+  }
+
   private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
     val epoch = 0.toShort
     val producerId = 1L
@@ -806,6 +824,18 @@ class ProducerStateManagerTest extends JUnitSuite {
     stateManager.updateMapEndOffset(offset + 1)
   }
 
+  private def append(stateManager: ProducerStateManager,
+                     producerId: Long,
+                     producerEpoch: Short,
+                     offset: Long,
+                     batch: RecordBatch,
+                     isFromClient : Boolean): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
+    producerAppendInfo.append(batch)
+    stateManager.update(producerAppendInfo)
+    stateManager.updateMapEndOffset(offset + 1)
+  }
+
   private def currentSnapshotOffsets =
     logDir.listFiles.map(Log.offsetFromFile).toSet