You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/19 23:50:01 UTC
[kafka] branch trunk updated: KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr (#8223)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c27f629 KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr (#8223)
c27f629 is described below
commit c27f629e953025d726f722c15bd18c78f5f6cb66
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Fri Mar 20 07:49:35 2020 +0800
KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr (#8223)
Currently when there is a leader change with a log dir reassignment in progress, we do not update the leader epoch in the partition state maintained by `ReplicaAlterLogDirsThread`. This can lead to a FENCED_LEADER_EPOCH error, which results in the partition being marked as failed, which is a permanent failure until the broker is restarted. This patch fixes the problem by updating the epoch in `ReplicaAlterLogDirsThread` after receiving a new LeaderAndIsr request from the controller.
Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 54 ++++++++++++++-------
.../main/scala/kafka/server/ReplicaManager.scala | 10 ++--
.../admin/ReassignPartitionsClusterTest.scala | 22 +++++++--
.../unit/kafka/server/ReplicaManagerTest.scala | 55 +++++++++++++++++++++-
4 files changed, 113 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 5f0d25b..be68074 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -218,7 +218,7 @@ abstract class AbstractFetcherThread(name: String,
curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
}
- val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets)
+ val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
@@ -243,7 +243,8 @@ abstract class AbstractFetcherThread(name: String,
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
- private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
+ private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
+ latestEpochsForPartitions: Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
val partitionsWithError = mutable.HashSet.empty[TopicPartition]
@@ -255,7 +256,11 @@ abstract class AbstractFetcherThread(name: String,
fetchOffsets.put(tp, offsetTruncationState)
case Errors.FENCED_LEADER_EPOCH =>
- onPartitionFenced(tp)
+ if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
+ p =>
+ if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
+ else None
+ })) partitionsWithError += tp
case error =>
info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
@@ -266,12 +271,22 @@ abstract class AbstractFetcherThread(name: String,
ResultWithPartitions(fetchOffsets, partitionsWithError)
}
- private def onPartitionFenced(tp: TopicPartition): Unit = inLock(partitionMapLock) {
- Option(partitionStates.stateValue(tp)).foreach { currentFetchState =>
+ /**
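+ * Mark the partition as failed if the partition state is NOT updated, i.e. the epoch sent in the request still equals the current epoch. Otherwise, keep the partition active so it can be retried.
+ * @return true if the epoch in this thread has been updated since the request was sent; false otherwise (including when the partition has no state in this thread)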
+ */
+ private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) {
+ Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
- info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
- s"the new LeaderAndIsr state before resuming fetching.")
- markPartitionFailed(tp)
+ if (requestEpoch.contains(currentLeaderEpoch)) {
+ info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
+ s"the new LeaderAndIsr state before resuming fetching.")
+ markPartitionFailed(tp)
+ false
+ } else {
+ info(s"Partition $tp has an new epoch ($currentLeaderEpoch) than the current leader. retry the partition later")
+ true
+ }
}
}
@@ -308,6 +323,7 @@ abstract class AbstractFetcherThread(name: String,
// the current offset is the same as the offset requested.
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+ val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None
partitionData.error match {
case Errors.NONE =>
try {
@@ -350,7 +366,7 @@ abstract class AbstractFetcherThread(name: String,
markPartitionFailed(topicPartition)
}
case Errors.OFFSET_OUT_OF_RANGE =>
- if (!handleOutOfRangeError(topicPartition, currentFetchState))
+ if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
partitionsWithError += topicPartition
case Errors.UNKNOWN_LEADER_EPOCH =>
@@ -359,7 +375,7 @@ abstract class AbstractFetcherThread(name: String,
partitionsWithError += topicPartition
case Errors.FENCED_LEADER_EPOCH =>
- onPartitionFenced(topicPartition)
+ if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
case Errors.NOT_LEADER_FOR_PARTITION =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
@@ -518,31 +534,33 @@ abstract class AbstractFetcherThread(name: String,
}
/**
- * Handle the out of range error. Return true if the request succeeded or was fenced, which means we need
- * not backoff and retry. False if there was a retriable error.
+ * Handle the out of range error. Return false if
+ * 1) the request succeeded or
+ * 2) was fenced and this thread hasn't received a new epoch,
+ * which means we need not backoff and retry. True if there was a retriable error.
*/
private def handleOutOfRangeError(topicPartition: TopicPartition,
- fetchState: PartitionFetchState): Boolean = {
+ fetchState: PartitionFetchState,
+ requestEpoch: Option[Int]): Boolean = {
try {
val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
- true
+ false
} catch {
case _: FencedLeaderEpochException =>
- onPartitionFenced(topicPartition)
- true
+ onPartitionFenced(topicPartition, requestEpoch)
case e @ (_ : UnknownTopicOrPartitionException |
_ : UnknownLeaderEpochException |
_ : NotLeaderForPartitionException) =>
info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
- false
+ true
case e: Throwable =>
error(s"Error getting offset for partition $topicPartition", e)
- false
+ true
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 47d9683..084ffa5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1194,7 +1194,7 @@ class ReplicaManager(val config: KafkaConfig,
// First check partition's leader epoch
val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
- val newPartitions = new mutable.HashSet[Partition]
+ val updatedPartitions = new mutable.HashSet[Partition]
leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
@@ -1207,12 +1207,14 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
None
- case HostedPartition.Online(partition) => Some(partition)
+ case HostedPartition.Online(partition) =>
+ updatedPartitions.add(partition)
+ Some(partition)
case HostedPartition.None =>
val partition = Partition(topicPartition, time, this)
allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
- newPartitions.add(partition)
+ updatedPartitions.add(partition)
Some(partition)
}
@@ -1294,7 +1296,7 @@ class ReplicaManager(val config: KafkaConfig,
startHighWatermarkCheckPointThread()
val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
- for (partition <- newPartitions) {
+ for (partition <- updatedPartitions) {
val topicPartition = partition.topicPartition
if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
partition.log.foreach { log =>
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 1cbf33e..6dccdd0 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -71,9 +71,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
Admin.create(props)
}
- def getRandomLogDirAssignment(brokerId: Int): String = {
+ def getRandomLogDirAssignment(brokerId: Int, excluded: Option[String] = None): String = {
val server = servers.find(_.config.brokerId == brokerId).get
- val logDirs = server.config.logDirs
+ val logDirs = server.config.logDirs.filterNot(excluded.contains)
new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
}
@@ -161,19 +161,31 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}
@Test
- def shouldMoveSinglePartitionWithinBroker(): Unit = {
+ def shouldMoveSinglePartitionToSameFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(true)
+
+ @Test
+ def shouldMoveSinglePartitionToDifferentFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(false)
+
+ private[this] def shouldMoveSinglePartitionWithinBroker(moveToSameFolder: Boolean): Unit = {
// Given a single replica on server 100
startBrokers(Seq(100, 101))
adminClient = createAdminClient(servers)
- val expectedLogDir = getRandomLogDirAssignment(100)
createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers = servers)
+ val replica = new TopicPartitionReplica(topicName, 0, 100)
+ val currentLogDir = adminClient.describeReplicaLogDirs(java.util.Collections.singleton(replica))
+ .all()
+ .get()
+ .get(replica)
+ .getCurrentReplicaLogDir
+
+ val expectedLogDir = if (moveToSameFolder) currentLogDir else getRandomLogDirAssignment(100, excluded = Some(currentLogDir))
+
// When we execute an assignment that moves an existing replica to another log directory on the same broker
val topicJson = executeAssignmentJson(Seq(
PartitionAssignmentJson(tp0, replicas = Seq(100), logDirectories = Some(Seq(expectedLogDir)))
))
ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
- val replica = new TopicPartitionReplica(topicName, 0, 100)
waitUntilTrue(() => {
expectedLogDir == adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir
}, "Partition should have been moved to the expected log directory", 1000)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a7c59ea..e20ed10 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -210,6 +210,58 @@ class ReplicaManagerTest {
}
@Test
+ def testFencedErrorCausedByBecomeLeader(): Unit = {
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
+ try {
+ val brokerList = Seq[Integer](0, 1).asJava
+ val topicPartition = new TopicPartition(topic, 0)
+ replicaManager.createPartition(topicPartition)
+ .createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+ new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+
+ def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(0)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(epoch)
+ .setIsr(brokerList)
+ .setZkVersion(0)
+ .setReplicas(brokerList)
+ .setIsNew(true)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+ replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+ val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+ .localLogOrException
+ assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)
+
+ // find the live and different folder
+ val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
+ assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+ replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+ replicaManager.futureLocalLogOrException(topicPartition)
+ assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+ // change the epoch from 0 to 1 in order to make fenced error
+ replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
+ TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
+ s"the partition=$topicPartition should be removed from pending state")
+ // the partition is added to failedPartitions if fenced error happens
+ // if the thread is done before ReplicaManager#becomeLeaderOrFollower updates the epoch, the fenced error does
+ // not happen and failedPartitions is empty.
+ if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
+ replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+ assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+ // send request again
+ replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+ // the future folder exists so it fails to invoke thread
+ assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+ }
+ } finally replicaManager.shutdown(checkpointHW = false)
+ }
+
+ @Test
def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
val timer = new MockTimer
val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
@@ -1279,6 +1331,7 @@ class ReplicaManagerTest {
isFuture = false)).once
}
EasyMock.expect(mockLogMgr.initializingLog(topicPartitionObj)).anyTimes
+ EasyMock.expect(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).andReturn(None)
EasyMock.expect(mockLogMgr.finishedInitializingLog(
EasyMock.eq(topicPartitionObj), EasyMock.anyObject(), EasyMock.anyObject())).anyTimes
@@ -1469,7 +1522,7 @@ class ReplicaManagerTest {
private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+ props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))