Posted to commits@kafka.apache.org by jg...@apache.org on 2018/10/05 19:07:53 UTC

[kafka] branch 2.0 updated: KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 431752b  KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)
431752b is described below

commit 431752bdf7fbee43a44f8564606c7660cf98b801
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Fri Oct 5 11:58:59 2018 -0700

    KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)
    
    This patch adds checks before reading the first record of a control batch. If the batch is empty, it is treated as having already been cleaned. In the case of LogCleaner, this means the batch is safe to discard. In the case of ProducerStateManager, it means the batch shouldn't cause any state to be stored, because the relevant transaction has already been cleaned. In the case of Fetcher, it simply preempts the check for an abort marker. In the case of GroupMetadataManager, it doesn't process the offset as a commit [...]
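    The fix applies the same defensive pattern in every code path: call hasNext() on the control batch's record iterator before calling next(), and treat an empty batch as "nothing left to do" instead of letting NoSuchElementException escape. Below is a minimal, self-contained Java sketch of that pattern; ControlBatch, ControlRecord and containsAbortMarker are hypothetical stand-ins for Kafka's RecordBatch, Record and ControlRecordType, used only so the example compiles on its own.

        import java.util.Collections;
        import java.util.Iterator;
        import java.util.List;

        // Hypothetical stand-ins for org.apache.kafka.common.record.RecordBatch/Record,
        // used purely to keep the sketch self-contained.
        final class ControlRecord {
            final boolean abortMarker;
            ControlRecord(boolean abortMarker) { this.abortMarker = abortMarker; }
        }

        final class ControlBatch {
            private final List<ControlRecord> records;
            ControlBatch(List<ControlRecord> records) { this.records = records; }
            Iterator<ControlRecord> iterator() { return records.iterator(); }
        }

        public class EmptyControlBatchCheck {
            // Mirrors the patched Fetcher logic: an empty control batch no longer throws
            // NoSuchElementException; it simply does not flag the data as aborted.
            static boolean containsAbortMarker(ControlBatch batch) {
                Iterator<ControlRecord> it = batch.iterator();
                if (!it.hasNext())
                    return false;             // batch already cleaned: nothing to abort
                return it.next().abortMarker; // otherwise inspect the first (only) record
            }

            public static void main(String[] args) {
                ControlBatch empty = new ControlBatch(Collections.emptyList());
                ControlBatch abort = new ControlBatch(Collections.singletonList(new ControlRecord(true)));
                System.out.println(containsAbortMarker(empty)); // false, no exception
                System.out.println(containsAbortMarker(abort)); // true
            }
        }

    The Scala call sites changed below (LogCleaner, ProducerStateManager, GroupMetadataManager) follow the same hasNext-first shape, differing only in what "nothing left to do" means for each caller.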
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  3 +-
 .../clients/consumer/internals/FetcherTest.java    | 46 ++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.scala   | 23 +++++----
 core/src/main/scala/kafka/log/LogCleaner.scala     | 38 ++++++++-------
 .../scala/kafka/log/ProducerStateManager.scala     | 14 ++++--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 56 ++++++++++++++++++++++
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 31 ++++++++++++
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 32 ++++++++++++-
 9 files changed, 212 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index af18d09..cf32873 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1279,8 +1279,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
             Iterator<Record> batchIterator = batch.iterator();
             if (!batchIterator.hasNext())
-                throw new InvalidRecordException("Invalid batch for partition " + partition + " at offset " +
-                        batch.baseOffset() + " with control sequence set, but no records");
+                return false;
 
             Record firstRecord = batchIterator.next();
             return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7f550d3..f62ee02 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2538,6 +2538,52 @@ public class FetcherTest {
         assertEquals(0, future.get());
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 1;
+
+        // Empty control batch should not cause an exception
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 1L,
+                (short) 0, -1, 0, 0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, TimestampType.CREATE_TIME, time.milliseconds(),
+                true, true);
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        subscriptions.assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FetchRequest request = (FetchRequest) body;
+                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+                return true;
+            }
+        }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(2, fetchedRecords.get(tp0).size());
+    }
+
     private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
         for (int i = 0; i < count; i++)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 4830a35..bc2659e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -554,17 +554,20 @@ class GroupMetadataManager(brokerId: Int,
           memRecords.batches.asScala.foreach { batch =>
             val isTxnOffsetCommit = batch.isTransactional
             if (batch.isControlBatch) {
-              val record = batch.iterator.next()
-              val controlRecord = ControlRecordType.parse(record.key)
-              if (controlRecord == ControlRecordType.COMMIT) {
-                pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
-                  .foreach {
-                    case (groupTopicPartition, commitRecordMetadataAndOffset) =>
-                      if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
-                        loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
-                  }
+              val recordIterator = batch.iterator
+              if (recordIterator.hasNext) {
+                val record = recordIterator.next()
+                val controlRecord = ControlRecordType.parse(record.key)
+                if (controlRecord == ControlRecordType.COMMIT) {
+                  pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
+                    .foreach {
+                      case (groupTopicPartition, commitRecordMetadataAndOffset) =>
+                        if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
+                          loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
+                    }
+                }
+                pendingOffsets.remove(batch.producerId)
               }
-              pendingOffsets.remove(batch.producerId)
             } else {
               var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 04b284c..03e4788 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -979,24 +979,30 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
 
-    val controlRecord = controlBatch.iterator.next()
-    val controlType = ControlRecordType.parse(controlRecord.key)
-    val producerId = controlBatch.producerId
-    controlType match {
-      case ControlRecordType.ABORT =>
-        ongoingAbortedTxns.remove(producerId) match {
-          // Retain the marker until all batches from the transaction have been removed
-          case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
-            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
-            false
-          case _ => true
-        }
+    val controlRecordIterator = controlBatch.iterator
+    if (controlRecordIterator.hasNext) {
+      val controlRecord = controlRecordIterator.next()
+      val controlType = ControlRecordType.parse(controlRecord.key)
+      val producerId = controlBatch.producerId
+      controlType match {
+        case ControlRecordType.ABORT =>
+          ongoingAbortedTxns.remove(producerId) match {
+            // Retain the marker until all batches from the transaction have been removed
+            case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
+              transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+              false
+            case _ => true
+          }
 
-      case ControlRecordType.COMMIT =>
-        // This marker is eligible for deletion if we didn't traverse any batches from the transaction
-        !ongoingCommittedTxns.remove(producerId)
+        case ControlRecordType.COMMIT =>
+          // This marker is eligible for deletion if we didn't traverse any batches from the transaction
+          !ongoingCommittedTxns.remove(producerId)
 
-      case _ => false
+        case _ => false
+      }
+    } else {
+      // An empty control batch was already cleaned, so it's safe to discard
+      true
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index bee7abb..9f0dc49 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -253,10 +253,16 @@ private[log] class ProducerAppendInfo(val producerId: Long,
 
   def append(batch: RecordBatch): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
-      val record = batch.iterator.next()
-      val endTxnMarker = EndTransactionMarker.deserialize(record)
-      val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
-      Some(completedTxn)
+      val recordIterator = batch.iterator
+      if (recordIterator.hasNext) {
+        val record = recordIterator.next()
+        val endTxnMarker = EndTransactionMarker.deserialize(record)
+        val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
+        Some(completedTxn)
+      } else {
+        // An empty control batch means the entire transaction has been cleaned from the log, so no need to append
+        None
+      }
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset,
         batch.isTransactional)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 17fbd8f..0f588e3 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -424,7 +424,8 @@ object DumpLogSegments {
           print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset + " count: " + batch.countOrNull +
             " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
             " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
-            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional)
+            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
+            " isControl: " + batch.isControlBatch)
         else
           print("offset: " + batch.lastOffset)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 9c8ef2f..99aa666 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -33,6 +33,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
+import java.util.Collections
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
@@ -1445,6 +1446,61 @@ class GroupMetadataManagerTest {
     EasyMock.verify(replicaManager)
   }
 
+  @Test
+  def testLoadOffsetsWithEmptyControlBatch() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val generation = 15
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    // Prepend empty control batch to valid records
+    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
+    EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
+    EasyMock.expect(mockBatch.isTransactional).andReturn(true)
+    EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
+    EasyMock.replay(mockBatch)
+    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes()
+    EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
+    EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
+    EasyMock.replay(mockRecords)
+
+    val logMock = EasyMock.mock(classOf[Log])
+    EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+      EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18))
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    // Empty control batch should not have caused the load to fail
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertNull(group.leaderOrNull)
+    assertNull(group.protocolOrNull)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 73dfa7e..ff5af61 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -466,6 +466,37 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testCleanEmptyControlBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+
+    // [{Producer1: Commit}, {2}, {3}]
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 0
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 1
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 2
+    log.roll()
+
+    // on the first pass, the control batch is retained as an empty batch
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+
+    // the empty control batch does not cause an exception when cleaned
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), LogTest.keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index f9f4a23..9afb145 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,14 +21,16 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.StandardOpenOption
+import java.util.Collections
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -746,6 +748,22 @@ class ProducerStateManagerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testAppendEmptyControlBatch(): Unit = {
+    val producerId = 23423L
+    val producerEpoch = 145.toShort
+    val baseOffset = 15
+
+    val batch = EasyMock.createMock(classOf[RecordBatch])
+    EasyMock.expect(batch.isControlBatch).andReturn(true).once
+    EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
+    EasyMock.replay(batch)
+
+    // Appending the empty control batch should not throw and a new transaction shouldn't be started
+    append(stateManager, producerId, producerEpoch, baseOffset, batch, isFromClient = true)
+    assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
+  }
+
   private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
     val epoch = 0.toShort
     val producerId = 1L
@@ -806,6 +824,18 @@ class ProducerStateManagerTest extends JUnitSuite {
     stateManager.updateMapEndOffset(offset + 1)
   }
 
+  private def append(stateManager: ProducerStateManager,
+                     producerId: Long,
+                     producerEpoch: Short,
+                     offset: Long,
+                     batch: RecordBatch,
+                     isFromClient: Boolean): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
+    producerAppendInfo.append(batch)
+    stateManager.update(producerAppendInfo)
+    stateManager.updateMapEndOffset(offset + 1)
+  }
+
   private def currentSnapshotOffsets =
     logDir.listFiles.map(Log.offsetFromFile).toSet