Posted to commits@kafka.apache.org by jg...@apache.org on 2019/03/04 21:28:42 UTC

[kafka] branch 2.1 updated: KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2ae0938  KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
2ae0938 is described below

commit 2ae0938592c005e3b8257141c32dceddbbc35ade
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Mon Mar 4 16:28:31 2019 -0500

    KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
    
    This patch fixes a bug in log dir reassignment where Partition.maybeReplaceCurrentWithFutureReplica would compare the entire LogEndOffsetMetadata of each replica to determine whether the reassignment has completed. If the active segments of both replicas have different base offsets (for example, if the current replica had previously been cleaned and the future replica rolled segments at different points), the reassignment will never complete. The fix is to compare only the LogEndOffsets of the replicas.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 +--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 46 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 3 deletions(-)
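
To see why the old comparison stalls, here is a minimal Scala sketch of the two checks. It uses a simplified stand-in for kafka.server.LogOffsetMetadata (the field names follow the real class, but the class body and the offsets below are invented for illustration), with the current replica having rolled its active segment at offset 6 and the future replica holding a single compacted segment:

    // Simplified stand-in for Kafka's LogOffsetMetadata; illustration only.
    case class LogOffsetMetadata(messageOffset: Long,
                                 segmentBaseOffset: Long,
                                 relativePositionInSegment: Int)

    object ReassignmentComparisonSketch extends App {
      // Current replica: rolled a new segment at offset 6, log end offset 8.
      val currentLeo = LogOffsetMetadata(messageOffset = 8L, segmentBaseOffset = 6L, relativePositionInSegment = 70)
      // Future replica: compacted copy in one segment starting at 0, same log end offset 8.
      val futureLeo = LogOffsetMetadata(messageOffset = 8L, segmentBaseOffset = 0L, relativePositionInSegment = 140)

      // Old check: case-class equality compares every field, so it can stay
      // false forever and the future log is never swapped in.
      println(currentLeo == futureLeo)                              // false

      // Fixed check: only the message offsets decide whether the future
      // replica has caught up.
      println(currentLeo.messageOffset == futureLeo.messageOffset)  // true
    }

Segment base offset and physical position are layout details that can legitimately differ after cleaning or independent segment rolls, so the message offset alone is the right signal that the future replica has caught up.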

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4cedd5e..cf6b27f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -320,14 +320,14 @@ class Partition(val topicPartition: TopicPartition,
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = localReplicaOrException
-    val futureReplicaLEO = futureLocalReplica.map(_.logEndOffset)
-    if (futureReplicaLEO.contains(replica.logEndOffset)) {
+    val futureReplicaLEO = futureLocalReplica.map(_.logEndOffset.messageOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset.messageOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
         futureLocalReplica match {
           case Some(futureReplica) =>
-            if (replica.logEndOffset == futureReplica.logEndOffset) {
+            if (replica.logEndOffset.messageOffset == futureReplica.logEndOffset.messageOffset) {
               logManager.replaceCurrentWithFutureLog(topicPartition)
               replica.log = futureReplica.log
               futureReplica.log = None
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index cc2a311..3cdc2bf 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -198,6 +198,52 @@ class PartitionTest {
     assertEquals(None, partition.futureLocalReplica)
   }
 
+  // Verify that replacement works when the replicas have the same log end offset but different base offsets in the
+  // active segment
+  @Test
+  def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
+    // Write records with duplicate keys to current replica and roll at offset 6
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v2".getBytes),
+      new SimpleRecord("k1".getBytes, "v3".getBytes),
+      new SimpleRecord("k2".getBytes, "v4".getBytes),
+      new SimpleRecord("k2".getBytes, "v5".getBytes),
+      new SimpleRecord("k2".getBytes, "v6".getBytes)
+    ), leaderEpoch = 0)
+    log1.roll()
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k3".getBytes, "v7".getBytes),
+      new SimpleRecord("k4".getBytes, "v8".getBytes)
+    ), leaderEpoch = 0)
+
+    // Write to the future replica as if the log had been compacted, and do not roll the segment
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val buffer = ByteBuffer.allocate(1024)
+    var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+      TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
+    builder.appendWithOffset(2L, new SimpleRecord("k1".getBytes, "v3".getBytes))
+    builder.appendWithOffset(5L, new SimpleRecord("k2".getBytes, "v6".getBytes))
+    builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
+    builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
+
+    log2.appendAsFollower(builder.build())
+
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = Partition(topicPartition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.localReplica)
+    assertEquals(Some(futureReplica), partition.futureLocalReplica)
+
+    assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
+  }
+
   @Test
   def testFetchOffsetSnapshotEpochValidationForLeader(): Unit = {
     val leaderEpoch = 5