Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/14 05:53:05 UTC

[kafka] branch trunk updated: KAFKA-3978; Ensure high watermark is always positive (#4695)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b08905  KAFKA-3978; Ensure high watermark is always positive (#4695)
6b08905 is described below

commit 6b08905dfb40923317745b5e2237b2310b713333
Author: Dong Lin <li...@users.noreply.github.com>
AuthorDate: Tue Mar 13 22:52:59 2018 -0700

    KAFKA-3978; Ensure high watermark is always positive (#4695)
    
    The partition high watermark may become -1 if its initial value is out of range; this can occur during partition reassignment, for example. This patch fixes the bug and validates the fix with a unit test.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 ++++-
 core/src/main/scala/kafka/cluster/Replica.scala    | 14 ++++++++++--
 core/src/main/scala/kafka/log/Log.scala            |  8 +++----
 .../admin/ReassignPartitionsClusterTest.scala      | 26 ++++++++++++++++++++++
 4 files changed, 47 insertions(+), 7 deletions(-)
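
Before reading the diff, it may help to see the failure mode and the fix in miniature. Prior to this patch, Log.convertToOffsetMetadata returned LogOffsetMetadata.UnknownOffsetMetadata (whose message offset is -1, per the commit message) when the requested offset was out of range, and Replica.convertHWToLocalOffsetMetadata stored that value directly, leaving a negative high watermark. The sketch below is illustrative only; OffsetMeta, SimpleLog and HwFallbackSketch are simplified stand-ins, not Kafka classes. It models the patched behaviour: when the checkpointed high watermark cannot be read from the local log, it is clamped to the base offset of the first segment instead of becoming -1.

    // Illustrative sketch only; simplified stand-ins for Kafka's Log / LogOffsetMetadata.
    case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)

    class SimpleLog(val firstSegmentBaseOffset: Long, val logEndOffset: Long) {
      // Mirrors the patched Log.convertToOffsetMetadata: None when the offset is out of range.
      def convertToOffsetMetadata(offset: Long): Option[OffsetMeta] =
        if (offset >= firstSegmentBaseOffset && offset <= logEndOffset)
          Some(OffsetMeta(offset, firstSegmentBaseOffset, 0))
        else
          None
    }

    object HwFallbackSketch extends App {
      // Log truncated to start at offset 100 (as in the new test), but the checkpointed HW is stale.
      val log = new SimpleLog(firstSegmentBaseOffset = 100L, logEndOffset = 100L)
      val checkpointedHw = 0L

      // Post-patch behaviour: clamp to the first segment's base offset rather than -1.
      val hwMetadata = log.convertToOffsetMetadata(checkpointedHw).getOrElse {
        OffsetMeta(log.firstSegmentBaseOffset, log.firstSegmentBaseOffset, 0)
      }
      println(hwMetadata) // prints OffsetMeta(100,100,0)
    }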

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b97671..68faf00 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -460,7 +460,11 @@ class Partition(val topic: String,
     }.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
+
+    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
+    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
       leaderReplica.highWatermark = newHighWatermark
       debug(s"High watermark updated to $newHighWatermark")
       true
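
The patched condition in the Partition.scala hunk above can be read as a predicate: advance the high watermark only when the message offset strictly increases, or when the offset is unchanged but the offset metadata now sits on a newer segment (as happens after a log roll). A standalone sketch of that predicate, with a hypothetical Hw case class standing in for LogOffsetMetadata:

    object HwUpdateSketch {
      // Hypothetical stand-in for LogOffsetMetadata, for illustration only.
      case class Hw(messageOffset: Long, segmentBaseOffset: Long) {
        def onOlderSegment(that: Hw): Boolean = this.segmentBaseOffset < that.segmentBaseOffset
      }

      // Mirrors the patched condition in the hunk above.
      def shouldUpdateHw(oldHw: Hw, newHw: Hw): Boolean =
        oldHw.messageOffset < newHw.messageOffset ||
          (oldHw.messageOffset == newHw.messageOffset && oldHw.onOlderSegment(newHw))

      // shouldUpdateHw(Hw(5, 0), Hw(7, 0))  == true   // offset advanced
      // shouldUpdateHw(Hw(5, 0), Hw(5, 5))  == true   // same offset, metadata on a newer segment after a roll
      // shouldUpdateHw(Hw(5, 0), Hw(3, 0))  == false  // never moves the high watermark backwards
    }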
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index e41e389..030e5b7 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -138,6 +138,9 @@ class Replica(val brokerId: Int,
 
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
+      if (newHighWatermark.messageOffset < 0)
+        throw new IllegalArgumentException("High watermark offset should be non-negative")
+
       highWatermarkMetadata = newHighWatermark
       log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
       trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
@@ -165,9 +168,16 @@ class Replica(val brokerId: Int,
       s"non-local replica $brokerId"))
   }
 
-  def convertHWToLocalOffsetMetadata() = {
+  /*
+   * Convert hw to local offset metadata by reading the log at the hw offset.
+   * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
+   */
+  def convertHWToLocalOffsetMetadata() {
     if (isLocal) {
-      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+        val firstOffset = log.get.logSegments.head.baseOffset
+        new LogOffsetMetadata(firstOffset, firstOffset, 0)
+      }
     } else {
       throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
     }
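
The setter change above does two things: it rejects negative values outright and only then performs the existing metadata update. A simplified model of that guard, using Scala's setter syntax; HwHolder is a hypothetical stand-in for Replica, and require is used here in place of the explicit throw (it raises the same IllegalArgumentException):

    // Simplified model of the guard added to Replica.highWatermark_= ; not Kafka code.
    class HwHolder {
      private var hw: Long = 0L
      def highWatermark: Long = hw
      def highWatermark_=(newHw: Long): Unit = {
        // require throws IllegalArgumentException, matching the effect of the check above.
        require(newHw >= 0, "High watermark offset should be non-negative")
        hw = newHw
      }
    }

    // val replica = new HwHolder
    // replica.highWatermark = 100L   // ok
    // replica.highWatermark = -1L    // throws IllegalArgumentException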
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 257dd8f..f0050f5 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, return unknown offset metadata
+   * If the message offset is out of range, return None to the caller.
    */
-  def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
     try {
       val fetchDataInfo = readUncommitted(offset, 1)
-      fetchDataInfo.fetchOffsetMetadata
+      Some(fetchDataInfo.fetchOffsetMetadata)
     } catch {
-      case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+      case _: OffsetOutOfRangeException => None
     }
   }
 
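One note on the Log.scala signature change: returning Option[LogOffsetMetadata] instead of the UnknownOffsetMetadata sentinel makes the out-of-range case impossible to ignore at call sites. A tiny, hypothetical illustration of the call-site difference, using plain Long offsets rather than Kafka types:

    object OptionVsSentinelSketch extends App {
      // Hypothetical lookup, used only to contrast the Option style with the old -1 sentinel.
      def convert(offset: Long, logStart: Long, logEnd: Long): Option[Long] =
        if (offset >= logStart && offset <= logEnd) Some(offset) else None

      val hw = convert(0L, logStart = 100L, logEnd = 250L) match {
        case Some(valid) => valid
        case None        => 100L // caller-chosen fallback, e.g. the log start offset
      }
      println(hw) // 100; with a -1 sentinel an out-of-range offset could silently propagate
    }
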
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 2a24a37..0c41519 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -77,6 +77,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def testHwAfterPartitionReassignment(): Unit = {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101, 102))
+    adminClient = createAdminClient(servers)
+    createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    val topicPartition = new TopicPartition(topicName, 0)
+    val leaderServer = servers.find(_.config.brokerId == 100).get
+    leaderServer.replicaManager.logManager.truncateFullyAndStartAt(topicPartition, 100L, false)
+
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101, 102]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
+
+    val newLeaderServer = servers.find(_.config.brokerId == 101).get
+
+    TestUtils.waitUntilTrue (
+      () => newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+      "broker 101 should be the new leader", pause = 1L
+    )
+
+    assertEquals(100, newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset)
+    servers.foreach(server => waitUntilTrue(() => server.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset == 100, ""))
+  }
+
+
+  @Test
   def shouldMoveSinglePartition(): Unit = {
     //Given a single replica on server 100
     startBrokers(Seq(100, 101))

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.