Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/22 22:40:00 UTC

[jira] [Commented] (KAFKA-6604) ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file

    [ https://issues.apache.org/jira/browse/KAFKA-6604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552187#comment-16552187 ] 

ASF GitHub Bot commented on KAFKA-6604:
---------------------------------------

lindong28 closed pull request #4634: KAFKA-6604; ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file
URL: https://github.com/apache/kafka/pull/4634
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 470842e9d9e..10889a0e392 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1384,7 +1384,11 @@ class ReplicaManager(val config: KafkaConfig,
     for ((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap
       try {
-        highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
+        highWatermarkCheckpoints.get(dir).foreach { checkpointFile =>
+          val previousHwms = checkpointFile.read()
+          val previousHwmsOfExistingReplicas = previousHwms.filterKeys(tp => logManager.getLog(tp).nonEmpty)
+          checkpointFile.write(previousHwmsOfExistingReplicas ++ hwms)
+        }
       } catch {
         case e: KafkaStorageException =>
           error(s"Error while writing to highwatermark file in directory $dir", e)
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index c0871a769d6..bbc1c7353e8 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -100,6 +100,7 @@ class HighwatermarkPersistenceTest {
   def testHighWatermarkPersistenceMultiplePartitions() {
     val topic1 = "foo1"
     val topic2 = "foo2"
+    val topic3 = "foo3"
     // mock zkclient
     EasyMock.replay(zkClient)
     // create kafka scheduler
@@ -111,6 +112,12 @@ class HighwatermarkPersistenceTest {
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient,
       scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
+
+    // Initialize high watermark checkpoint file with topic3 -> 20 and put this partition in LogManager.
+    logManagers.head.getOrCreateLog(new TopicPartition(topic3, 0), LogConfig())
+    val checkpointFile = replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath)
+    checkpointFile.write(Map(new TopicPartition(topic3, 0) -> 20L))
+
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -156,6 +163,10 @@ class HighwatermarkPersistenceTest {
       // verify checkpointed hw for topic 1
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(10L, topic1Partition0Hw)
+
+      // The partition topic3-0 should remain in the high watermark checkpoint file even if ReplicaManager does not know this partition
+      val topic3Partition0Hw = hwmFor(replicaManager, topic3, 0)
+      assertEquals(20L, topic3Partition0Hw)
       EasyMock.verify(zkClient)
     } finally {
       // shutdown the replica manager upon test completion


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6604
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6604
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>
> Currently a broker may truncate a partition to its log start offset in the following scenario:
> - Broker A is restarted after a shutdown.
> - The controller learns that broker A has started.
> - Some event (e.g. topic deletion) triggers the controller to send a LeaderAndIsrRequest for partition P1.
> - Broker A receives the LeaderAndIsrRequest for partition P1. After receiving its first LeaderAndIsrRequest, the broker overwrites the HW checkpoint file with the high watermarks of all the leader and follower partitions it currently hosts, which at this point is only P1, so the checkpoint file now contains only the HW for P1.
> - The controller sends broker A a LeaderAndIsrRequest for all of its leader and follower partitions.
> - Broker A creates ReplicaFetcherThreads for its follower partitions and truncates each log to its checkpointed HW, which is zero for every partition except P1.
> When this happens, potentially all logs on the broker are truncated to the log start offset, and the cluster then runs with reduced availability for a long time.
> The right solution is to keep a partition's entry in the high watermark checkpoint file as long as the partition still exists in LogManager.
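
A minimal, self-contained sketch of that rule follows. It is not part of the original report; the partition names and offsets are hypothetical, and the stand-in predicate mirrors the logManager.getLog(tp).nonEmpty check used in the merged patch above:

    import org.apache.kafka.common.TopicPartition

    // High watermarks already on disk in the checkpoint file for one log directory.
    val previousHwms: Map[TopicPartition, Long] = Map(
      new TopicPartition("foo3", 0) -> 20L,  // log exists, but ReplicaManager has not seen this partition yet
      new TopicPartition("gone", 0) -> 7L    // log no longer exists; safe to drop
    )

    // Stand-in for logManager.getLog(tp).nonEmpty: which partitions still have a log on disk.
    val logExists: TopicPartition => Boolean =
      Set(new TopicPartition("foo3", 0), new TopicPartition("foo1", 0))

    // Current in-memory high watermarks of the replicas ReplicaManager knows about.
    val currentHwms: Map[TopicPartition, Long] = Map(new TopicPartition("foo1", 0) -> 10L)

    // Keep previously checkpointed entries only for partitions whose log still exists,
    // then overlay the current values (the right-hand operand of ++ wins on duplicate keys).
    val toWrite = previousHwms.filterKeys(logExists) ++ currentHwms
    // toWrite == Map(foo3-0 -> 20L, foo1-0 -> 10L); the stale entry for gone-0 is dropped.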



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)