You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this rendering.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/03/01 00:37:39 UTC

[kafka] branch trunk updated: KAFKA-8012; Ensure partitionStates have not been removed before truncating. (#6333)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70828ce  KAFKA-8012; Ensure partitionStates have not been removed before truncating. (#6333)
70828ce is described below

commit 70828cea49ab8a3ceb54a9017618b84e0d9c1420
Author: Colin Hicks <co...@gmail.com>
AuthorDate: Thu Feb 28 19:37:30 2019 -0500

    KAFKA-8012; Ensure partitionStates have not been removed before truncating. (#6333)
    
    This patch fixes a regression in the replica fetcher that occurs when the replica fetcher manager calls `removeFetcherForPartitions`, removing the corresponding partitionStates, at the same time as a replica fetcher thread attempts to truncate the same partition(s) in `truncateToHighWatermark`. The missing partition state triggers a NullPointerException (NPE) that crashes the fetcher.
    
    This change adds a null check on `partitionState` before truncating. A similar guard already exists in `truncateToEpochEndOffsets`.
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 30 +++++----
 .../kafka/server/AbstractFetcherThreadTest.scala   | 72 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 959c2bf..3cc6137 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -205,23 +205,27 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  private def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
+  // Visible for testing
+  private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
     for (tp <- partitions) {
-      try {
-        val highWatermark = partitionStates.stateValue(tp).fetchOffset
-        val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)
-
-        info(s"Truncating partition $tp to local high watermark $highWatermark")
-        truncate(tp, truncationState)
-
-        fetchOffsets.put(tp, truncationState)
-      } catch {
-        case e: KafkaStorageException =>
-          info(s"Failed to truncate $tp", e)
-          partitionsWithError += tp
+      val partitionState = partitionStates.stateValue(tp)
+      if (partitionState != null) {
+        try {
+          val highWatermark = partitionState.fetchOffset
+          val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)
+
+          info(s"Truncating partition $tp to local high watermark $highWatermark")
+          truncate(tp, truncationState)
+
+          fetchOffsets.put(tp, truncationState)
+        } catch {
+          case e: KafkaStorageException =>
+            info(s"Failed to truncate $tp", e)
+            partitionsWithError += tp
+        }
       }
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 5a12047..1fc079d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -41,6 +41,8 @@ import scala.collection.{Map, Set, mutable}
 import scala.util.Random
 import org.scalatest.Assertions.assertThrows
 
+import scala.collection.mutable.ArrayBuffer
+
 class AbstractFetcherThreadTest {
 
   @Before
@@ -347,6 +349,34 @@ class AbstractFetcherThreadTest {
   }
 
   @Test
+  def testTruncateToHighWatermarkDuringRemovePartitions(): Unit = { // KAFKA-8012 regression: removing a partition while truncateToHighWatermark runs must not crash the fetcher
+    val highWatermark = 2L
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread {
+      override def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = {
+        removePartitions(Set(partition)) // simulate the fetcher manager removing the partition mid-truncation
+        super.truncateToHighWatermark(partitions) // then run the real truncation against the now-missing partition state
+      }
+
+      override def latestEpoch(topicPartition: TopicPartition): Option[Int] = None // NOTE(review): presumably steers the fetcher to the high-watermark truncation path rather than epoch-based truncation; confirm
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark) // highWatermark (2) equals the last batch's base offset, so a real truncation to it would shorten the log
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(highWatermark, leaderEpoch = 5)))
+
+    fetcher.doWork() // before the fix, this path hit an NPE on the removed partition's state
+
+    assertEquals(replicaLog.last.nextOffset(), replicaState.logEndOffset) // log end offset unchanged: the removed partition was not truncated
+    assertTrue(fetcher.fetchState(partition).isEmpty) // the partition is no longer tracked by the fetcher
+  }
+
+  @Test
   def testTruncationSkippedIfNoEpochChange(): Unit = {
     val partition = new TopicPartition("topic", 0)
 
@@ -631,6 +661,48 @@ class AbstractFetcherThreadTest {
   }
 
   @Test
+  def testTruncateToEpochEndOffsetsDuringRemovePartitions(): Unit = { // the partition is removed (and its replica epoch bumped) while epoch end offsets are being fetched; the fetcher must keep working
+    val partition = new TopicPartition("topic", 0)
+    val leaderEpochOnLeader = 0
+    val initialLeaderEpochOnFollower = 0
+    val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1
+
+    val fetcher = new MockFetcherThread {
+      override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+        val fetchedEpochs = super.fetchEpochEndOffsets(partitions)
+        // leader epoch changes while fetching epochs from leader
+        // at the same time, the replica fetcher manager removes the partition
+        removePartitions(Set(partition))
+        setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = nextLeaderEpochOnFollower))
+        fetchedEpochs // return the offsets fetched before the removal, as a racing response would
+      }
+    }
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = initialLeaderEpochOnFollower))
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = initialLeaderEpochOnFollower)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = initialLeaderEpochOnFollower, new SimpleRecord("c".getBytes)))
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpochOnLeader, highWatermark = 0L)
+    fetcher.setLeaderState(partition, leaderState)
+
+    // first round of work
+    fetcher.doWork()
+
+    // since the partition was removed before the fetched endOffsets were filtered against the leader epoch,
+    // we do not expect the partition to be in Truncating state
+    assertEquals(None, fetcher.fetchState(partition).map(_.state))
+    assertEquals(None, fetcher.fetchState(partition).map(_.currentLeaderEpoch))
+
+    fetcher.setLeaderState( // leader now reports the follower's bumped epoch
+      partition, MockFetcherThread.PartitionState(leaderLog, nextLeaderEpochOnFollower, highWatermark = 0L))
+
+    // make sure the fetcher is able to continue work
+    fetcher.doWork()
+    assertEquals(ArrayBuffer.empty, fetcher.replicaPartitionState(partition).log) // presumably the empty log from the replica state installed during the race; confirm the PartitionState default
+  }
+
+  @Test
   def testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs(): Unit = {
     val partition = new TopicPartition("topic", 0)
     val fetcher = new MockFetcherThread {