Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/14 16:11:36 UTC

[GitHub] [kafka] rajinisivaram opened a new pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

rajinisivaram opened a new pull request #9434:
URL: https://github.com/apache/kafka/pull/9434


   In 2.7, we added lastFetchedEpoch to fetch requests and divergingEpoch to fetch responses. We do not use them for truncation yet, but to use them for truncation with IBP 2.7 onwards in the next release, we should make sure that all the supporting classes already handle them in 2.7.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r505716961



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -77,7 +77,8 @@ class CachedPartition(val topic: String,
                       var highWatermark: Long,
                       var leaderEpoch: Optional[Integer],
                       var fetcherLogStartOffset: Long,
-                      var localLogStartOffset: Long)
+                      var localLogStartOffset: Long,
+                      var lastFetchedEpoch: Optional[Integer] = Optional.empty[Integer])

Review comment:
       removed




----------------------------------------------------------------



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r504952903



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig,
       fetchInfos.foreach { case (topicPartition, partitionData) =>
         logReadResultMap.get(topicPartition).foreach(logReadResult => {
           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
+          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData, logReadResult.divergingEpoch.nonEmpty))

Review comment:
       Ah, yes. Since we return immediately here, we don't need to check the original result in DelayedFetch. Updated.




----------------------------------------------------------------



[GitHub] [kafka] rajinisivaram commented on pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#issuecomment-709919899


   @hachikuji Thanks for the reviews. Test failure not related (`kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault`). Merging to trunk and 2.7.


----------------------------------------------------------------



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r505698143



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1162,6 +1162,13 @@ class Partition(val topicPartition: TopicPartition,
     localLog.fetchOffsetSnapshot
   }
 
+  def hasDivergingEpoch(currentLeaderEpoch: Optional[Integer],
+                        lastFetchedEpoch: Int,
+                        fetchOffset: Long): Boolean = {
+    val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch, fetchOnlyFromLeader = false)

Review comment:
       @hachikuji Thanks for the review. I took the check from `Partition.readRecords`, but we throw exceptions there to return appropriate errors. I was thinking we would return true here for undefined epochs because of the check below, and that would go through the other code path to return the appropriate errors or diverging epoch. Do you think we should do the same error-handling checks here as in `readRecords` to make the flow more obvious?
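
To illustrate the point about undefined epochs, here is a minimal sketch. It assumes the check below compares the returned epoch and end offset against the follower's values, and that both undefined sentinels are -1. `EpochEndOffset` and the object name are hypothetical stand-ins, not Kafka's classes.

```scala
object DivergingEpochCheck {
  // Assumed sentinel values for "epoch not known to this broker".
  val UndefinedEpoch: Int = -1
  val UndefinedEpochOffset: Long = -1L

  // Hypothetical stand-in for the leader's answer to "where does this epoch end?"
  final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

  // Divergence: the leader only knows an older epoch, or its end offset for
  // that epoch lies before the offset the follower wants to fetch from.
  def hasDivergingEpoch(end: EpochEndOffset,
                        lastFetchedEpoch: Int,
                        fetchOffset: Long): Boolean =
    end.leaderEpoch < lastFetchedEpoch || end.endOffset < fetchOffset
}
```

Under these assumptions, an undefined result makes the comparison true for any non-negative `lastFetchedEpoch`, so the delayed fetch would complete and the normal read path would report the actual error.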




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r504839142



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig,
       fetchInfos.foreach { case (topicPartition, partitionData) =>
         logReadResultMap.get(topicPartition).foreach(logReadResult => {
           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
+          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData, logReadResult.divergingEpoch.nonEmpty))

Review comment:
       Hmm.. If the `LogReadResult` has a diverging epoch, wouldn't we want to respond immediately?
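
A minimal sketch of the "respond immediately" decision being suggested. `ReadResult` and `shouldRespondImmediately` are hypothetical names, and the conditions other than the diverging-epoch one are assumptions about a typical long-poll fetch path rather than a copy of `ReplicaManager`.

```scala
object FetchResponseDecision {
  // Hypothetical, simplified per-partition read result.
  final case class ReadResult(bytesRead: Int,
                              hasError: Boolean,
                              divergingEpoch: Option[Int])

  // Returns true when the fetch should be answered right away instead of
  // being parked in purgatory until more data arrives or the timeout expires.
  def shouldRespondImmediately(timeoutMs: Long,
                               minBytes: Int,
                               results: Seq[ReadResult]): Boolean = {
    val bytesReadable = results.map(_.bytesRead.toLong).sum
    timeoutMs <= 0 ||
      results.isEmpty ||
      bytesReadable >= minBytes ||
      results.exists(_.hasError) ||
      // A diverging epoch means the follower must truncate; waiting helps nobody.
      results.exists(_.divergingEpoch.nonEmpty)
  }
}
```

With this shape, a read that already found a diverging epoch never reaches the delayed operation, so the delayed operation does not need to carry that flag from the original read.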

##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -88,6 +90,13 @@ class DelayedFetch(delayMs: Long,
         try {
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
             val partition = replicaManager.getPartitionOrException(topicPartition)
+
+            // Case H: Return diverging epoch in response to trigger truncation
+            if (fetchStatus.hasDivergingEpoch) {

Review comment:
       Here we are using the status from the original fetch. I am wondering if we need to recheck below since it is possible to get a truncation while a fetch is in purgatory.
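
A minimal sketch of why a recheck matters: the answer computed when the fetch was parked can go stale if the log changes underneath it. `LeaderLog`, `PendingFetch`, and `divergesNow` are hypothetical, and the epoch cache is reduced to a plain map from epoch to end offset.

```scala
import scala.collection.immutable.SortedMap

object PurgatoryRecheck {
  // Hypothetical leader-side epoch cache: epoch -> end offset of that epoch.
  final case class LeaderLog(epochEndOffsets: SortedMap[Int, Long]) {
    // Largest known epoch <= the requested one, with its end offset.
    def endOffsetFor(epoch: Int): Option[(Int, Long)] =
      epochEndOffsets.filter { case (e, _) => e <= epoch }.lastOption
  }

  // What the follower told us when the fetch was parked.
  final case class PendingFetch(lastFetchedEpoch: Int, fetchOffset: Long)

  // Evaluated against the *current* log on every completion attempt,
  // rather than cached from the original read.
  def divergesNow(log: LeaderLog, fetch: PendingFetch): Boolean =
    log.endOffsetFor(fetch.lastFetchedEpoch) match {
      case Some((epoch, endOffset)) =>
        epoch < fetch.lastFetchedEpoch || endOffset < fetch.fetchOffset
      // Simplification: an unknown epoch also completes the fetch so that
      // the normal read path can report the appropriate error.
      case None => true
    }
}
```

For example, a fetch at epoch 2 and offset 120 does not diverge while epoch 2 ends at 120, but does diverge once the log is truncated so that epoch 2 ends at 100.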




----------------------------------------------------------------



[GitHub] [kafka] rajinisivaram merged pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #9434:
URL: https://github.com/apache/kafka/pull/9434


   


----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r505678344



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1162,6 +1162,13 @@ class Partition(val topicPartition: TopicPartition,
     localLog.fetchOffsetSnapshot
   }
 
+  def hasDivergingEpoch(currentLeaderEpoch: Optional[Integer],
+                        lastFetchedEpoch: Int,
+                        fetchOffset: Long): Boolean = {
+    val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch, fetchOnlyFromLeader = false)

Review comment:
       Do we need to check the error? Or are you relying on the check below failing if `UNDEFINED_EPOCH` is returned?

##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -96,6 +98,7 @@ class DelayedFetch(delayMs: Long,
               case FetchTxnCommitted => offsetSnapshot.lastStableOffset
             }
 
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -77,7 +77,8 @@ class CachedPartition(val topic: String,
                       var highWatermark: Long,
                       var leaderEpoch: Optional[Integer],
                       var fetcherLogStartOffset: Long,
-                      var localLogStartOffset: Long)
+                      var localLogStartOffset: Long,
+                      var lastFetchedEpoch: Optional[Integer] = Optional.empty[Integer])

Review comment:
       Do we need to provide a default here?
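
The thread does not state the reasoning behind this question. One general concern with a default on a newly added constructor parameter is that existing call sites keep compiling and silently drop the new field. A minimal sketch with hypothetical classes (not `CachedPartition` itself):

```scala
import java.util.Optional

object DefaultParamSketch {
  // With a default, old call sites compile unchanged and get Optional.empty.
  final class WithDefault(val localLogStartOffset: Long,
                          val lastFetchedEpoch: Optional[Integer] = Optional.empty[Integer])

  // Without a default, every call site must state the value explicitly.
  final class WithoutDefault(val localLogStartOffset: Long,
                             val lastFetchedEpoch: Optional[Integer])

  // Compiles even though the epoch was never considered at this call site.
  val implicitEmpty = new WithDefault(0L)

  // The compiler forces the caller to decide what the epoch is.
  val explicit = new WithoutDefault(0L, Optional.of(Integer.valueOf(5)))
}
```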




----------------------------------------------------------------



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r505716802



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1162,6 +1162,13 @@ class Partition(val topicPartition: TopicPartition,
     localLog.fetchOffsetSnapshot
   }
 
+  def hasDivergingEpoch(currentLeaderEpoch: Optional[Integer],
+                        lastFetchedEpoch: Int,
+                        fetchOffset: Long): Boolean = {
+    val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch, fetchOnlyFromLeader = false)

Review comment:
       As discussed offline, moved the check to DelayedFetch.




----------------------------------------------------------------



[GitHub] [kafka] rajinisivaram commented on pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#issuecomment-709478153


   @hachikuji Thanks for the review. I have addressed the comments.


----------------------------------------------------------------



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r504954049



##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -88,6 +90,13 @@ class DelayedFetch(delayMs: Long,
         try {
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
             val partition = replicaManager.getPartitionOrException(topicPartition)
+
+            // Case H: Return diverging epoch in response to trigger truncation
+            if (fetchStatus.hasDivergingEpoch) {

Review comment:
       @hachikuji Thanks for the review. Makes sense. I have added a new check at the end instead of this one, though I am not sure whether there is a better way to check.




----------------------------------------------------------------