Posted to commits@kafka.apache.org by jg...@apache.org on 2019/03/01 00:37:39 UTC
[kafka] branch trunk updated: KAFKA-8012; Ensure partitionStates have not been removed before truncating. (#6333)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70828ce KAFKA-8012; Ensure partitionStates have not been removed before truncating. (#6333)
70828ce is described below
commit 70828cea49ab8a3ceb54a9017618b84e0d9c1420
Author: Colin Hicks <co...@gmail.com>
AuthorDate: Thu Feb 28 19:37:30 2019 -0500
KAFKA-8012; Ensure partitionStates have not been removed before truncating. (#6333)
This patch fixes a regression in the replica fetcher that occurs when the replica fetcher manager calls `removeFetcherForPartitions`, removing the corresponding partitionStates, while a replica fetcher thread is concurrently attempting to truncate the same partition(s) in `truncateToHighWatermark`. The race results in an NPE that crashes the fetcher thread.
This change simply checks that the `partitionState` is not null first. Note that a similar guard exists in `truncateToEpochEndOffsets`.
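For illustration, the shape of the race and of the guard can be sketched outside of Kafka. The following is a minimal, hypothetical Scala sketch (PartitionState, the map keyed by partition name, and NullGuardSketch are stand-ins invented here, not the actual Kafka classes):

import java.util.concurrent.ConcurrentHashMap

object NullGuardSketch {
  // Hypothetical stand-in for the fetcher's per-partition state.
  final case class PartitionState(fetchOffset: Long)

  // Stand-in for the fetcher thread's partitionStates map.
  private val partitionStates = new ConcurrentHashMap[String, PartitionState]()

  def truncateToHighWatermark(partitions: Set[String]): Unit = {
    for (tp <- partitions) {
      // The lookup returns null if another thread removed the partition
      // between scheduling the truncation and reaching this point.
      val partitionState = partitionStates.get(tp)
      if (partitionState != null) {
        // Without the guard, dereferencing a null lookup here is what threw the NPE.
        println(s"Truncating partition $tp to local high watermark ${partitionState.fetchOffset}")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    partitionStates.put("topic-0", PartitionState(fetchOffset = 2L))
    partitionStates.remove("topic-0")        // the "manager" removes the partition first
    truncateToHighWatermark(Set("topic-0"))  // completes without an NPE
  }
}

In the actual code the same effect is achieved under partitionMapLock with partitionStates.stateValue(tp), as the diff below shows.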
Reviewers: Stanislav Kozlovski <st...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 30 +++++----
.../kafka/server/AbstractFetcherThreadTest.scala | 72 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 959c2bf..3cc6137 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -205,23 +205,27 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  private def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
+  // Visible for testing
+  private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
     for (tp <- partitions) {
-      try {
-        val highWatermark = partitionStates.stateValue(tp).fetchOffset
-        val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)
-
-        info(s"Truncating partition $tp to local high watermark $highWatermark")
-        truncate(tp, truncationState)
-
-        fetchOffsets.put(tp, truncationState)
-      } catch {
-        case e: KafkaStorageException =>
-          info(s"Failed to truncate $tp", e)
-          partitionsWithError += tp
+      val partitionState = partitionStates.stateValue(tp)
+      if (partitionState != null) {
+        try {
+          val highWatermark = partitionState.fetchOffset
+          val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)
+
+          info(s"Truncating partition $tp to local high watermark $highWatermark")
+          truncate(tp, truncationState)
+
+          fetchOffsets.put(tp, truncationState)
+        } catch {
+          case e: KafkaStorageException =>
+            info(s"Failed to truncate $tp", e)
+            partitionsWithError += tp
+        }
       }
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 5a12047..1fc079d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -41,6 +41,8 @@ import scala.collection.{Map, Set, mutable}
 import scala.util.Random
 import org.scalatest.Assertions.assertThrows
 
+import scala.collection.mutable.ArrayBuffer
+
 class AbstractFetcherThreadTest {
 
   @Before
@@ -347,6 +349,34 @@ class AbstractFetcherThreadTest {
   }
 
   @Test
+  def testTruncateToHighWatermarkDuringRemovePartitions(): Unit = {
+    val highWatermark = 2L
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread {
+      override def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = {
+        removePartitions(Set(partition))
+        super.truncateToHighWatermark(partitions)
+      }
+
+      override def latestEpoch(topicPartition: TopicPartition): Option[Int] = None
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, leaderEpoch = 5, highWatermark)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(highWatermark, leaderEpoch = 5)))
+
+    fetcher.doWork()
+
+    assertEquals(replicaLog.last.nextOffset(), replicaState.logEndOffset)
+    assertTrue(fetcher.fetchState(partition).isEmpty)
+  }
+
+  @Test
   def testTruncationSkippedIfNoEpochChange(): Unit = {
     val partition = new TopicPartition("topic", 0)
@@ -631,6 +661,48 @@ class AbstractFetcherThreadTest {
   }
 
   @Test
+  def testTruncateToEpochEndOffsetsDuringRemovePartitions(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val leaderEpochOnLeader = 0
+    val initialLeaderEpochOnFollower = 0
+    val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1
+
+    val fetcher = new MockFetcherThread {
+      override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+        val fetchedEpochs = super.fetchEpochEndOffsets(partitions)
+        // leader epoch changes while fetching epochs from leader
+        // at the same time, the replica fetcher manager removes the partition
+        removePartitions(Set(partition))
+        setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = nextLeaderEpochOnFollower))
+        fetchedEpochs
+      }
+    }
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = initialLeaderEpochOnFollower))
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = initialLeaderEpochOnFollower)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = initialLeaderEpochOnFollower, new SimpleRecord("c".getBytes)))
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpochOnLeader, highWatermark = 0L)
+    fetcher.setLeaderState(partition, leaderState)
+
+    // first round of work
+    fetcher.doWork()
+
+    // since the partition was removed before the fetched endOffsets were filtered against the leader epoch,
+    // we do not expect the partition to be in Truncating state
+    assertEquals(None, fetcher.fetchState(partition).map(_.state))
+    assertEquals(None, fetcher.fetchState(partition).map(_.currentLeaderEpoch))
+
+    fetcher.setLeaderState(
+      partition, MockFetcherThread.PartitionState(leaderLog, nextLeaderEpochOnFollower, highWatermark = 0L))
+
+    // make sure the fetcher is able to continue work
+    fetcher.doWork()
+    assertEquals(ArrayBuffer.empty, fetcher.replicaPartitionState(partition).log)
+  }
+
+  @Test
   def testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs(): Unit = {
     val partition = new TopicPartition("topic", 0)
     val fetcher = new MockFetcherThread {