Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/21 21:31:13 UTC

[GitHub] [kafka] hachikuji opened a new pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

hachikuji opened a new pull request #8709:
URL: https://github.com/apache/kafka/pull/8709


   For KIP-392, we added logic to propagate high watermark changes to followers without delay in order to improve end-to-end latency when fetching from followers. The downside is that this increases the rate of fetch requests from followers, which can have a noticeable impact on performance (see KAFKA-9731).
   
   To fix that problem, we have already modified the code so that high watermark changes are propagated immediately only when a replica selector is used (which is not the default). However, leaving this logic in place makes it risky to enable follower fetching, since doing so changes the follower request rate, which can have a big impact on overall broker performance.
   
   This patch disables immediate propagation of the high watermark more generally. Instead, users can use the max wait time to control the worst-case latency. Note that this is typically only a problem for low-throughput clusters anyway, since otherwise there will be a steady rate of high watermark updates.
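   
   As a rough illustration of the behavior described above, here is a toy model of when a pending fetch completes once a high watermark advance no longer counts as a trigger. It is a sketch only: `PendingFetch`, `FetchCompletion`, and `shouldComplete` are hypothetical names that do not mirror `DelayedFetch`, and the real class also completes on other conditions (errors, leadership changes) that are left out. The max wait time is assumed here to be the follower's `replica.fetch.wait.max.ms`.
   
   ```scala
   // Toy model, not Kafka code: all names below are hypothetical.
   final case class PendingFetch(minBytes: Int, maxWaitMs: Long, createdAtMs: Long)
   
   object FetchCompletion {
     // A fetch completes when enough bytes have accumulated or the max wait
     // has elapsed. A high watermark advance alone is deliberately not an input.
     def shouldComplete(fetch: PendingFetch, bytesAvailable: Int, nowMs: Long): Boolean = {
       val enoughData = bytesAvailable >= fetch.minBytes
       val timedOut = nowMs - fetch.createdAtMs >= fetch.maxWaitMs
       enoughData || timedOut
     }
   }
   ```
   
   In this model an idle partition returns a response, and with it the latest high watermark, no later than `maxWaitMs` after the fetch was parked, which is why the max wait time bounds the worst-case latency.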
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r431300357



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       Sweet!







[GitHub] [kafka] ijuma commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634803985


   Looks like `productToString` only works with Scala 2.13. :(
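   
   For reference, a labeled format can also be approximated without `productElementName` by reading field names through Java reflection. This is a sketch under stated assumptions, not what the PR did: `ReflectiveFormat` and `Endpoint` are hypothetical names, and the approach relies on `getDeclaredFields` returning fields in constructor order, which the JVM does not guarantee even though common JVMs behave that way. Extra `val`s declared in the class body would also break the pairing.
   
   ```scala
   // Hypothetical helper; relies on field order, which is not guaranteed by the JVM.
   object ReflectiveFormat {
     def productToString(product: Product): String = {
       val names = product.getClass.getDeclaredFields.toList
         .map(_.getName)
         .filterNot(_.contains("$")) // drop compiler-generated fields such as $outer
       // Pair each name with the constructor value at the same position.
       names.zip(product.productIterator.toList)
         .map { case (name, value) => s"$name=$value" }
         .mkString(product.productPrefix + "(", ", ", ")")
     }
   }
   
   // Hypothetical case class used only for illustration.
   final case class Endpoint(host: String, port: Int)
   ```
   
   The trade-off is a reflective call on every `toString` plus the ordering assumption, so hand-written labels remain the safer choice when builds must support Scala 2.12.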





[GitHub] [kafka] ijuma edited a comment on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
ijuma edited a comment on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634126643


   Makes sense, let me review this one as it stands then.





[GitHub] [kafka] ijuma commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634126643









[GitHub] [kafka] hachikuji commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r430546391



##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -120,14 +119,6 @@ class DelayedFetch(delayMs: Long,
                   accumulatedSize += bytesAvailable
               }
             }
-
-            if (fetchMetadata.isFromFollower) {
-              // Case H check if the follower has the latest HW from the leader
-              if (partition.getReplica(fetchMetadata.replicaId)
-                .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
-                return forceComplete()
-              }
-            }

Review comment:
       Yeah, we missed this in the other patch.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       We can get the read size already from the `Records` object. The code must have been changed at some point to use this. It looks like `adjustedMaxBytes` is still sent through in the call to `Partition.readRecords`.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       Do we get the labels from the default `toString`? In the past, I thought it would only show the values.







[GitHub] [kafka] hachikuji commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634124729









[GitHub] [kafka] hachikuji commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634825301


   Bummer. Guess I will revert the commit.





[GitHub] [kafka] hachikuji merged pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #8709:
URL: https://github.com/apache/kafka/pull/8709


   





[GitHub] [kafka] ijuma commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634803238


   Argh:
   
   > 09:53:20 [Error] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/core/src/main/scala/kafka/utils/CoreUtils.scala:326: value productElementName is not a member of Product
   





[GitHub] [kafka] ijuma commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r429653946



##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -120,14 +119,6 @@ class DelayedFetch(delayMs: Long,
                   accumulatedSize += bytesAvailable
               }
             }
-
-            if (fetchMetadata.isFromFollower) {
-              // Case H check if the follower has the latest HW from the leader
-              if (partition.getReplica(fetchMetadata.replicaId)
-                .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
-                return forceComplete()
-              }
-            }

Review comment:
       Interesting. This could cause an early return for the leader case too, right?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       Do we still need to compute `adjustedMaxBytes`? Also, do you know why we don't need `readSize` anymore? What change made it unnecessary?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       Could we use the `toString` from the case class?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1100,10 +1100,8 @@ class ReplicaManager(val config: KafkaConfig,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
-            readSize = adjustedMaxBytes,

Review comment:
       Thanks, makes sense.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       That's true. You have to write `toString` yourself if you want labels. We could write a utility method like so (didn't try to compile it and skipped some details like commas):
   
   ```scala
   def productToString(product: Product): String = {
     val builder = new StringBuilder
     builder.append(product.productPrefix)
     for (i <- 0 until product.productArity) {
       // productElementName is only available on Product from Scala 2.13
       builder.append(product.productElementName(i))
         .append("=")
         .append(product.productElement(i))
     }
     builder.toString
   }
   ```
   
   Then we can call that method from any case class `toString` where we want this format. That avoids some duplication and the risk of forgetting to update `toString` when new fields are added.
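   
   A fuller sketch of the same idea, with the separators and parentheses filled in. To stay independent of the Scala version, this variant takes the field names explicitly instead of reading them from `Product`; `LabeledFormat`, `format`, and `Offsets` are hypothetical names introduced for illustration. On Scala 2.13 the names could instead come from `productElementNames`.
   
   ```scala
   object LabeledFormat {
     // Renders prefix(name1=value1, name2=value2, ...).
     def format(prefix: String, names: Seq[String], values: Seq[Any]): String = {
       require(names.size == values.size, "each value needs exactly one label")
       names.zip(values)
         .map { case (name, value) => s"$name=$value" }
         .mkString(prefix + "(", ", ", ")")
     }
   }
   
   // Hypothetical case class used only for illustration.
   final case class Offsets(start: Long, end: Long) {
     override def toString: String =
       LabeledFormat.format(productPrefix, Seq("start", "end"), productIterator.toList)
   }
   ```
   
   Passing the names by hand gives up the "cannot forget a new field" property, although the `require` check at least fails fast when the label list and the constructor fall out of step.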







[GitHub] [kafka] hachikuji commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r431288110



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
-  override def toString =
-    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
+  override def toString = {
+    "LogReadResult(" +
+      s"info=$info, " +
+      s"highWatermark=$highWatermark, " +
+      s"leaderLogStartOffset=$leaderLogStartOffset, " +
+      s"leaderLogEndOffset=$leaderLogEndOffset, " +
+      s"followerLogStartOffset=$followerLogStartOffset, " +
+      s"fetchTimeMs=$fetchTimeMs, " +
+      s"preferredReadReplica=$preferredReadReplica, " +
+      s"lastStableOffset=$lastStableOffset, " +
+      s"error=$error" +
+      ")"

Review comment:
       Ok, I added something like that.



