You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/05 18:06:51 UTC

kafka git commit: KAFKA-3003: Update the replica.highWatermark correctly

Repository: kafka
Updated Branches:
  refs/heads/trunk dc662776c -> 216c75bbc


KAFKA-3003: Update the replica.highWatermark correctly

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #688 from becketqin/KAFKA-3003


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/216c75bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/216c75bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/216c75bb

Branch: refs/heads/trunk
Commit: 216c75bbc567a633447b1cbf2bb8c6ba34e2464f
Parents: dc66277
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Feb 5 09:06:47 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Feb 5 09:06:47 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/LogOffsetMetadata.scala  | 22 ++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/216c75bb/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 00b60fe..6add702 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -45,7 +45,7 @@ case class LogOffsetMetadata(messageOffset: Long,
   // check if this offset is already on an older segment compared with the given offset
   def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = {
     if (messageOffsetOnly())
-      throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
     this.segmentBaseOffset < that.segmentBaseOffset
   }
@@ -53,13 +53,23 @@ case class LogOffsetMetadata(messageOffset: Long,
   // check if this offset is on the same segment with the given offset
   def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = {
     if (messageOffsetOnly())
-      throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
     this.segmentBaseOffset == that.segmentBaseOffset
   }
 
-  // check if this offset is before the given offset
-  def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset
+  // check if this offset is before the given offset. We need to compare both message offset and segment base offset.
+  def precedes(that: LogOffsetMetadata): Boolean = {
+    this.messageOffset < that.messageOffset || {
+      if (this.messageOffsetOnly() && that.messageOffsetOnly())
+        false
+      else if (!this.messageOffsetOnly() && !that.messageOffsetOnly())
+        this.segmentBaseOffset < that.segmentBaseOffset
+      else
+        throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.")
+    }
+  }
+
 
   // compute the number of messages between this offset to the given offset
   def offsetDiff(that: LogOffsetMetadata): Long = {
@@ -70,9 +80,9 @@ case class LogOffsetMetadata(messageOffset: Long,
   // if they are on the same segment and this offset precedes the given offset
   def positionDiff(that: LogOffsetMetadata): Int = {
     if(!offsetOnSameSegment(that))
-      throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that))
+      throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
     if(messageOffsetOnly())
-      throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that))
+      throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")
 
     this.relativePositionInSegment - that.relativePositionInSegment
   }