Posted to commits@kafka.apache.org by jg...@apache.org on 2019/03/04 22:51:47 UTC

[kafka] branch 1.1 updated: KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 8a94d64  KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
8a94d64 is described below

commit 8a94d64d2534d9ab7619351c60907c5c1f135b2a
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Mon Mar 4 16:28:31 2019 -0500

    KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
    
    This patch fixes a bug in log dir reassignment where Partition.maybeReplaceCurrentWithFutureReplica would compare the entire LogEndOffsetMetadata of each replica to determine whether the reassignment has completed. If the active segments of the two replicas have different base offsets (for example, if the current replica had previously been cleaned and the future replica rolled segments at different points), the reassignment will never complete. The fix is to compare only the LogEndOffsets (the message offsets) of the two replicas.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 26 +++---
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 93 +++++++++++++++++++++-
 2 files changed, 109 insertions(+), 10 deletions(-)
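
For readers skimming the diff below: the stall came from comparing the replicas' full log end offset metadata, which also carries the active segment's base offset and the position within that segment, rather than just the message offset. A minimal Scala sketch of the difference (OffsetMetadata is a simplified stand-in with assumed field names, not Kafka's actual class):

    object LeoCheckSketch extends App {
      // Simplified stand-in for the log end offset metadata compared in
      // maybeReplaceCurrentWithFutureReplica (field names assumed for illustration).
      case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)

      // Current replica: active segment rolled at offset 6, log end offset 8.
      val currentLEO = OffsetMetadata(messageOffset = 8L, segmentBaseOffset = 6L, relativePositionInSegment = 140)
      // Future replica: copied from a compacted log and never rolled, same log end offset 8.
      val futureLEO = OffsetMetadata(messageOffset = 8L, segmentBaseOffset = 0L, relativePositionInSegment = 280)

      // Old check: full metadata equality never holds once the segments differ, so the swap never happens.
      println(currentLEO == futureLEO)                             // false
      // Fixed check: only the message offsets need to match.
      println(currentLEO.messageOffset == futureLEO.messageOffset) // true
    }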

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 41a8faf..3d9b2d1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -222,18 +222,26 @@ class Partition(val topic: String,
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset.messageOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset.messageOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset.messageOffset == futureReplica.logEndOffset.messageOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // The future replica was removed by a thread other than the ReplicaAlterLogDirsThread before this method was called.
+            // In that case the partition should already have been removed from the ReplicaAlterLogDirsThread's state, so return
+            // false so that the thread does not remove it from its state a second time, avoiding a race condition.
+            false
+        }
       }
     } else false
   }
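
The rewritten method keeps the existing check-then-recheck discipline: the LEO comparison runs once without the lock as a cheap filter and is repeated under the write lock of leaderIsrUpdateLock, now also handling the case where the future replica was removed concurrently. A rough, self-contained sketch of that pattern (SwapGuard and its method names are illustrative, not Kafka code):

    import java.util.concurrent.locks.ReentrantReadWriteLock

    // Illustrative check-then-recheck guard: a cheap optimistic comparison outside the lock,
    // repeated under the write lock before mutating shared state.
    class SwapGuard[A](lock: ReentrantReadWriteLock) {
      def swapIfStillEqual(current: () => A, future: () => Option[A])(swap: () => Unit): Boolean = {
        if (future().contains(current())) {       // optimistic check, no lock held
          lock.writeLock().lock()
          try {
            future() match {
              case Some(f) if f == current() =>   // still equal under the write lock
                swap()
                true
              case _ =>                           // value moved or was removed concurrently
                false
            }
          } finally lock.writeLock().unlock()
        } else false
      }
    }
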
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6a40fc6..f130fa4 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,8 +19,10 @@ package kafka.cluster
 import java.io.File
 import java.nio.ByteBuffer
 import java.util.Properties
+import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 
+import kafka.api.Request
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.server._
@@ -49,6 +51,7 @@ class PartitionTest {
 
   var tmpDir: File = _
   var logDir: File = _
+  var logDir2: File = _
   var replicaManager: ReplicaManager = _
   var logManager: LogManager = _
   var logConfig: LogConfig = _
@@ -60,8 +63,9 @@ class PartitionTest {
 
     tmpDir = TestUtils.tempDir()
     logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
     logManager = TestUtils.createLogManager(
-      logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+      logDirs = Seq(logDir, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
     logManager.startup()
 
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
@@ -170,6 +174,93 @@ class PartitionTest {
   }
 
   @Test
+  // Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
+  def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
+    val latch = new CountDownLatch(1)
+
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.getReplica(brokerId))
+    assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId))
+
+    val thread1 = new Thread {
+      override def run(): Unit = {
+        latch.await()
+        partition.removeFutureLocalReplica()
+      }
+    }
+
+    val thread2 = new Thread {
+      override def run(): Unit = {
+        latch.await()
+        partition.maybeReplaceCurrentWithFutureReplica()
+      }
+    }
+
+    thread1.start()
+    thread2.start()
+
+    latch.countDown()
+    thread1.join()
+    thread2.join()
+    assertEquals(None, partition.getReplica(Request.FutureLocalReplicaId))
+  }
+
+  // Verify that replacement works when the replicas have the same log end offset but different base offsets in the
+  // active segment
+  @Test
+  def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
+    // Write records with duplicate keys to current replica and roll at offset 6
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v2".getBytes),
+      new SimpleRecord("k1".getBytes, "v3".getBytes),
+      new SimpleRecord("k2".getBytes, "v4".getBytes),
+      new SimpleRecord("k2".getBytes, "v5".getBytes),
+      new SimpleRecord("k2".getBytes, "v6".getBytes)
+    ), leaderEpoch = 0)
+    log1.roll()
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k3".getBytes, "v7".getBytes),
+      new SimpleRecord("k4".getBytes, "v8".getBytes)
+    ), leaderEpoch = 0)
+
+    // Write to the future replica as if the log had been compacted, and do not roll the segment
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val buffer = ByteBuffer.allocate(1024)
+    var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+      TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
+    builder.appendWithOffset(2L, new SimpleRecord("k1".getBytes, "v3".getBytes))
+    builder.appendWithOffset(5L, new SimpleRecord("k2".getBytes, "v6".getBytes))
+    builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
+    builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
+
+    log2.appendAsFollower(builder.build())
+
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.getReplica(brokerId))
+    assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId))
+
+    assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
+  }
+
+  @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
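
The first new test gates both threads on a CountDownLatch so that removeFutureLocalReplica() and maybeReplaceCurrentWithFutureReplica() race for the future replica; the second builds the future log with MemoryRecords appendWithOffset calls to mimic a compacted log whose segments were never rolled. A generic sketch of the latch-gated race pattern, independent of the Kafka classes above (RaceSketch is an illustrative name, not part of the test utilities):

    import java.util.concurrent.CountDownLatch

    // Generic latch-gated race: both threads block on the latch, are released together,
    // and the caller asserts afterwards on state that must hold for every interleaving.
    object RaceSketch {
      def race(left: () => Unit, right: () => Unit): Unit = {
        val latch = new CountDownLatch(1)
        val t1 = new Thread { override def run(): Unit = { latch.await(); left() } }
        val t2 = new Thread { override def run(): Unit = { latch.await(); right() } }
        t1.start()
        t2.start()
        latch.countDown()  // release both threads at (nearly) the same time
        t1.join()
        t2.join()
      }
    }

In the test above, the assertion after the race checks only the invariant that must hold regardless of which thread wins: the future replica entry is gone.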