You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:36:13 UTC

[GitHub] [kafka] ijuma commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

ijuma commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r429653946



##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -120,14 +119,6 @@ class DelayedFetch(delayMs: Long,
                   accumulatedSize += bytesAvailable
               }
             }
-
-            if (fetchMetadata.isFromFollower) {
-              // Case H check if the follower has the latest HW from the leader
-              if (partition.getReplica(fetchMetadata.replicaId)
-                .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
-                return forceComplete()
-              }
-            }

Review comment:
       Interesting. This could cause an early return for the leader case too, right?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       Do we still need to compute `adjustedMaxBytes`? Also, do you know why we don't need `readSize` anymore? What change made it unnecessary?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       Could we use the `toString` from the case class?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       Thanks, makes sense.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it):
   
   ```scala
   public def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until productArity) {
       builder.append(productElementName(i)).append("=").append(productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new fields are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until productArity) {
       builder.append(productElementName(i)).append("=").append(productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new fields are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until product.productArity) {
       builder.append(product.productElementName(i))
         .append("=")
         .append(product.productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new fields are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it and skipped some details like commas):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until product.productArity) {
       builder.append(product.productElementName(i))
         .append("=")
         .append(product.productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new fields are added.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it and skipped some details like commas):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     sb.append(product.prefix)
     for (i <- 0 until product.productArity) {
       builder.append(product.productElementName(i))
         .append("=")
         .append(product.productElement(i))
     }
     sb.build()
   }
   ```
   
   Then we can call that method from any case class `toString` where we want this format. Avoids some duplication and forgetting to update `toString` when new fields are added.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org