Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/14 05:56:20 UTC
[kafka] branch 1.1 updated: KAFKA-3978; Ensure high watermark is always positive (#4695)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 77137e9 KAFKA-3978; Ensure high watermark is always positive (#4695)
77137e9 is described below
commit 77137e993b15ad4272972f05f61e7faee36a1914
Author: Dong Lin <li...@users.noreply.github.com>
AuthorDate: Tue Mar 13 22:52:59 2018 -0700
KAFKA-3978; Ensure high watermark is always positive (#4695)
Partition high watermark may become -1 if the initial value is out of range. This situation can occur during partition reassignment, for example. This patch fixes the bug and adds a unit test to validate the fix.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
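
For background on the failure mode: before this patch, an out-of-range high watermark was resolved to LogOffsetMetadata.UnknownOffsetMetadata, a sentinel whose message offset is -1, and that value could then be stored as the partition's high watermark. A minimal self-contained sketch of that behavior (simplified stand-in types, not the actual broker code):

object HwSentinelSketch extends App {
  // Simplified stand-in for LogOffsetMetadata; only the fields relevant to the bug are modeled.
  case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long = 0L, relativePosition: Int = 0)

  // Sentinel mirroring LogOffsetMetadata.UnknownOffsetMetadata: note the -1 message offset.
  val UnknownOffsetMeta = OffsetMeta(-1L)

  // Pre-fix lookup: a checkpointed high watermark outside the log's offset range resolves to the sentinel.
  def convertToOffsetMetadataPreFix(checkpointedHw: Long, logStartOffset: Long, logEndOffset: Long): OffsetMeta =
    if (checkpointedHw >= logStartOffset && checkpointedHw <= logEndOffset) OffsetMeta(checkpointedHw)
    else UnknownOffsetMeta

  // A reassigned replica whose log starts at offset 100 but whose checkpointed HW is 0:
  // the resolved high watermark becomes -1 instead of being clamped to a valid offset.
  val resolved = convertToOffsetMetadataPreFix(checkpointedHw = 0L, logStartOffset = 100L, logEndOffset = 100L)
  assert(resolved.messageOffset == -1L)
}
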
---
core/src/main/scala/kafka/cluster/Partition.scala | 6 ++++-
core/src/main/scala/kafka/cluster/Replica.scala | 14 ++++++++++--
core/src/main/scala/kafka/log/Log.scala | 8 +++----
.../admin/ReassignPartitionsClusterTest.scala | 26 ++++++++++++++++++++++
4 files changed, 47 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b97671..68faf00 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -460,7 +460,11 @@ class Partition(val topic: String,
}.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark
- if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
+
+ // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
+ // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+ (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
leaderReplica.highWatermark = newHighWatermark
debug(s"High watermark updated to $newHighWatermark")
true
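
The rewritten condition only ever moves the high watermark forward: either the message offset strictly increases, or the offset is unchanged but its metadata now lives on a newer segment (which happens after a log roll). A standalone sketch of that comparison, using a simplified offset-metadata type rather than the real LogOffsetMetadata:

object MonotonicHwSketch extends App {
  // Simplified offset metadata: segmentBaseOffset identifies which segment the offset lives on.
  final case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long)

  // Mirrors the patched condition: advance on a larger offset, or on the same offset when the
  // old metadata sits on an older segment than the new metadata.
  def shouldUpdateHw(oldHw: OffsetMeta, newHw: OffsetMeta): Boolean =
    oldHw.messageOffset < newHw.messageOffset ||
      (oldHw.messageOffset == newHw.messageOffset && oldHw.segmentBaseOffset < newHw.segmentBaseOffset)

  // Same offset, but the metadata moved to a freshly rolled segment: still update.
  assert(shouldUpdateHw(OffsetMeta(42L, 0L), OffsetMeta(42L, 42L)))
  // The offset would move backwards: do not update, the high watermark stays monotonic.
  assert(!shouldUpdateHw(OffsetMeta(42L, 0L), OffsetMeta(40L, 0L)))
}
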
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index e41e389..030e5b7 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -138,6 +138,9 @@ class Replica(val brokerId: Int,
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
if (isLocal) {
+ if (newHighWatermark.messageOffset < 0)
+ throw new IllegalArgumentException("High watermark offset should be non-negative")
+
highWatermarkMetadata = newHighWatermark
log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
@@ -165,9 +168,16 @@ class Replica(val brokerId: Int,
s"non-local replica $brokerId"))
}
- def convertHWToLocalOffsetMetadata() = {
+ /*
+ * Convert hw to local offset metadata by reading the log at the hw offset.
+ * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
+ */
+ def convertHWToLocalOffsetMetadata() {
if (isLocal) {
- highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+ highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+ val firstOffset = log.get.logSegments.head.baseOffset
+ new LogOffsetMetadata(firstOffset, firstOffset, 0)
+ }
} else {
throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
}
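
With convertToOffsetMetadata now returning an Option (see the Log.scala change below), the replica falls back to the base offset of its first log segment instead of the -1 sentinel when the checkpointed high watermark cannot be resolved. A rough sketch of that fallback against a minimal, hypothetical log interface (not the real kafka.log.Log API):

object HwFallbackSketch extends App {
  final case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePosition: Int)

  // Hypothetical minimal log interface used only for this sketch.
  trait SimpleLog {
    def convertToOffsetMetadata(offset: Long): Option[OffsetMeta] // None when the offset is out of range
    def firstSegmentBaseOffset: Long
  }

  // Mirrors the patched convertHWToLocalOffsetMetadata: clamp to the first segment's base offset
  // rather than falling back to a negative sentinel.
  def resolveHighWatermark(log: SimpleLog, checkpointedHw: Long): OffsetMeta =
    log.convertToOffsetMetadata(checkpointedHw).getOrElse {
      val firstOffset = log.firstSegmentBaseOffset
      OffsetMeta(firstOffset, firstOffset, 0)
    }

  // Example: a log whose only segment starts at offset 100 and an out-of-range checkpointed HW of 0.
  val log = new SimpleLog {
    def convertToOffsetMetadata(offset: Long): Option[OffsetMeta] =
      if (offset >= 100L) Some(OffsetMeta(offset, 100L, 0)) else None
    def firstSegmentBaseOffset: Long = 100L
  }
  assert(resolveHighWatermark(log, checkpointedHw = 0L) == OffsetMeta(100L, 100L, 0))
}
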
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 257dd8f..f0050f5 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File,
/**
* Given a message offset, find its corresponding offset metadata in the log.
- * If the message offset is out of range, return unknown offset metadata
+ * If the message offset is out of range, return None to the caller.
*/
- def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+ def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
try {
val fetchDataInfo = readUncommitted(offset, 1)
- fetchDataInfo.fetchOffsetMetadata
+ Some(fetchDataInfo.fetchOffsetMetadata)
} catch {
- case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+ case _: OffsetOutOfRangeException => None
}
}
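
Returning Option[LogOffsetMetadata] instead of the UnknownOffsetMetadata sentinel makes the out-of-range case explicit at every call site, so a caller has to pick a deliberate fallback rather than letting -1 leak into state. A small, hypothetical illustration of the two call styles (not code from this patch):

object SentinelVsOptionSketch extends App {
  // Hypothetical lookup used only to contrast the two styles.
  def lookup(offset: Long, logStartOffset: Long, logEndOffset: Long): Option[Long] =
    if (offset >= logStartOffset && offset <= logEndOffset) Some(offset) else None

  // Sentinel style: a -1 can silently propagate into state such as the high watermark.
  val sentinelStyle: Long = lookup(0L, logStartOffset = 100L, logEndOffset = 150L).getOrElse(-1L)

  // Option style: the caller must choose an explicit, valid fallback (here, the log start offset).
  val optionStyle: Long = lookup(0L, logStartOffset = 100L, logEndOffset = 150L).getOrElse(100L)

  assert(sentinelStyle == -1L && optionStyle == 100L)
}
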
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 2a24a37..0c41519 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -77,6 +77,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}
@Test
+ def testHwAfterPartitionReassignment(): Unit = {
+ //Given a single replica on server 100
+ startBrokers(Seq(100, 101, 102))
+ adminClient = createAdminClient(servers)
+ createTopic(zkClient, topicName, Map(0 -> Seq(100)), servers = servers)
+
+ val topicPartition = new TopicPartition(topicName, 0)
+ val leaderServer = servers.find(_.config.brokerId == 100).get
+ leaderServer.replicaManager.logManager.truncateFullyAndStartAt(topicPartition, 100L, false)
+
+ val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101, 102]}]}"""
+ ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
+
+ val newLeaderServer = servers.find(_.config.brokerId == 101).get
+
+ TestUtils.waitUntilTrue (
+ () => newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined,
+ "broker 101 should be the new leader", pause = 1L
+ )
+
+ assertEquals(100, newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset)
+ servers.foreach(server => waitUntilTrue(() => server.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset == 100, ""))
+ }
+
+
+ @Test
def shouldMoveSinglePartition(): Unit = {
//Given a single replica on server 100
startBrokers(Seq(100, 101))
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.