You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/02 19:22:01 UTC

[GitHub] [kafka] thomaskwscott commented on a change in pull request #10960: KAFKA-12981 Ensure LogSegment.maxTimestampSoFar and LogSegment.offsetOfMaxTimestampSoFar are read/updated in sync

thomaskwscott commented on a change in pull request #10960:
URL: https://github.com/apache/kafka/pull/10960#discussion_r663208192



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -99,21 +99,22 @@ class LogSegment private[log] (val log: FileRecords,
   // volatile for LogCleaner to see the update
   @volatile private var rollingBasedTimestamp: Option[Long] = None
 
+  /* The maximum timestamp and offset we see so far */
+  @volatile private var _maxTimestampAndOffsetSoFar: (Option[Long], Option[Long]) = (None, None)
+  def maxTimestampAndOffsetSoFar_= (timestampAndOffset: (Long, Long)) : Unit = _maxTimestampAndOffsetSoFar = (Some(timestampAndOffset._1), Some(timestampAndOffset._2))
+  def maxTimestampAndOffsetSoFar: (Long,Long) = {
+    if (_maxTimestampAndOffsetSoFar._1.isEmpty || _maxTimestampAndOffsetSoFar._2.isEmpty)
+      _maxTimestampAndOffsetSoFar = (Some(timeIndex.lastEntry.timestamp), Some(timeIndex.lastEntry.offset))
+    (_maxTimestampAndOffsetSoFar._1.get, _maxTimestampAndOffsetSoFar._2.get)
+  }

Review comment:
       Thanks for the review, I switched to case class for clarity, TimestampAndOffset does already exist but is not suitable for this purpose so I named it MaxTimestampAndOffset. It looks like the options are only checked during close() and so MaxOffsetAndTimestamp.empty is valid without them. I have changed the timestamp to -1L to make sure it is not a valid value when unset. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org