You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 04:10:24 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

hachikuji commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r430546391



##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -120,14 +119,6 @@ class DelayedFetch(delayMs: Long,
                   accumulatedSize += bytesAvailable
               }
             }
-
-            if (fetchMetadata.isFromFollower) {
-              // Case H check if the follower has the latest HW from the leader
-              if (partition.getReplica(fetchMetadata.replicaId)
-                .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
-                return forceComplete()
-              }
-            }

Review comment:
       Yeah, we missed this in the other patch.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       We can get the read size already from the `Records` object. The code must have been changed at some point to use this. It looks like `adjustedMaxBytes` is still sent through in the call to `Partition.readRecords`.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       Do we get the field labels from the default case class `toString`? As far as I recall, it only shows the values.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org