Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/13 00:52:06 UTC

[GitHub] [kafka] kowshik opened a new pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

kowshik opened a new pull request #10684:
URL: https://github.com/apache/kafka/pull/10684


   - In `Log.rebuildProducerState()` I've removed usage of the `allSegments` local variable.
   - In `Log.collectAbortedTransactions()` I'm now raising an exception when the segment can't be found.
   - I've introduced a new `LogSegments.higherSegments()` API, which is now used to make the logic a bit more readable in the `Log.read()`, `Log.collectAbortedTransactions()` and `Log.deletableSegments()` APIs.
   
   **Tests:**
   Relying on existing unit tests.
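A minimal, self-contained sketch of what a `higherSegments`-style API can look like, assuming segments are held in a `java.util.concurrent.ConcurrentSkipListMap` keyed by base offset. `Segment` and `SegmentIndex` are hypothetical stand-ins for Kafka's `LogSegment` and `LogSegments`, and the single `tailMap(baseOffset, false)` call is a simplification chosen for this sketch, not necessarily what the PR does. It assumes Scala 2.13+ for `scala.jdk.CollectionConverters`.

```scala
import java.util.concurrent.ConcurrentSkipListMap
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for kafka.log.LogSegment: only the base offset matters here.
final case class Segment(baseOffset: Long)

// Hypothetical, simplified stand-in for kafka.log.LogSegments.
final class SegmentIndex {
  private val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()

  def add(segment: Segment): Unit = segments.put(segment.baseOffset, segment)

  // Segments whose base offset is strictly greater than `baseOffset`, ordered
  // from lowest to highest base offset. tailMap(key, false) excludes `key`
  // itself and returns an empty view when nothing is higher, so no separate
  // higherKey lookup or empty-map fallback is needed. The result is a live,
  // weakly consistent view of the skip list, not a copy.
  def higherSegments(baseOffset: Long): Iterable[Segment] =
    segments.tailMap(baseOffset, false).values.asScala
}
```

For example, with segments at base offsets 0, 100, 200 and 300, `higherSegments(100L)` yields the segments at 200 and 300, and `higherSegments(300L)` yields nothing.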


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ccding commented on pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#issuecomment-841577947


   LGTM
   
   Thanks @kowshik 





[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631591402



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1219,10 +1216,10 @@ class Log(@volatile private var _dir: File,
           fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
           if (fetchDataInfo != null) {
             if (includeAbortedTxns)
-              fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
-          } else segmentEntryOpt = segments.higherEntry(baseOffset)
+              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
+          } else segmentOpt = segmentsIterator.nextOption()
 
-          done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+          done = fetchDataInfo != null || segmentOpt.isEmpty

Review comment:
       Done. Please see 80c40171302546be2dcf2444167d7f375f0b820d.







[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631584810



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1204,13 +1202,12 @@ class Log(@volatile private var _dir: File,
         // Do the read on the segment with a base offset less than the target offset
         // but if that segment doesn't contain any messages with an offset greater than that
         // continue to read from successive segments until we get some messages or we reach the end of the log
-        var done = segmentEntryOpt.isEmpty
+        var done = segmentOpt.isEmpty
         var fetchDataInfo: FetchDataInfo = null
+        val segmentsIterator = segmentOpt.map(segment => segments.higherSegments(segment.baseOffset))

Review comment:
       Done, made it private. Please see 80c40171302546be2dcf2444167d7f375f0b820d.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ccding commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631539787



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1204,13 +1202,12 @@ class Log(@volatile private var _dir: File,
         // Do the read on the segment with a base offset less than the target offset
         // but if that segment doesn't contain any messages with an offset greater than that
         // continue to read from successive segments until we get some messages or we reach the end of the log
-        var done = segmentEntryOpt.isEmpty
+        var done = segmentOpt.isEmpty
         var fetchDataInfo: FetchDataInfo = null
+        val segmentsIterator = segmentOpt.map(segment => segments.higherSegments(segment.baseOffset))

Review comment:
       Given that we can get `baseOffset` from the segment, can we get rid of `floorEntry` altogether, or at least make `floorEntry` private?
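A sketch of the direction suggested here, using hypothetical names (`Segment`, `SegmentIndex`, `floorSegment`): expose only the segment, and keep the `Map.Entry`-returning lookup private, since callers can read the base offset from the segment itself.

```scala
import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical stand-in for kafka.log.LogSegment.
final case class Segment(baseOffset: Long)

final class SegmentIndex {
  private val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()

  def add(segment: Segment): Unit = segments.put(segment.baseOffset, segment)

  // Private: the entry's key duplicates segment.baseOffset, so callers
  // outside this class gain nothing from seeing the Map.Entry.
  private def floorEntry(offset: Long): Option[JMap.Entry[java.lang.Long, Segment]] =
    Option(segments.floorEntry(offset))

  // Public API: the segment with the greatest base offset <= `offset`, if any.
  def floorSegment(offset: Long): Option[Segment] = floorEntry(offset).map(_.getValue)
}
```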

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1219,10 +1216,10 @@ class Log(@volatile private var _dir: File,
           fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
           if (fetchDataInfo != null) {
             if (includeAbortedTxns)
-              fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
-          } else segmentEntryOpt = segments.higherEntry(baseOffset)
+              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
+          } else segmentOpt = segmentsIterator.nextOption()
 
-          done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+          done = fetchDataInfo != null || segmentOpt.isEmpty

Review comment:
       Can we simply use `while (fetchDataInfo == null && segmentOpt.isDefined)` rather than `while (!done)`?
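A self-contained sketch of the suggested loop shape, in which the loop condition replaces the separate `done` flag. `Segment`, `FetchData` and `readFrom` are hypothetical simplifications of `LogSegment`, `FetchDataInfo` and `Log.read`; like `LogSegment.read`, `Segment.read` here returns `null` when it has nothing at or after the start offset. The sketch advances with `hasNext`/`next()` so that it also compiles on Scala 2.12.

```scala
// Hypothetical stand-in for FetchDataInfo.
final case class FetchData(segmentBaseOffset: Long, firstOffset: Long)

// Hypothetical stand-in for LogSegment: holds the offsets it still contains.
final case class Segment(baseOffset: Long, offsets: Vector[Long]) {
  // Like LogSegment.read, returns null when nothing at or after startOffset exists.
  def read(startOffset: Long): FetchData =
    offsets.find(_ >= startOffset).map(o => FetchData(baseOffset, o)).orNull
}

object ReadLoop {
  // `candidates` is the floor segment for startOffset followed by all higher segments.
  def readFrom(startOffset: Long, candidates: Seq[Segment]): Option[FetchData] = {
    val segmentsIterator = candidates.iterator
    def nextSegment(): Option[Segment] =
      if (segmentsIterator.hasNext) Some(segmentsIterator.next()) else None

    var segmentOpt = nextSegment()
    var fetchDataInfo: FetchData = null
    // No `done` flag: stop once data is found or the segments run out.
    while (fetchDataInfo == null && segmentOpt.isDefined) {
      fetchDataInfo = segmentOpt.get.read(startOffset)
      if (fetchDataInfo == null)
        segmentOpt = nextSegment()
    }
    Option(fetchDataInfo)
  }
}
```

With segments at base offsets 0 (offsets 0 and 1), 10 (empty) and 20 (offsets 20 and 21), `readFrom(5L, ...)` skips two segments and returns data from the segment at base offset 20.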







[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631591402



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1219,10 +1216,10 @@ class Log(@volatile private var _dir: File,
           fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
           if (fetchDataInfo != null) {
             if (includeAbortedTxns)
-              fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
-          } else segmentEntryOpt = segments.higherEntry(baseOffset)
+              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
+          } else segmentOpt = segmentsIterator.nextOption()
 
-          done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+          done = fetchDataInfo != null || segmentOpt.isEmpty

Review comment:
       Done.







[GitHub] [kafka] kowshik commented on pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#issuecomment-849086875


   Thanks for the review @junrao! I have addressed your comments in 4b836034415c3d5f6b84384ef9be1e75b66edc4b.





[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631584810



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1204,13 +1202,12 @@ class Log(@volatile private var _dir: File,
         // Do the read on the segment with a base offset less than the target offset
         // but if that segment doesn't contain any messages with an offset greater than that
         // continue to read from successive segments until we get some messages or we reach the end of the log
-        var done = segmentEntryOpt.isEmpty
+        var done = segmentOpt.isEmpty
         var fetchDataInfo: FetchDataInfo = null
+        val segmentsIterator = segmentOpt.map(segment => segments.higherSegments(segment.baseOffset))

Review comment:
       Done, made it private. 







[GitHub] [kafka] kowshik commented on pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#issuecomment-840215023


   cc @junrao





[GitHub] [kafka] junrao commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r638164890



##########
File path: core/src/main/scala/kafka/log/LogSegments.scala
##########
@@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) {
    * @return the entry associated with the greatest offset, if it exists.
    */
   @threadsafe
-  def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry)
+  def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry)
 
   /**
    * @return the log segment with the greatest offset, if it exists.
    */
   @threadsafe
   def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue)
+
+  /**
+   * @return an iterable with log segments ordered from lowest base offset to highest,
+   *         each segment returned  has a base offset strictly greater than the provided baseOffset.
+   */
+  def higherSegments(baseOffset: Long): Iterable[LogSegment] = {
+    val view =
+      Option(segments.higherKey(baseOffset)).map {
+        higherOffset => segments.tailMap(higherOffset, true)
+      }.getOrElse(new ConcurrentSkipListMap[Long, LogSegment]())

Review comment:
       Could we return a constant empty map?
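A sketch of the constant-empty-map suggestion, assuming the fallback only needs to be read, never mutated: a single immutable `java.util.Collections.emptyNavigableMap()` (Java 8+) is shared by every call that finds no higher segment, instead of allocating a new `ConcurrentSkipListMap` each time. `SegmentViews` and the `String` values are hypothetical simplifications of `LogSegments` and `LogSegment`.

```scala
import java.util.{Collections, NavigableMap}
import java.util.concurrent.ConcurrentSkipListMap

object SegmentViews {
  // Allocated once and shared; immutable, so sharing it across threads is safe.
  private val EmptyView: NavigableMap[java.lang.Long, String] =
    Collections.emptyNavigableMap[java.lang.Long, String]()
}

final class SegmentViews {
  import SegmentViews.EmptyView

  private val segments = new ConcurrentSkipListMap[java.lang.Long, String]()

  def add(baseOffset: Long, name: String): Unit = segments.put(baseOffset, name)

  // Same shape as the diff: tailMap from the next higher key, else the shared empty map.
  def higherView(baseOffset: Long): NavigableMap[java.lang.Long, String] =
    Option(segments.higherKey(baseOffset))
      .map(higherOffset => segments.tailMap(higherOffset, true))
      .getOrElse(EmptyView)
}
```

Because the fallback is one shared instance, two lookups that both find nothing return the very same object.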

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1217,10 +1213,8 @@ class Log(@volatile private var _dir: File,
           fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
           if (fetchDataInfo != null) {
             if (includeAbortedTxns)
-              fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
-          } else segmentEntryOpt = segments.higherEntry(baseOffset)
-
-          done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
+          } else segmentOpt = segmentsIterator.nextOption()

Review comment:
       The old logic supports skipping forward across multiple segments to find the right data, whereas the new logic seems to skip forward only once. It would be useful to preserve the original semantics.
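To make the semantics being asked for concrete: the read should keep moving forward, across any number of segments, until one returns data or none are left. One compact way to express that in Scala, shown only as an illustration with hypothetical `Segment` and `FetchData` types (not the PR's code), is a lazy iterator pipeline: `Iterator.map` is lazy, so `find` stops reading at the first segment that returns data.

```scala
// Hypothetical stand-ins for FetchDataInfo and LogSegment.
final case class FetchData(segmentBaseOffset: Long, firstOffset: Long)

final case class Segment(baseOffset: Long, offsets: Vector[Long]) {
  // Returns null when the segment has nothing at or after startOffset.
  def read(startOffset: Long): FetchData =
    offsets.find(_ >= startOffset).map(o => FetchData(baseOffset, o)).orNull
}

object MultiSkipRead {
  // Tries the floor segment and then every higher segment in order, skipping
  // as many empty ones as needed. Iterator.map is lazy, so segments after the
  // first hit are never read.
  def readFrom(startOffset: Long, candidates: Seq[Segment]): Option[FetchData] =
    candidates.iterator.map(_.read(startOffset)).find(_ != null)
}
```

Here two consecutive empty segments (base offsets 10 and 20) are skipped, and a read from offset 5 is served by the segment at base offset 30.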







[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631501285



##########
File path: core/src/main/scala/kafka/log/LogSegments.scala
##########
@@ -17,7 +17,6 @@
 package kafka.log
 
 import java.io.File
-import java.lang.{Long => JLong}

Review comment:
       cc @ccding 







[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r638520142



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1217,10 +1213,8 @@ class Log(@volatile private var _dir: File,
           fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
           if (fetchDataInfo != null) {
             if (includeAbortedTxns)
-              fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
-          } else segmentEntryOpt = segments.higherEntry(baseOffset)
-
-          done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
+          } else segmentOpt = segmentsIterator.nextOption()

Review comment:
       That is an excellent catch. Sorry I missed this.







[GitHub] [kafka] junrao merged pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #10684:
URL: https://github.com/apache/kafka/pull/10684


   





[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631581404



##########
File path: core/src/main/scala/kafka/log/LogSegments.scala
##########
@@ -185,7 +184,7 @@ class LogSegments(topicPartition: TopicPartition) {
    *         if it exists.
    */
   @threadsafe
-  def higherEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.higherEntry(offset))
+  def higherEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.higherEntry(offset))

Review comment:
       That's one reason. The other is that `Map.Entry` provides a better interface for attribute lookup, via `Map.Entry.getKey()` and `Map.Entry.getValue()`, than `._1` and `._2` on a tuple.
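A small illustration of the trade-off discussed here, using a hypothetical `SegmentMap` with `String` values in place of `LogSegment`. Per the `ConcurrentSkipListMap` Javadoc, entries returned by methods such as `higherEntry` are immutable snapshots, so the skip list already produces an entry object; returning it wrapped in `Option` avoids the extra `Tuple2` that a tuple-returning variant allocates, and keeps the `getKey`/`getValue` accessors.

```scala
import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentSkipListMap

final class SegmentMap {
  private val segments = new ConcurrentSkipListMap[java.lang.Long, String]()

  def put(baseOffset: Long, name: String): Unit = segments.put(baseOffset, name)

  // Returns the skip list's own snapshot entry; callers use getKey / getValue.
  def higherEntry(offset: Long): Option[JMap.Entry[java.lang.Long, String]] =
    Option(segments.higherEntry(offset))

  // Tuple variant: allocates an extra Tuple2 per hit; callers use _1 / _2.
  def higherTuple(offset: Long): Option[(Long, String)] =
    Option(segments.higherEntry(offset)).map(e => (e.getKey.longValue, e.getValue))
}
```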







[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r638519831



##########
File path: core/src/main/scala/kafka/log/LogSegments.scala
##########
@@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) {
    * @return the entry associated with the greatest offset, if it exists.
    */
   @threadsafe
-  def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry)
+  def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry)
 
   /**
    * @return the log segment with the greatest offset, if it exists.
    */
   @threadsafe
   def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue)
+
+  /**
+   * @return an iterable with log segments ordered from lowest base offset to highest,
+   *         each segment returned  has a base offset strictly greater than the provided baseOffset.
+   */
+  def higherSegments(baseOffset: Long): Iterable[LogSegment] = {
+    val view =
+      Option(segments.higherKey(baseOffset)).map {
+        higherOffset => segments.tailMap(higherOffset, true)
+      }.getOrElse(new ConcurrentSkipListMap[Long, LogSegment]())

Review comment:
       Done.







[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r638520142



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1217,10 +1213,8 @@ class Log(@volatile private var _dir: File,
           fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
           if (fetchDataInfo != null) {
             if (includeAbortedTxns)
-              fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
-          } else segmentEntryOpt = segments.higherEntry(baseOffset)
-
-          done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+              fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
+          } else segmentOpt = segmentsIterator.nextOption()

Review comment:
       That is an excellent catch. Sorry I missed this. Done, changed it now.







[GitHub] [kafka] ijuma commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631564915



##########
File path: core/src/main/scala/kafka/log/LogSegments.scala
##########
@@ -185,7 +184,7 @@ class LogSegments(topicPartition: TopicPartition) {
    *         if it exists.
    */
   @threadsafe
-  def higherEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.higherEntry(offset))
+  def higherEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.higherEntry(offset))

Review comment:
       Why do we use `Map.Entry` instead of tuple here? Is it to avoid the allocation?







[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631501034



##########
File path: core/src/main/scala/kafka/log/LogSegments.scala
##########
@@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) {
    * @return the entry associated with the greatest offset, if it exists.
    */
   @threadsafe
-  def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry)
+  def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry)
 
   /**
    * @return the log segment with the greatest offset, if it exists.
    */
   @threadsafe
   def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue)
+
+  /**
+   * Returns an iterable to log segments ordered from lowest base offset to highest.
+   * Each segment in the returned iterable has a base offset strictly greater than the provided baseOffset.
+   */
+  def higherSegments(baseOffset: Long): Iterable[LogSegment] = {

Review comment:
       @ijuma If you think this API is a good fit for the requirement, I can add unit tests for it.







[GitHub] [kafka] kowshik commented on pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

Posted by GitBox <gi...@apache.org>.
kowshik commented on pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#issuecomment-849263208


   @junrao Thanks for the review and for catching this issue! I've fixed the build now. The issue was that the code in `Log.scala` used the `Iterator.nextOption()` [API](https://www.scala-lang.org/api/2.13.0/scala/collection/Iterator.html#nextOption():Option[A]), which was introduced only in Scala 2.13, so the build failed for Scala 2.12.
   
   I have now also added unit tests for the newly introduced `LogSegments.higherSegments()` API.
   
   The PR is ready for review again.
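For reference, one common way to keep such code compiling on both Scala 2.12 and 2.13 is to avoid `Iterator.nextOption()` and spell it out with `hasNext`/`next()`. The `IteratorCompat` helper below is a hypothetical sketch, not necessarily how the PR fixed the build.

```scala
object IteratorCompat {
  // Equivalent to Iterator.nextOption() (Scala 2.13+), but uses only methods
  // that also exist in Scala 2.12.
  def nextOption[A](it: Iterator[A]): Option[A] =
    if (it.hasNext) Some(it.next()) else None
}
```

For example, `IteratorCompat.nextOption(Iterator(1, 2))` returns `Some(1)`, and once the iterator is exhausted it returns `None`.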

