Posted to commits@kafka.apache.org by jg...@apache.org on 2018/10/05 19:19:22 UTC

[kafka] branch 1.1 updated: KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 60e1ce3  KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)
60e1ce3 is described below

commit 60e1ce3ad74494a9a743bbd2a3664a81f4405767
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Fri Oct 5 11:58:59 2018 -0700

    KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)
    
    This patch adds checks before reading the first record of a control batch. If the batch is empty, it is treated as having already been cleaned. In the case of LogCleaner this means it is safe to discard. In the case of ProducerStateManager it means it shouldn't cause state to be stored because the relevant transaction has already been cleaned. In the case of Fetcher, it just preempts the check for an abort. In the case of GroupMetadataManager, it doesn't process the offset as a commit [...]
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  3 +-
 .../clients/consumer/internals/FetcherTest.java    | 46 ++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.scala   | 23 +++++----
 core/src/main/scala/kafka/log/LogCleaner.scala     | 38 ++++++++-------
 .../scala/kafka/log/ProducerStateManager.scala     | 14 ++++--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 56 ++++++++++++++++++++++
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 31 ++++++++++++
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 32 ++++++++++++-
 9 files changed, 212 insertions(+), 34 deletions(-)

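The common pattern across all four call sites in the diff below is the same: check hasNext() before reading the first record of a control batch, and treat an empty batch as one whose marker has already been removed by the cleaner. The following is a minimal sketch of that guard, assuming the kafka-clients record classes (RecordBatch, Record, ControlRecordType); the class and helper method names are illustrative and are not part of the patch itself.

    import java.util.Iterator;

    import org.apache.kafka.common.record.ControlRecordType;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;

    // Sketch of the defensive read each call site now performs instead of
    // calling batch.iterator().next() unconditionally.
    public class ControlBatchGuardSketch {

        // Returns the control record type if the batch still carries its marker,
        // or null if the batch is empty and should be skipped by the caller.
        static ControlRecordType controlTypeOrNull(RecordBatch batch) {
            if (!batch.isControlBatch())
                throw new IllegalArgumentException("not a control batch");

            Iterator<Record> it = batch.iterator();
            if (!it.hasNext())
                return null; // empty: marker already cleaned, nothing to read

            Record firstRecord = it.next();
            return ControlRecordType.parse(firstRecord.key());
        }

        // Example of how a caller such as the consumer's abort-marker check
        // would use the guard: an empty control batch is simply not an abort.
        static boolean isAbortMarker(RecordBatch batch) {
            return controlTypeOrNull(batch) == ControlRecordType.ABORT;
        }
    }

Each component then decides what "already cleaned" means for it: LogCleaner retains/discards the batch, ProducerStateManager appends no transaction marker, Fetcher returns false from the abort check, and GroupMetadataManager skips the commit/abort handling, as the hunks below show.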
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c2e4962..d9bf404 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1236,8 +1236,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
             Iterator<Record> batchIterator = batch.iterator();
             if (!batchIterator.hasNext())
-                throw new InvalidRecordException("Invalid batch for partition " + partition + " at offset " +
-                        batch.baseOffset() + " with control sequence set, but no records");
+                return false;
 
             Record firstRecord = batchIterator.next();
             return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 351049c..2bbdc0b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2340,6 +2340,52 @@ public class FetcherTest {
         assertEquals(0, future.get());
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 1;
+
+        // Empty control batch should not cause an exception
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 1L,
+                (short) 0, -1, 0, 0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, TimestampType.CREATE_TIME, time.milliseconds(),
+                true, true);
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        subscriptions.assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FetchRequest request = (FetchRequest) body;
+                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+                return true;
+            }
+        }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
+
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(fetchedRecords.get(tp0).size(), 2);
+    }
+
     private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
         for (int i = 0; i < count; i++)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 44a9369..d887e8b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -544,17 +544,20 @@ class GroupMetadataManager(brokerId: Int,
           memRecords.batches.asScala.foreach { batch =>
             val isTxnOffsetCommit = batch.isTransactional
             if (batch.isControlBatch) {
-              val record = batch.iterator.next()
-              val controlRecord = ControlRecordType.parse(record.key)
-              if (controlRecord == ControlRecordType.COMMIT) {
-                pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
-                  .foreach {
-                    case (groupTopicPartition, commitRecordMetadataAndOffset) =>
-                      if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
-                        loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
-                  }
+              val recordIterator = batch.iterator
+              if (recordIterator.hasNext) {
+                val record = recordIterator.next()
+                val controlRecord = ControlRecordType.parse(record.key)
+                if (controlRecord == ControlRecordType.COMMIT) {
+                  pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
+                    .foreach {
+                      case (groupTopicPartition, commitRecordMetadataAndOffset) =>
+                        if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
+                          loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
+                    }
+                }
+                pendingOffsets.remove(batch.producerId)
               }
-              pendingOffsets.remove(batch.producerId)
             } else {
               var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index f5a070d..34f71df 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -943,24 +943,30 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
 
-    val controlRecord = controlBatch.iterator.next()
-    val controlType = ControlRecordType.parse(controlRecord.key)
-    val producerId = controlBatch.producerId
-    controlType match {
-      case ControlRecordType.ABORT =>
-        ongoingAbortedTxns.remove(producerId) match {
-          // Retain the marker until all batches from the transaction have been removed
-          case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
-            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
-            false
-          case _ => true
-        }
+    val controlRecordIterator = controlBatch.iterator
+    if (controlRecordIterator.hasNext) {
+      val controlRecord = controlRecordIterator.next()
+      val controlType = ControlRecordType.parse(controlRecord.key)
+      val producerId = controlBatch.producerId
+      controlType match {
+        case ControlRecordType.ABORT =>
+          ongoingAbortedTxns.remove(producerId) match {
+            // Retain the marker until all batches from the transaction have been removed
+            case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
+              transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+              false
+            case _ => true
+          }
 
-      case ControlRecordType.COMMIT =>
-        // This marker is eligible for deletion if we didn't traverse any batches from the transaction
-        !ongoingCommittedTxns.remove(producerId)
+        case ControlRecordType.COMMIT =>
+          // This marker is eligible for deletion if we didn't traverse any batches from the transaction
+          !ongoingCommittedTxns.remove(producerId)
 
-      case _ => false
+        case _ => false
+      }
+    } else {
+      // An empty control batch was already cleaned, so it's safe to discard
+      true
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d2c3b39..164965f 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -250,10 +250,16 @@ private[log] class ProducerAppendInfo(val producerId: Long,
 
   def append(batch: RecordBatch): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
-      val record = batch.iterator.next()
-      val endTxnMarker = EndTransactionMarker.deserialize(record)
-      val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
-      Some(completedTxn)
+      val recordIterator = batch.iterator
+      if (recordIterator.hasNext) {
+        val record = recordIterator.next()
+        val endTxnMarker = EndTransactionMarker.deserialize(record)
+        val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
+        Some(completedTxn)
+      } else {
+        // An empty control batch means the entire transaction has been cleaned from the log, so no need to append
+        None
+      }
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset,
         batch.isTransactional)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8b66859..160b764 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -424,7 +424,8 @@ object DumpLogSegments {
           print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset +
             " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
             " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
-            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional)
+            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
+            " isControl: " + batch.isControlBatch)
         else
           print("offset: " + batch.lastOffset)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b358c4e..df03c4d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -33,6 +33,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
+import java.util.Collections
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
@@ -1413,6 +1414,61 @@ class GroupMetadataManagerTest {
     EasyMock.verify(replicaManager)
   }
 
+  @Test
+  def testLoadOffsetsWithEmptyControlBatch() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val generation = 15
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    // Prepend empty control batch to valid records
+    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
+    EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
+    EasyMock.expect(mockBatch.isTransactional).andReturn(true)
+    EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
+    EasyMock.replay(mockBatch)
+    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes()
+    EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
+    EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
+    EasyMock.replay(mockRecords)
+
+    val logMock = EasyMock.mock(classOf[Log])
+    EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+      EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18))
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    // Empty control batch should not have caused the load to fail
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertNull(group.leaderOrNull)
+    assertNull(group.protocolOrNull)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f6001e9..6e11a1e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -450,6 +450,37 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testCleanEmptyControlBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+
+    // [{Producer1: Commit}, {2}, {3}]
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 7
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 2
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 3
+    log.roll()
+
+    // first time through the control batch is retained as an empty batch
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+
+    // the empty control batch does not cause an exception when cleaned
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 053aed7..a4ffb46 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,14 +21,16 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.StandardOpenOption
+import java.util.Collections
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -717,6 +719,22 @@ class ProducerStateManagerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testAppendEmptyControlBatch(): Unit = {
+    val producerId = 23423L
+    val producerEpoch = 145.toShort
+    val baseOffset = 15
+
+    val batch = EasyMock.createMock(classOf[RecordBatch])
+    EasyMock.expect(batch.isControlBatch).andReturn(true).once
+    EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
+    EasyMock.replay(batch)
+
+    // Appending the empty control batch should not throw and a new transaction shouldn't be started
+    append(stateManager, producerId, producerEpoch, baseOffset, batch, isFromClient = true)
+    assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
+  }
+
   private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
     val epoch = 0.toShort
     val producerId = 1L
@@ -777,6 +795,18 @@ class ProducerStateManagerTest extends JUnitSuite {
     stateManager.updateMapEndOffset(offset + 1)
   }
 
+  private def append(stateManager: ProducerStateManager,
+                     producerId: Long,
+                     producerEpoch: Short,
+                     offset: Long,
+                     batch: RecordBatch,
+                     isFromClient : Boolean): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
+    producerAppendInfo.append(batch)
+    stateManager.update(producerAppendInfo)
+    stateManager.updateMapEndOffset(offset + 1)
+  }
+
   private def currentSnapshotOffsets =
     logDir.listFiles.map(Log.offsetFromFile).toSet