You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/03/04 22:12:43 UTC

[kafka] branch 2.0 updated: KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b0578b9  KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
b0578b9 is described below

commit b0578b93cd8122b72f20451ed136584e8e0a9198
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Mon Mar 4 16:28:31 2019 -0500

    KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (#6364)
    
    This patch fixes a bug in log dir reassignment where Partition.maybeReplaceCurrentWithFutureReplica would compare the entire LogEndOffsetMetadata of each replica to determine whether the reassignment has completed. If the active segments of both replicas have different base segments (for example, if the current replica had previously been cleaned and the future replica rolled segments at different points), the reassignment will never complete. The fix is to compare only the LogEndOffs [...]
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 +--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 45 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f398f8b..60f05ce 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -236,14 +236,14 @@ class Partition(val topic: String,
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
-    if (futureReplicaLEO.contains(replica.logEndOffset)) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset.messageOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset.messageOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
         getReplica(Request.FutureLocalReplicaId) match {
           case Some(futureReplica) =>
-            if (replica.logEndOffset == futureReplica.logEndOffset) {
+            if (replica.logEndOffset.messageOffset == futureReplica.logEndOffset.messageOffset) {
               logManager.replaceCurrentWithFutureLog(topicPartition)
               replica.log = futureReplica.log
               futureReplica.log = None
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index e58028b..74c4f23 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -216,6 +216,51 @@ class PartitionTest {
     assertEquals(None, partition.getReplica(Request.FutureLocalReplicaId))
   }
 
+  // Verify that replacement works when the replicas have the same log end offset but different base offsets in the
+  // active segment
+  @Test
+  def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
+    // Write records with duplicate keys to current replica and roll at offset 6
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v2".getBytes),
+      new SimpleRecord("k1".getBytes, "v3".getBytes),
+      new SimpleRecord("k2".getBytes, "v4".getBytes),
+      new SimpleRecord("k2".getBytes, "v5".getBytes),
+      new SimpleRecord("k2".getBytes, "v6".getBytes)
+    ), leaderEpoch = 0)
+    log1.roll()
+    log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k3".getBytes, "v7".getBytes),
+      new SimpleRecord("k4".getBytes, "v8".getBytes)
+    ), leaderEpoch = 0)
+
+    // Write to the future replica as if the log had been compacted, and do not roll the segment
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val buffer = ByteBuffer.allocate(1024)
+    var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+      TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
+    builder.appendWithOffset(2L, new SimpleRecord("k1".getBytes, "v3".getBytes))
+    builder.appendWithOffset(5L, new SimpleRecord("k2".getBytes, "v6".getBytes))
+    builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
+    builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
+
+    log2.appendAsFollower(builder.build())
+
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.getReplica(brokerId))
+    assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId))
+
+    assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
+  }
 
   @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {