You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/10 02:48:06 UTC

[kafka] branch 2.2 updated (bfc7a7f -> 288e46d)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from bfc7a7f  KAFKA-9739: 2.3 null child node fix (#8419)
     new 66ae1dd  KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr  (#8223)
     new 288e46d  KAFKA-9750; Fix race condition with log dir reassign completion (#8412)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/server/AbstractFetcherManager.scala      |  14 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |  65 +++++--
 .../kafka/server/ReplicaAlterLogDirsManager.scala  |  18 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  22 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  19 +-
 .../admin/ReassignPartitionsClusterTest.scala      |  27 ++-
 .../kafka/server/AbstractFetcherManagerTest.scala  |   1 +
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 212 ++++++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  62 +++++-
 9 files changed, 394 insertions(+), 46 deletions(-)


[kafka] 02/02: KAFKA-9750; Fix race condition with log dir reassign completion (#8412)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 288e46d0b89086a57b284d9fcf0e6e1e5eb2541e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Apr 3 11:51:04 2020 -0700

    KAFKA-9750; Fix race condition with log dir reassign completion (#8412)
    
    There is a race on receiving a LeaderAndIsr request for a replica with an active log dir reassignment. If the reassignment completes just before the LeaderAndIsr handler updates epoch information, it can lead to an illegal state error since no future log dir exists. This patch fixes the problem by ensuring that the future log dir exists when the fetcher is started. Removal cannot happen concurrently because it requires access the same partition state lock.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, David Arthur <mu...@gmail.com>
    
    Co-authored-by: Chia-Ping Tsai <ch...@gmail.com>
---
 .../kafka/server/AbstractFetcherManager.scala      |  14 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   7 +-
 .../kafka/server/ReplicaAlterLogDirsManager.scala  |  18 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  22 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   |   4 +
 .../kafka/server/AbstractFetcherManagerTest.scala  |   1 +
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 212 ++++++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  50 +++--
 8 files changed, 296 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 10ae8df..666305f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -126,7 +126,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
         BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
       }
 
-      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId): AbstractFetcherThread = {
+      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
+                                   brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
         val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
         fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
         fetcherThread.start()
@@ -150,13 +151,18 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
           tp -> OffsetAndEpoch(brokerAndInitOffset.initOffset, brokerAndInitOffset.currentLeaderEpoch)
         }
 
-        fetcherThread.addPartitions(initialOffsetAndEpochs)
-        info(s"Added fetcher to broker ${brokerAndFetcherId.broker} for partitions $initialOffsetAndEpochs")
+        addPartitionsToFetcherThread(fetcherThread, initialOffsetAndEpochs)
       }
     }
   }
 
-  def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
+  protected def addPartitionsToFetcherThread(fetcherThread: T,
+                                             initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
+    fetcherThread.addPartitions(initialOffsetAndEpochs)
+    info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
+  }
+
+  def removeFetcherForPartitions(partitions: Set[TopicPartition]): Unit = {
     lock synchronized {
       for (fetcher <- fetcherThreadMap.values)
         fetcher.removePartitions(partitions)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 083e46b..c7cfe88 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -60,7 +60,7 @@ abstract class AbstractFetcherThread(name: String,
   type EpochData = OffsetsForLeaderEpochRequest.PartitionData
 
   private val partitionStates = new PartitionStates[PartitionFetchState]
-  private val partitionMapLock = new ReentrantLock
+  protected val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
 
   private val metricId = ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
@@ -179,7 +179,7 @@ abstract class AbstractFetcherThread(name: String,
     * - Send OffsetsForLeaderEpochRequest, retrieving the latest offset for each partition's
     *   leader epoch. This is the offset the follower should truncate to ensure
     *   accurate log replication.
-    * - Finally truncate the logs for partitions in the truncating phase and mark them
+    * - Finally truncate the logs for partitions in the truncating phase and mark the
     *   truncation complete. Do this within a lock to ensure no leadership changes can
     *   occur during truncation.
     */
@@ -407,7 +407,7 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]) {
+  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
     partitionMapLock.lockInterruptibly()
     try {
       initialFetchStates.foreach { case (tp, initialFetchState) =>
@@ -426,6 +426,7 @@ abstract class AbstractFetcherThread(name: String,
       }
 
       partitionMapCond.signalAll()
+      initialFetchStates.keySet
     } finally partitionMapLock.unlock()
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
index 1616b84..be3d0bd 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.common.TopicPartition
 
 class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
                                  replicaManager: ReplicaManager,
@@ -34,7 +35,22 @@ class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
       quotaManager, brokerTopicStats)
   }
 
-  def shutdown() {
+  override protected def addPartitionsToFetcherThread(fetcherThread: ReplicaAlterLogDirsThread,
+                                                      initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
+    val addedPartitions = fetcherThread.addPartitions(initialOffsetAndEpochs)
+    val (addedInitialOffsets, notAddedInitialOffsets) = initialOffsetAndEpochs.partition { case (tp, _) =>
+      addedPartitions.contains(tp)
+    }
+
+    if (addedInitialOffsets.nonEmpty)
+      info(s"Added log dir fetcher for partitions with initial offsets $addedInitialOffsets")
+
+    if (notAddedInitialOffsets.nonEmpty)
+      info(s"Failed to add log dir fetch for partitions ${notAddedInitialOffsets.keySet} " +
+        s"since the log dir reassignment has already completed")
+  }
+
+  def shutdown(): Unit = {
     info("shutting down")
     closeAllFetchers()
     info("shutdown completed")
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 381cf3f..ac863f8 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -82,7 +82,7 @@ class ReplicaAlterLogDirsThread(name: String,
       Request.FutureLocalReplicaId,
       request.minBytes,
       request.maxBytes,
-      request.version <= 2,
+      false,
       request.fetchData.asScala.toSeq,
       UnboundedQuota,
       processResponseCallback,
@@ -106,7 +106,11 @@ class ReplicaAlterLogDirsThread(name: String,
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, futureReplica.logEndOffset))
 
-    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
+    val logAppendInfo = if (records.sizeInBytes() > 0)
+      partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
+    else
+      None
+
     val futureReplicaHighWatermark = futureReplica.logEndOffset.min(partitionData.highWatermark)
     futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
     futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
@@ -118,6 +122,20 @@ class ReplicaAlterLogDirsThread(name: String,
     logAppendInfo
   }
 
+  override def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      // It is possible that the log dir fetcher completed just before this call, so we
+      // filter only the partitions which still have a future log dir.
+      val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
+        replicaMgr.futureLogExists(tp)
+      }
+      super.addPartitions(filteredFetchStates)
+    } finally {
+      partitionMapLock.unlock()
+    }
+  }
+
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
     val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
     offsetSnapshot.logStartOffset
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 71b6122..cd19da0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -463,6 +463,10 @@ class ReplicaManager(val config: KafkaConfig,
     nonOfflinePartition(topicPartition).flatMap(_.localReplica)
   }
 
+  def futureLogExists(topicPartition: TopicPartition): Boolean = {
+    getPartitionOrException(topicPartition, expectLeader = false).futureLocalReplica.isDefined
+  }
+
   def getLogDir(topicPartition: TopicPartition): Option[String] = {
     localReplica(topicPartition).flatMap(_.log).map(_.dir.getParent)
   }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 0a4d7c1..76880ec 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -43,6 +43,7 @@ class AbstractFetcherManagerTest {
 
     EasyMock.expect(fetcher.start())
     EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
+        .andReturn(Set(tp))
     EasyMock.expect(fetcher.fetchState(tp))
       .andReturn(Some(PartitionFetchState(fetchOffset, leaderEpoch, Truncating)))
     EasyMock.expect(fetcher.removePartitions(Set(tp)))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 46ccbab..bb337b4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -18,22 +18,29 @@ package kafka.server
 
 import java.util.Optional
 
+import kafka.api.Request
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.LogManager
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
+import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
+import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, IsolationLevel, OffsetsForLeaderEpochRequest}
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
+import org.mockito.Mockito.{doNothing, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
-import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq}
+import scala.collection.JavaConverters._
 
 class ReplicaAlterLogDirsThreadTest {
 
@@ -45,6 +52,199 @@ class ReplicaAlterLogDirsThreadTest {
   }
 
   @Test
+  def shouldNotAddPartitionIfFutureLogIsNotDefined(): Unit = {
+    val brokerId = 1
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234"))
+
+    val replicaManager = Mockito.mock(classOf[ReplicaManager])
+    val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+
+    when(replicaManager.futureLogExists(t1p0)).thenReturn(false)
+
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = new BrokerTopicStats)
+
+    val addedPartitions = thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L)))
+    assertEquals(Set.empty, addedPartitions)
+    assertEquals(0, thread.partitionCount())
+    assertEquals(None, thread.fetchState(t1p0))
+  }
+
+  @Test
+  def shouldUpdateLeaderEpochAfterFencedEpochError(): Unit = {
+    val brokerId = 1
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234"))
+
+    val partition = Mockito.mock(classOf[Partition])
+    val replicaManager = Mockito.mock(classOf[ReplicaManager])
+    val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+    val futureReplica = Mockito.mock(classOf[Replica])
+
+    val leaderEpoch = 5
+    val logEndOffset = 0
+
+    when(replicaManager.futureLocalReplicaOrException(t1p0)).thenReturn(futureReplica)
+    when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
+    when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
+    when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
+    when(replicaManager.getPartition(t1p0)).thenReturn(Some(partition))
+
+    when(quotaManager.isQuotaExceeded).thenReturn(false)
+
+    when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false))
+      .thenReturn(new EpochEndOffset(leaderEpoch, logEndOffset))
+    when(partition.futureLocalReplicaOrException).thenReturn(futureReplica)
+    doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
+    when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+
+    when(futureReplica.logStartOffset).thenReturn(0L)
+    when(futureReplica.logEndOffset).thenReturn(0L)
+    when(futureReplica.latestEpoch).thenReturn(None)
+
+    val fencedRequestData = new FetchRequest.PartitionData(0L, 0L,
+      config.replicaFetchMaxBytes, Optional.of(leaderEpoch - 1))
+    val fencedResponseData = FetchPartitionData(
+      error = Errors.FENCED_LEADER_EPOCH,
+      highWatermark = -1,
+      logStartOffset = -1,
+      records = MemoryRecords.EMPTY,
+      lastStableOffset = None,
+      abortedTransactions = None)
+    mockFetchFromCurrentLog(t1p0, fencedRequestData, config, replicaManager, fencedResponseData)
+
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = new BrokerTopicStats)
+
+    // Initially we add the partition with an older epoch which results in an error
+    thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch - 1)))
+    assertTrue(thread.fetchState(t1p0).isDefined)
+    assertEquals(1, thread.partitionCount())
+
+    thread.doWork()
+
+    assertEquals(None, thread.fetchState(t1p0))
+    assertEquals(0, thread.partitionCount())
+
+    // Next we update the epoch and assert that we can continue
+    thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch)))
+    assertEquals(Some(leaderEpoch), thread.fetchState(t1p0).map(_.currentLeaderEpoch))
+    assertEquals(1, thread.partitionCount())
+
+    val requestData = new FetchRequest.PartitionData(0L, 0L,
+      config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
+    val responseData = FetchPartitionData(
+      error = Errors.NONE,
+      highWatermark = 0L,
+      logStartOffset = 0L,
+      records = MemoryRecords.EMPTY,
+      lastStableOffset = None,
+      abortedTransactions = None)
+    mockFetchFromCurrentLog(t1p0, requestData, config, replicaManager, responseData)
+
+    thread.doWork()
+
+    assertEquals(None, thread.fetchState(t1p0))
+    assertEquals(0, thread.partitionCount())
+  }
+
+  @Test
+  def shouldReplaceCurrentLogDirWhenCaughtUp(): Unit = {
+    val brokerId = 1
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234"))
+
+    val partition = Mockito.mock(classOf[Partition])
+    val replicaManager = Mockito.mock(classOf[ReplicaManager])
+    val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+    val futureReplica = Mockito.mock(classOf[Replica])
+
+    val leaderEpoch = 5
+    val logEndOffset = 0
+
+    when(replicaManager.futureLocalReplicaOrException(t1p0)).thenReturn(futureReplica)
+    when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
+    when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
+    when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
+    when(replicaManager.getPartition(t1p0)).thenReturn(Some(partition))
+
+    when(quotaManager.isQuotaExceeded).thenReturn(false)
+
+    when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false))
+      .thenReturn(new EpochEndOffset(leaderEpoch, logEndOffset))
+    when(partition.futureLocalReplicaOrException).thenReturn(futureReplica)
+    doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
+    when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(true)
+
+    when(futureReplica.logStartOffset).thenReturn(0L)
+    when(futureReplica.logEndOffset).thenReturn(0L)
+    when(futureReplica.latestEpoch).thenReturn(None)
+
+    val requestData = new FetchRequest.PartitionData(0L, 0L,
+      config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
+    val responseData = FetchPartitionData(
+      error = Errors.NONE,
+      highWatermark = 0L,
+      logStartOffset = 0L,
+      records = MemoryRecords.EMPTY,
+      lastStableOffset = None,
+      abortedTransactions = None)
+    mockFetchFromCurrentLog(t1p0, requestData, config, replicaManager, responseData)
+
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = new BrokerTopicStats)
+
+    thread.addPartitions(Map(t1p0 -> offsetAndEpoch(fetchOffset = 0L, leaderEpoch)))
+    assertTrue(thread.fetchState(t1p0).isDefined)
+    assertEquals(1, thread.partitionCount())
+
+    thread.doWork()
+
+    assertEquals(None, thread.fetchState(t1p0))
+    assertEquals(0, thread.partitionCount())
+  }
+
+  private def mockFetchFromCurrentLog(topicPartition: TopicPartition,
+                                      requestData: FetchRequest.PartitionData,
+                                      config: KafkaConfig,
+                                      replicaManager: ReplicaManager,
+                                      responseData: FetchPartitionData): Unit = {
+    val callbackCaptor: ArgumentCaptor[Seq[(TopicPartition, FetchPartitionData)] => Unit] =
+      ArgumentCaptor.forClass(classOf[Seq[(TopicPartition, FetchPartitionData)] => Unit])
+    when(replicaManager.fetchMessages(
+      timeout = ArgumentMatchers.eq(0L),
+      replicaId = ArgumentMatchers.eq(Request.FutureLocalReplicaId),
+      fetchMinBytes = ArgumentMatchers.eq(0),
+      fetchMaxBytes = ArgumentMatchers.eq(config.replicaFetchResponseMaxBytes),
+      hardMaxBytesLimit = ArgumentMatchers.eq(false),
+      fetchInfos = ArgumentMatchers.eq(Seq(topicPartition -> requestData)),
+      quota = ArgumentMatchers.eq(UnboundedQuota),
+      responseCallback = callbackCaptor.capture(),
+      isolationLevel = ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED)
+    )).thenAnswer(new Answer[Any] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        callbackCaptor.getValue.apply(Seq((topicPartition, responseData)))
+      }
+    })
+  }
+
+  @Test
   def issuesEpochRequestFromLocalReplica(): Unit = {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
@@ -171,7 +371,9 @@ class ReplicaAlterLogDirsThreadTest {
     expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
       .andStubReturn(partitionT1p1)
     expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplicaT1p0)
+    expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
     expect(replicaManager.futureLocalReplicaOrException(t1p1)).andStubReturn(futureReplicaT1p1)
+    expect(replicaManager.futureLogExists(t1p1)).andStubReturn(true)
     expect(partitionT1p0.truncateTo(capture(truncateCaptureT1p0), anyBoolean())).anyTimes()
     expect(partitionT1p1.truncateTo(capture(truncateCaptureT1p1), anyBoolean())).anyTimes()
 
@@ -244,6 +446,7 @@ class ReplicaAlterLogDirsThreadTest {
     expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
       .andStubReturn(partition)
     expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
+    expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
 
     expect(partition.truncateTo(capture(truncateToCapture), EasyMock.eq(true))).anyTimes()
     expect(futureReplica.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
@@ -314,6 +517,7 @@ class ReplicaAlterLogDirsThreadTest {
       .andStubReturn(partition)
     expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).anyTimes()
     expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
+    expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
 
     expect(futureReplica.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
@@ -369,6 +573,7 @@ class ReplicaAlterLogDirsThreadTest {
     expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).once()
 
     expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
+    expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
     expect(futureReplica.latestEpoch).andStubReturn(Some(futureReplicaLeaderEpoch))
     expect(futureReplica.endOffsetForEpoch(futureReplicaLeaderEpoch)).andReturn(
       Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch)))
@@ -452,6 +657,7 @@ class ReplicaAlterLogDirsThreadTest {
     expect(partition.truncateTo(futureReplicaLEO, isFuture = true)).once()
 
     expect(replicaManager.futureLocalReplicaOrException(t1p0)).andStubReturn(futureReplica)
+    expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
     expect(futureReplica.latestEpoch).andStubReturn(Some(leaderEpoch))
     expect(futureReplica.logEndOffset).andReturn(futureReplicaLEO).anyTimes()
     expect(futureReplica.endOffsetForEpoch(leaderEpoch)).andReturn(
@@ -604,11 +810,13 @@ class ReplicaAlterLogDirsThreadTest {
     expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes()
     expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes()
     expect(replicaManager.futureLocalReplicaOrException(t1p0)).andReturn(futureReplica).anyTimes()
+    expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
     expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
     expect(replicaManager.localReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes()
     expect(replicaManager.futureLocalReplica(t1p1)).andReturn(Some(futureReplica)).anyTimes()
     expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes()
     expect(replicaManager.futureLocalReplicaOrException(t1p1)).andReturn(futureReplica).anyTimes()
+    expect(replicaManager.futureLogExists(t1p1)).andStubReturn(true)
     expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 4786745..6f73690 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -196,6 +196,12 @@ class ReplicaManagerTest {
 
   @Test
   def testFencedErrorCausedByBecomeLeader(): Unit = {
+    testFencedErrorCausedByBecomeLeader(0)
+    testFencedErrorCausedByBecomeLeader(1)
+    testFencedErrorCausedByBecomeLeader(10)
+  }
+
+  private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
     try {
       val brokerList = Seq[Integer](0, 1).asJava
@@ -203,6 +209,10 @@ class ReplicaManagerTest {
       replicaManager.getOrCreatePartition(topicPartition)
           .getOrCreateReplica(0, isNew = false)
 
+      val replica = replicaManager.localReplicaOrException(topicPartition)
+      assertFalse(replicaManager.futureLogExists(topicPartition))
+      assertEquals(None, replicaManager.futureLocalReplica(topicPartition))
+
       def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
         ApiKeys.LEADER_AND_ISR.latestVersion,
         0,
@@ -214,32 +224,32 @@ class ReplicaManagerTest {
       ).build()
 
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
-      val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
-        .localReplica.flatMap(_.log).get
-      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)
+      val previousReplicaFolder = replica.log.get.dir.getParentFile
 
       // find the live and different folder
-      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == previousReplicaFolder).size)
+      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == previousReplicaFolder).head
       assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
       replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
-      assertTrue(replicaManager.futureLocalReplica(topicPartition).flatMap(_.log).isDefined)
-
+      // make sure the future log is created
+      val futureReplica = replicaManager.futureLocalReplicaOrException(topicPartition)
       assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
-      // change the epoch from 0 to 1 in order to make fenced error
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
-      TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
-        s"the partition=$topicPartition should be removed from pending state")
-      // the partition is added to failedPartitions if fenced error happens
-      // if the thread is done before ReplicaManager#becomeLeaderOrFollower updates epoch,the fenced error does
-      // not happen and failedPartitions is empty.
-      if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
+      (1 to loopEpochChange).foreach(epoch => replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) => ()))
+      // wait for the ReplicaAlterLogDirsThread to complete
+      TestUtils.waitUntilTrue(() => {
         replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
-        assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
-        // send request again
-        replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
-        // the future folder exists so it fails to invoke thread
-        assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
-      }
+        replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty
+      }, s"ReplicaAlterLogDirsThread should be gone")
+
+      // the replica change is completed after retrying
+      assertTrue(futureReplica.log.isEmpty)
+      assertEquals(newReplicaFolder.getAbsolutePath, replica.log.get.dir.getParent)
+      // change the replica folder again
+      val response = replicaManager.alterReplicaLogDirs(Map(topicPartition -> previousReplicaFolder.getAbsolutePath))
+      assertNotEquals(0, response.size)
+      response.values.foreach(assertEquals(Errors.NONE, _))
+      // should succeed to invoke ReplicaAlterLogDirsThread again
+      assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
     } finally replicaManager.shutdown(checkpointHW = false)
   }
 


[kafka] 01/02: KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr (#8223)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 66ae1ddbc1b92b5e63939bc54ba1e4c2b0d6ae8b
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Fri Mar 20 07:49:35 2020 +0800

    KAFKA-9654; Update epoch in `ReplicaAlterLogDirsThread` after new LeaderAndIsr  (#8223)
    
    Currently when there is a leader change with a log dir reassignment in progress, we do not update the leader epoch in the partition state maintained by `ReplicaAlterLogDirsThread`. This can lead to a FENCED_LEADER_EPOCH error, which results in the partition being marked as failed, which is a permanent failure until the broker is restarted. This patch fixes the problem by updating the epoch in `ReplicaAlterLogDirsThread` after receiving a new LeaderAndIsr request from the controller.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 58 +++++++++++++++-------
 .../main/scala/kafka/server/ReplicaManager.scala   | 15 +++---
 .../admin/ReassignPartitionsClusterTest.scala      | 27 +++++++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 52 ++++++++++++++++++-
 4 files changed, 118 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 5871847..083e46b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -198,7 +198,8 @@ abstract class AbstractFetcherThread(name: String,
         curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
       }
 
-      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets)
+      val ResultWithPartitions(fetchOffsets, partitionsWithError) =
+        maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
       handlePartitionsWithErrors(partitionsWithError)
       updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
     }
@@ -232,7 +233,8 @@ abstract class AbstractFetcherThread(name: String,
     updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
   }
 
-  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
+  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
+                                             latestEpochsForPartitions: Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
@@ -245,7 +247,11 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
           case Errors.FENCED_LEADER_EPOCH =>
-            onPartitionFenced(tp)
+            if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
+              p =>
+                if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
+                else None
+            })) partitionsWithError += tp
 
           case error =>
             info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
@@ -261,12 +267,22 @@ abstract class AbstractFetcherThread(name: String,
     ResultWithPartitions(fetchOffsets, partitionsWithError)
   }
 
-  private def onPartitionFenced(tp: TopicPartition): Unit = inLock(partitionMapLock) {
-    Option(partitionStates.stateValue(tp)).foreach { currentFetchState =>
+  /**
+   * remove the partition if the partition state is NOT updated. Otherwise, keep the partition active.
+   * @return true if the epoch in this thread is updated. otherwise, false
+   */
+  private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) {
+    Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
       val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
-      info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
-        s"the new LeaderAndIsr state before resuming fetching.")
-      partitionStates.remove(tp)
+      if (requestEpoch.contains(currentLeaderEpoch)) {
+        info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
+          s"the new LeaderAndIsr state before resuming fetching.")
+        partitionStates.remove(tp)
+        false
+      } else {
+        info(s"Partition $tp has an new epoch ($currentLeaderEpoch) than the current leader. retry the partition later")
+        true
+      }
     }
   }
 
@@ -303,6 +319,10 @@ abstract class AbstractFetcherThread(name: String,
             // the current offset is the same as the offset requested.
             val fetchState = fetchStates(topicPartition)
             if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+              val requestEpoch = if (fetchState.currentLeaderEpoch >= 0)
+                Some(fetchState.currentLeaderEpoch)
+              else
+                None
               partitionData.error match {
                 case Errors.NONE =>
                   try {
@@ -342,7 +362,7 @@ abstract class AbstractFetcherThread(name: String,
                         s"offset ${currentFetchState.fetchOffset}", e)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
-                  if (!handleOutOfRangeError(topicPartition, currentFetchState))
+                  if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
                     partitionsWithError += topicPartition
 
                 case Errors.UNKNOWN_LEADER_EPOCH =>
@@ -351,7 +371,7 @@ abstract class AbstractFetcherThread(name: String,
                   partitionsWithError += topicPartition
 
                 case Errors.FENCED_LEADER_EPOCH =>
-                  onPartitionFenced(topicPartition)
+                  if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
 
                 case Errors.NOT_LEADER_FOR_PARTITION =>
                   debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
@@ -504,32 +524,34 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-   * Handle the out of range error. Return true if the request succeeded or was fenced, which means we need
-   * not backoff and retry. False if there was a retriable error.
+   * Handle the out of range error. Return false if
+   * 1) the request succeeded or
+   * 2) was fenced and this thread haven't received new epoch,
+   * which means we need not backoff and retry. True if there was a retriable error.
    */
   private def handleOutOfRangeError(topicPartition: TopicPartition,
-                                    fetchState: PartitionFetchState): Boolean = {
+                                    fetchState: PartitionFetchState,
+                                    requestEpoch: Option[Int]): Boolean = {
     try {
       val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
       val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state = Fetching)
       partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
       info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
         s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
-      true
+      false
     } catch {
       case _: FencedLeaderEpochException =>
-        onPartitionFenced(topicPartition)
-        true
+        onPartitionFenced(topicPartition, requestEpoch)
 
       case e @ (_ : UnknownTopicOrPartitionException |
                 _ : UnknownLeaderEpochException |
                 _ : NotLeaderForPartitionException) =>
         info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
-        false
+        true
 
       case e: Throwable =>
         error(s"Error getting offset for partition $topicPartition", e)
-        false
+        true
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 352ef64..71b6122 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1057,14 +1057,10 @@ class ReplicaManager(val config: KafkaConfig,
 
         // First check partition's leader epoch
         val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
-        val newPartitions = new mutable.HashSet[Partition]
+        val updatedPartitions = new mutable.HashSet[Partition]
 
         leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
-          val partition = getPartition(topicPartition).getOrElse {
-            val createdPartition = getOrCreatePartition(topicPartition)
-            newPartitions.add(createdPartition)
-            createdPartition
-          }
+          val partition = getOrCreatePartition(topicPartition)
           val currentLeaderEpoch = partition.getLeaderEpoch
           val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
           if (partition eq ReplicaManager.OfflinePartition) {
@@ -1076,9 +1072,10 @@ class ReplicaManager(val config: KafkaConfig,
           } else if (requestLeaderEpoch > currentLeaderEpoch) {
             // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
             // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-            if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
+            if (stateInfo.basePartitionState.replicas.contains(localBrokerId)) {
+              updatedPartitions.add(partition)
               partitionState.put(partition, stateInfo)
-            else {
+            } else {
               stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
                 s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
                 s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
@@ -1128,7 +1125,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
-        for (partition <- newPartitions) {
+        for (partition <- updatedPartitions) {
           val topicPartition = partition.topicPartition
           if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
             partition.localReplica.foreach { replica =>
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 654a92e..fbcb1ca 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -66,9 +66,9 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     JAdminClient.create(props)
   }
 
-  def getRandomLogDirAssignment(brokerId: Int): String = {
+  def getRandomLogDirAssignment(brokerId: Int, excluded: Option[String] = None): String = {
     val server = servers.find(_.config.brokerId == brokerId).get
-    val logDirs = server.config.logDirs
+    val logDirs = server.config.logDirs.filterNot(excluded.contains)
     new File(logDirs(Random.nextInt(logDirs.size))).getAbsolutePath
   }
 
@@ -134,18 +134,33 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldMoveSinglePartitionWithinBroker() {
+  def shouldMoveSinglePartitionToSameFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(true)
+
+  @Test
+  def shouldMoveSinglePartitionToDifferentFolderWithinBroker(): Unit = shouldMoveSinglePartitionWithinBroker(false)
+
+  def shouldMoveSinglePartitionWithinBroker(moveToSameFolder: Boolean): Unit = {
     // Given a single replica on server 100
     startBrokers(Seq(100, 101))
     adminClient = createAdminClient(servers)
-    val expectedLogDir = getRandomLogDirAssignment(100)
     createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers)
 
+    val replica = new TopicPartitionReplica(topicName, 0, 100)
+    val currentLogDir = adminClient.describeReplicaLogDirs(java.util.Collections.singleton(replica))
+      .all()
+      .get()
+      .get(replica)
+      .getCurrentReplicaLogDir
+
+    val expectedLogDir = if (moveToSameFolder)
+      currentLogDir
+    else
+      getRandomLogDirAssignment(100, excluded = Some(currentLogDir))
+
     // When we execute an assignment that moves an existing replica to another log directory on the same broker
     val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[100],"log_dirs":["$expectedLogDir"]}]}"""
     ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
-    val replica = new TopicPartitionReplica(topicName, 0, 100)
-    TestUtils.waitUntilTrue(() => {
+    waitUntilTrue(() => {
       expectedLogDir == adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir
     }, "Partition should have been moved to the expected log directory", 1000)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 65b62d0..4786745 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -195,6 +195,55 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testFencedErrorCausedByBecomeLeader(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.getOrCreatePartition(topicPartition)
+          .getOrCreateReplica(0, isNew = false)
+
+      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+        ApiKeys.LEADER_AND_ISR.latestVersion,
+        0,
+        0,
+        brokerEpoch,
+        Map(topicPartition -> new LeaderAndIsrRequest.PartitionState(0, 0,
+          epoch, brokerList, 0, brokerList, true)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava
+      ).build()
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+        .localReplica.flatMap(_.log).get
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).size)
+
+      // find the live and different folder
+      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.dir.getParentFile).head
+      assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+      assertTrue(replicaManager.futureLocalReplica(topicPartition).flatMap(_.log).isDefined)
+
+      assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      // change the epoch from 0 to 1 in order to make fenced error
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1), (_, _) => ())
+      TestUtils.waitUntilTrue(() => replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.values.forall(_.partitionCount() == 0),
+        s"the partition=$topicPartition should be removed from pending state")
+      // the partition is added to failedPartitions if fenced error happens
+      // if the thread is done before ReplicaManager#becomeLeaderOrFollower updates epoch,the fenced error does
+      // not happen and failedPartitions is empty.
+      if (replicaManager.replicaAlterLogDirsManager.failedPartitions.size != 0) {
+        replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+        // send request again
+        replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath))
+        // the future folder exists so it fails to invoke thread
+        assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      }
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
   def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
     val timer = new MockTimer
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
@@ -636,6 +685,7 @@ class ReplicaManagerTest {
       EasyMock.expect(mockLogMgr.truncateTo(Map(new TopicPartition(topic, topicPartition) -> offsetFromLeader),
         isFuture = false)).once
     }
+    EasyMock.expect(mockLogMgr.getLog(new TopicPartition(topic, topicPartition), isFuture = true)).andReturn(None)
     EasyMock.replay(mockLogMgr)
 
     val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
@@ -798,7 +848,7 @@ class ReplicaManagerTest {
 
   private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))