Posted to commits@kafka.apache.org by jg...@apache.org on 2020/01/07 23:24:31 UTC

[kafka] branch 2.4 updated: KAFKA-9065; Fix endless loop when loading group/transaction metadata (#7554)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new d92b5ee  KAFKA-9065; Fix endless loop when loading group/transaction metadata (#7554)
d92b5ee is described below

commit d92b5ee80f83e962478a7620591ac5c9992c560e
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Jan 7 23:03:08 2020 +0100

    KAFKA-9065; Fix endless loop when loading group/transaction metadata (#7554)
    
    The current coordinator loading logic causes an infinite loop when there is a gap between the last record in the log and the log end offset. Such a gap can arise from log compaction when the active segment is empty. The patch fixes the problem by breaking out of the loading loop when a read from the log returns no additional data.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
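
For reference, a minimal, self-contained sketch of the guarded loading loop this
patch introduces in both coordinators. SimpleLog, Batch, and load are illustrative
stand-ins, not Kafka's Log/FetchDataInfo APIs; only the loop shape, in particular
the readAtLeastOneRecord guard that breaks out when a read returns no bytes,
mirrors the actual change in the diffs below.

    object LoadLoopSketch {

      final case class Batch(baseOffset: Long, sizeInBytes: Int)

      // Toy "log" with a gap at the tail: the last record sits well before
      // logEndOffset, as can happen when compaction empties the active segment.
      final case class SimpleLog(batches: Seq[Batch], logStartOffset: Long, logEndOffset: Long) {
        // Returns every batch at or beyond `offset`; empty once the tail gap is reached.
        def read(offset: Long): Seq[Batch] = batches.filter(_.baseOffset >= offset)
      }

      def load(log: SimpleLog): Int = {
        var currOffset = log.logStartOffset
        // Without this guard an empty read leaves currOffset unchanged and the
        // loop spins forever once it enters the gap before logEndOffset.
        var readAtLeastOneRecord = true
        var loadedBatches = 0

        while (currOffset < log.logEndOffset && readAtLeastOneRecord) {
          val batches = log.read(currOffset)
          readAtLeastOneRecord = batches.map(_.sizeInBytes).sum > 0
          batches.foreach { batch =>
            loadedBatches += 1
            currOffset = batch.baseOffset + 1
          }
        }
        loadedBatches
      }

      def main(args: Array[String]): Unit = {
        // Records at offsets 0..4 but logEndOffset 10: the unguarded loop would
        // never terminate; the guarded loop stops after the first empty read.
        val log = SimpleLog((0L until 5L).map(o => Batch(o, 64)), logStartOffset = 0L, logEndOffset = 10L)
        println(s"loaded ${load(log)} batches")
      }
    }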
---
 .../coordinator/group/GroupMetadataManager.scala   | 18 ++++++++-----
 .../transaction/TransactionStateManager.scala      |  8 +++++-
 .../group/GroupMetadataManagerTest.scala           | 24 +++++++++++++++++
 .../transaction/TransactionStateManagerTest.scala  | 31 ++++++++++++++++++++++
 4 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index a952164..c25e0cf 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -557,22 +557,28 @@ class GroupMetadataManager(brokerId: Int,
         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
 
       case Some(log) =>
-        var currOffset = log.logStartOffset
+        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
+        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
+        val loadedGroups = mutable.Map[String, GroupMetadata]()
+        val removedGroups = mutable.Set[String]()
 
         // buffer may not be needed if records are read from memory
         var buffer = ByteBuffer.allocate(0)
 
         // loop breaks if leader changes at any time during the load, since logEndOffset is -1
-        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
-        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
-        val loadedGroups = mutable.Map[String, GroupMetadata]()
-        val removedGroups = mutable.Set[String]()
+        var currOffset = log.logStartOffset
 
-        while (currOffset < logEndOffset && !shuttingDown.get()) {
+        // loop breaks if no records have been read, since the end of the log has been reached
+        var readAtLeastOneRecord = true
+
+        while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
           val fetchDataInfo = log.read(currOffset,
             maxLength = config.loadBufferSize,
             isolation = FetchLogEnd,
             minOneMessage = true)
+
+          readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 79f01e3..2e8bf7f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -308,14 +308,20 @@ class TransactionStateManager(brokerId: Int,
         // loop breaks if leader changes at any time during the load, since logEndOffset is -1
         var currOffset = log.logStartOffset
 
+        // loop breaks if no records have been read, since the end of the log has been reached
+        var readAtLeastOneRecord = true
+
         try {
-          while (currOffset < logEndOffset && !shuttingDown.get() && inReadLock(stateLock) {
+          while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get() && inReadLock(stateLock) {
             loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
               idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
             val fetchDataInfo = log.read(currOffset,
               maxLength = config.transactionLogLoadBufferSize,
               isolation = FetchLogEnd,
               minOneMessage = true)
+
+            readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
             val memRecords = fetchDataInfo.records match {
               case records: MemoryRecords => records
               case fileRecords: FileRecords =>
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 947e4b3..b0f81c8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -686,6 +686,30 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
+    expectGroupMetadataLoad(logMock, startOffset, MemoryRecords.EMPTY)
+    EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.replay(logMock)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+
+    assertFalse(groupMetadataManager.isPartitionLoading(groupTopicPartition.partition()))
+  }
+
+  @Test
   def testOffsetWriteAfterGroupRemoved(): Unit = {
     // this test case checks the following scenario:
     // 1. the group exists at some point in time, but is later removed (because all members left)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 038d4c5..e462c14 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -493,6 +493,37 @@ class TransactionStateManagerTest {
     assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
   }
 
+  @Test
+  def testLoadTransactionMetadataWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(topicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
+      maxLength = EasyMock.anyInt(),
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true))
+    ).andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
+    EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset))
+
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _, _) => ())
+
+    // let the time advance to trigger the background thread loading
+    scheduler.tick()
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+    assertEquals(0, transactionManager.loadingPartitions.size)
+  }
+
   private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): Unit = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(errors) => fail("shouldn't have been any errors")