You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/05 18:06:51 UTC
kafka git commit: KAFKA-3003: Update the replica.highWatermark
correctly
Repository: kafka
Updated Branches:
refs/heads/trunk dc662776c -> 216c75bbc
KAFKA-3003: Update the replica.highWatermark correctly
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Ismael Juma, Guozhang Wang
Closes #688 from becketqin/KAFKA-3003
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/216c75bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/216c75bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/216c75bb
Branch: refs/heads/trunk
Commit: 216c75bbc567a633447b1cbf2bb8c6ba34e2464f
Parents: dc66277
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Feb 5 09:06:47 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Feb 5 09:06:47 2016 -0800
----------------------------------------------------------------------
.../scala/kafka/server/LogOffsetMetadata.scala | 22 ++++++++++++++------
1 file changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/216c75bb/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 00b60fe..6add702 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -45,7 +45,7 @@ case class LogOffsetMetadata(messageOffset: Long,
// check if this offset is already on an older segment compared with the given offset
def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly())
- throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+ throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
this.segmentBaseOffset < that.segmentBaseOffset
}
@@ -53,13 +53,23 @@ case class LogOffsetMetadata(messageOffset: Long,
// check if this offset is on the same segment with the given offset
def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly())
- throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+ throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
this.segmentBaseOffset == that.segmentBaseOffset
}
- // check if this offset is before the given offset
- def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset
+ // check if this offset is before the given offset. We need to compare both message offset and segment base offset.
+ def precedes(that: LogOffsetMetadata): Boolean = {
+ this.messageOffset < that.messageOffset || {
+ if (this.messageOffsetOnly() && that.messageOffsetOnly())
+ false
+ else if (!this.messageOffsetOnly() && !that.messageOffsetOnly())
+ this.segmentBaseOffset < that.segmentBaseOffset
+ else
+ throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.")
+ }
+ }
+
// compute the number of messages between this offset to the given offset
def offsetDiff(that: LogOffsetMetadata): Long = {
@@ -70,9 +80,9 @@ case class LogOffsetMetadata(messageOffset: Long,
// if they are on the same segment and this offset precedes the given offset
def positionDiff(that: LogOffsetMetadata): Int = {
if(!offsetOnSameSegment(that))
- throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that))
+ throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
if(messageOffsetOnly())
- throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that))
+ throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")
this.relativePositionInSegment - that.relativePositionInSegment
}