Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/04 09:33:46 UTC

[GitHub] [kafka] dajac opened a new pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac opened a new pull request #9689:
URL: https://github.com/apache/kafka/pull/9689


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r540017068



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -160,7 +161,9 @@ abstract class AbstractFetcherThread(name: String,
       if (state.isTruncating) {
         latestEpoch(tp) match {
           case Some(epoch) if isOffsetForLeaderEpochSupported =>
-            partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
+            partitionsWithEpochs += tp -> new EpochData()

Review comment:
       Good catch! Let me fix this.







[GitHub] [kafka] dajac merged pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac merged pull request #9689:
URL: https://github.com/apache/kafka/pull/9689


   





[GitHub] [kafka] chia7712 commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

chia7712 commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-765494602


   > You don't know how things will change over time ahead of time.
   
   You are right. However, it seems to me that it is hard to apply a single solution to all auto-generated data. Adding a wrapper around auto-generated data could add extra cost (we could address that with lazy evaluation, but it would make the code more complicated). By contrast, using the auto-generated data directly on the server side gives us less flexibility.
   
   It is a trade-off, and it seems to me that a happy medium is to add helper methods to each request. The server side can call those helper methods explicitly when it does need different behavior for different data versions. The benefit is that the extra cost is incurred only when we actually need a different data structure. 
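The helper-method approach described above can be sketched roughly as follows. This is a minimal sketch, not code from the PR: `PartitionView`, `EpochRequestWrapper`, the `topic-partition` string key, and the `-1` sentinel are all hypothetical stand-ins. The server reads the generated data directly and only pays for a conversion when it explicitly calls the helper.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for an auto-generated partition struct.
// The -1 sentinel for "unknown current leader epoch" is an assumption.
final class PartitionView {
    final String topic;
    final int partition;
    final int currentLeaderEpoch;

    PartitionView(String topic, int partition, int currentLeaderEpoch) {
        this.topic = topic;
        this.partition = partition;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }
}

// Hypothetical request wrapper: callers read the generated data as-is and
// pay for a conversion only when they explicitly call the helper.
final class EpochRequestWrapper {
    private final List<PartitionView> data;

    EpochRequestWrapper(List<PartitionView> data) {
        this.data = data;
    }

    // Direct access: no copy, no conversion.
    List<PartitionView> data() {
        return data;
    }

    // Explicit helper: builds a "topic-partition" -> Optional map on every call.
    Map<String, Optional<Integer>> currentLeaderEpochs() {
        Map<String, Optional<Integer>> result = new HashMap<>();
        for (PartitionView p : data) {
            Optional<Integer> epoch = p.currentLeaderEpoch == -1
                ? Optional.empty()
                : Optional.of(p.currentLeaderEpoch);
            result.put(p.topic + "-" + p.partition, epoch);
        }
        return result;
    }
}
```

Because the helper recomputes its map on every call, a caller that needs the result more than once should hold on to it.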





[GitHub] [kafka] dajac commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-747433744


   I've rebased the PR. I will merge it when I get a clean build.





[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r540406112



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       As `onPartitionFenced` is used in other code paths, I did not refactor it. We could add another variant that does not take an `Optional`. Is it worth it?







[GitHub] [kafka] ijuma commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

ijuma commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-765474499


   It's the nature of protocol evolution. You don't know how things will change over time ahead of time.





[GitHub] [kafka] hachikuji commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

hachikuji commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r540311440



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       Hmm... It's a little curious that we need the call to `getLeaderEpoch` here. Tracing this back to `fetchTruncatingPartitions`, it looks like we cannot have a negative epoch here.







[GitHub] [kafka] chia7712 commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

chia7712 commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r544994273



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
##########
@@ -34,7 +34,7 @@
 
     private RequestUtils() {}
 
-    static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
+    public static Optional<Integer> getLeaderEpoch(int leaderEpoch) {

Review comment:
       It is OK with me :)







[GitHub] [kafka] ijuma commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

ijuma commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-765497872


   It definitely requires analysis on a case-by-case basis. However, helpers that do the conversion lazily also have performance issues if the method gets called repeatedly, unless you cache the converted data (which has its own complexities). Generally, a good thing to aim for is that recent versions don't require conversions and older versions are converted into the latest format only once.
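One way to read the "convert older versions only once" guideline, sketched under assumptions: `LatestPartition`, `LegacyPartition`, and `ParsedRequest` are hypothetical classes, and `-1` is assumed to mark a missing current leader epoch. Data parsed from a recent version is used as-is, while older versions are converted into the latest format a single time, when the request object is built.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical latest-version partition struct, used directly by the server.
// -1 is assumed to mean "current leader epoch not provided".
final class LatestPartition {
    final int partition;
    final int leaderEpoch;
    final int currentLeaderEpoch;

    LatestPartition(int partition, int leaderEpoch, int currentLeaderEpoch) {
        this.partition = partition;
        this.leaderEpoch = leaderEpoch;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }
}

// Hypothetical older-version struct that predates the currentLeaderEpoch field.
final class LegacyPartition {
    final int partition;
    final int leaderEpoch;

    LegacyPartition(int partition, int leaderEpoch) {
        this.partition = partition;
        this.leaderEpoch = leaderEpoch;
    }
}

// Request holder that always exposes the latest format.
final class ParsedRequest {
    private final List<LatestPartition> partitions;

    private ParsedRequest(List<LatestPartition> partitions) {
        this.partitions = Collections.unmodifiableList(partitions);
    }

    // Recent versions: wrap the parsed data, no per-field conversion.
    static ParsedRequest fromLatest(List<LatestPartition> partitions) {
        return new ParsedRequest(partitions);
    }

    // Older versions: convert into the latest format exactly once, at parse time.
    static ParsedRequest fromLegacy(List<LegacyPartition> legacy) {
        List<LatestPartition> converted = new ArrayList<>(legacy.size());
        for (LegacyPartition p : legacy) {
            converted.add(new LatestPartition(p.partition, p.leaderEpoch, -1));
        }
        return new ParsedRequest(converted);
    }

    // Repeated calls return the same already-converted list; nothing is recomputed.
    List<LatestPartition> partitions() {
        return partitions;
    }
}
```

The cost of supporting an old version is paid once per request, and the server-side handling code only ever sees `LatestPartition`.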





[GitHub] [kafka] hachikuji commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

hachikuji commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-747594813


   @dajac Yeah, I'm definitely in favor of the improvements you have suggested, but I'm not sure they 100% address the concern. The case of `Optional` is an interesting example. A field might become optional because it is not supported in older versions of the protocol. That optionality leaks into the code even when we can guarantee that internal data structures will have the field present. Anyway, I still think we should be thinking about these conversions when they make sense. The Fetch paths (including the replica fetcher) are probably the biggest opportunity for performance improvements. I'm just highlighting some downsides.





[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r540016804



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
##########
@@ -143,30 +127,24 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         return new OffsetsForLeaderEpochResponse(responseData);
     }
 
-    public static class PartitionData {
-        public final Optional<Integer> currentLeaderEpoch;
-        public final int leaderEpoch;
-
-        public PartitionData(Optional<Integer> currentLeaderEpoch, int leaderEpoch) {
-            this.currentLeaderEpoch = currentLeaderEpoch;
-            this.leaderEpoch = leaderEpoch;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch).
-                append(", leaderEpoch=").append(leaderEpoch).
-                append(")");
-            return bld.toString();
-        }
-    }
-
     /**
      * Check whether a broker allows Topic-level permissions in order to use the
      * OffsetForLeaderEpoch API. Old versions require Cluster permission.
      */
     public static boolean supportsTopicPermission(short latestUsableVersion) {
         return latestUsableVersion >= 3;
     }
+
+    /**
+     * Exposed `OffsetForLeaderPartition.currentLeaderEpoch` as an `java.util.Optional`.
+     *
+     * Classes auto-generated based on the protocol do not support `java.util.Optional` yet. This
+     * is a temporary workaround until that work is completed.
+     */
+    public static Optional<Integer> currentLeaderEpochOpt(OffsetForLeaderPartition offsetForLeaderPartition) {

Review comment:
       Yes, let me change this.







[GitHub] [kafka] ijuma commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

ijuma commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-765233789


   Like Jason said, I think the main challenge is how to deal with protocol evolution. It's typically possible to deal with, but it can require quite a lot of contortion. LeaderAndIsrRequest is one example:
   
   https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java





[GitHub] [kafka] chia7712 commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

chia7712 commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-765246731


   It looks like a bad pattern if we have to "contort" a bunch of data from the auto-generated protocol when processing it. 





[GitHub] [kafka] hachikuji commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

hachikuji commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r539700926



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
##########
@@ -143,30 +127,24 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         return new OffsetsForLeaderEpochResponse(responseData);
     }
 
-    public static class PartitionData {
-        public final Optional<Integer> currentLeaderEpoch;
-        public final int leaderEpoch;
-
-        public PartitionData(Optional<Integer> currentLeaderEpoch, int leaderEpoch) {
-            this.currentLeaderEpoch = currentLeaderEpoch;
-            this.leaderEpoch = leaderEpoch;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch).
-                append(", leaderEpoch=").append(leaderEpoch).
-                append(")");
-            return bld.toString();
-        }
-    }
-
     /**
      * Check whether a broker allows Topic-level permissions in order to use the
      * OffsetForLeaderEpoch API. Old versions require Cluster permission.
      */
     public static boolean supportsTopicPermission(short latestUsableVersion) {
         return latestUsableVersion >= 3;
     }
+
+    /**
+     * Exposed `OffsetForLeaderPartition.currentLeaderEpoch` as an `java.util.Optional`.
+     *
+     * Classes auto-generated based on the protocol do not support `java.util.Optional` yet. This
+     * is a temporary workaround until that work is completed.
+     */
+    public static Optional<Integer> currentLeaderEpochOpt(OffsetForLeaderPartition offsetForLeaderPartition) {

Review comment:
       Could we use `RequestUtils.getLeaderEpoch`?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       Maybe we can use `scala.compat.java8.OptionConverters._`?







[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r540382825



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       Yeah, that's because `onPartitionFenced` takes an `Optional` actually.







[GitHub] [kafka] chia7712 commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

chia7712 commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r542386056



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
##########
@@ -34,7 +34,7 @@
 
     private RequestUtils() {}
 
-    static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
+    public static Optional<Integer> getLeaderEpoch(int leaderEpoch) {

Review comment:
       There is some duplicate code. Maybe we can unify it in this PR (or address it in a follow-up)?
   
   1. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1399
   2. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1553
   3. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java#L358
   4. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1124
   5. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L534
   6. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1890
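If the duplicated sites listed above were unified, they could all delegate to a single helper along these lines. This is a sketch: `LeaderEpochs` is a hypothetical class, and the `-1` sentinel is our assumption of the value of `RecordBatch.NO_PARTITION_LEADER_EPOCH`; whether every listed call site performs exactly this equality check has not been verified here.

```java
import java.util.Optional;

// Hypothetical single home for the sentinel-to-Optional conversion.
final class LeaderEpochs {
    // Assumed to match RecordBatch.NO_PARTITION_LEADER_EPOCH; verify before use.
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    private LeaderEpochs() {}

    // Maps the "no epoch" sentinel to Optional.empty(); any other value is wrapped.
    static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
        return leaderEpoch == NO_PARTITION_LEADER_EPOCH
            ? Optional.empty()
            : Optional.of(leaderEpoch);
    }
}
```

Keeping the sentinel comparison in one place means a change to the sentinel, or to how negative values are treated, only has to be made once.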







[GitHub] [kafka] dajac commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-741692045


   cc @chia7712 @hachikuji 





[GitHub] [kafka] dajac commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-742414752


   @chia7712 @hachikuji Thanks for your feedback. I have addressed your comments. Could you take another look please?





[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r544993695



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
##########
@@ -34,7 +34,7 @@
 
     private RequestUtils() {}
 
-    static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
+    public static Optional<Integer> getLeaderEpoch(int leaderEpoch) {

Review comment:
       @chia7712 Thanks for pointing this out. I think that we should tackle this in follow-up PRs as this is not strictly related to this change. Ok for you?







[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r542291295



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       @hachikuji Ah.. I see what you mean now. I have removed the call to `getLeaderEpoch`.







[GitHub] [kafka] chia7712 commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

chia7712 commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-747370407


   > Assuming that we can make it feel more natural with the suggested improvements, do you believe that we should continue trying to use the auto-generated requests and responses directly?
   
   I'm +1000 on @dajac's proposal, as there is a great benefit from using auto-generated code:
   1. it avoids extra conversions
   2. it removes duplicate code from the code base
   3. it provides a builder pattern (especially good for classes with many fields)
   
   Also, it is OK to use an extra request/response wrapper to offer helper methods for things that are NOT supported by the auto-generated code (`Optional` and `TopicPartition`, for example).





[GitHub] [kafka] dajac commented on pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#issuecomment-747363671


   @hachikuji I totally agree with your comment. I have been pursuing this migration for this API to better understand the issues that we would face if we really want to directly use the auto-generated requests and responses in the near future.
   
   If we want to proceed on that path, I think that we should evolve the auto-generated protocol. I envision the following improvements:
   
   1. The first area is about the `Map`. We made the choice not to use `Map` in the auto-generated classes in favour of our internal map-like data structure, which is more efficient. The downside of this is that it always requires converting our internal `Map` into that data structure. It would be great if we could just use the internal `Map` directly. That would reduce the boilerplate code needed for the conversion.
   
   2. Similarly, it seems that we could make `TopicPartition` a first-class citizen in the auto-generated protocol. It would be so nice if we got a `Map` with `TopicPartition` as the key when a request or a response represents topics/partitions. I have put some details here: https://issues.apache.org/jira/browse/KAFKA-10795. This would remove the awkwardness that you are talking about since, in our particular case, it would remove the partition index from `EpochData`.
   
   3. I believe that we should add support for `Optional` in the auto-generated protocol. The idea is to get an empty option when the value is equal to the sentinel value. I prototyped this a while ago. The PR is a bit outdated, but the overall idea remains. It would be great if you could take a look: https://github.com/apache/kafka/pull/9085
   
   4. Another area would be the ability to define common structs in the auto-generated protocol. That would allow sharing them between requests/responses. This is not strictly necessary, but it would be nice to have.
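To make items 1 to 3 concrete, here is a rough sketch of the conversion boilerplate they aim to remove, plus the kind of `Optional` accessor item 3 proposes. All types (`TopicPartitionKey`, `PartitionEntry`, `TopicEntry`, `Conversions`) are hypothetical stand-ins loosely modeled on the generated classes, and `-1` is assumed to be the sentinel; the real generated code differs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical key type playing the role of TopicPartition.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey k = (TopicPartitionKey) o;
        return topic.equals(k.topic) && partition == k.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical generated-style partition entry. It repeats the partition
// index that is already part of the map key (the awkwardness in item 2).
final class PartitionEntry {
    final int partition;
    final int currentLeaderEpoch; // -1 assumed to be the sentinel
    final int leaderEpoch;

    PartitionEntry(int partition, int currentLeaderEpoch, int leaderEpoch) {
        this.partition = partition;
        this.currentLeaderEpoch = currentLeaderEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    // Item 3: what a generated Optional accessor could look like.
    Optional<Integer> currentLeaderEpochOpt() {
        return currentLeaderEpoch == -1 ? Optional.empty() : Optional.of(currentLeaderEpoch);
    }
}

// Hypothetical generated-style topic entry holding its partitions.
final class TopicEntry {
    final String name;
    final List<PartitionEntry> partitions = new ArrayList<>();

    TopicEntry(String name) {
        this.name = name;
    }
}

final class Conversions {
    private Conversions() {}

    // Items 1 and 2: regroup a flat map into the nested per-topic layout.
    static List<TopicEntry> toTopics(Map<TopicPartitionKey, PartitionEntry> flat) {
        Map<String, TopicEntry> byTopic = new LinkedHashMap<>();
        for (Map.Entry<TopicPartitionKey, PartitionEntry> e : flat.entrySet()) {
            byTopic.computeIfAbsent(e.getKey().topic, TopicEntry::new)
                .partitions.add(e.getValue());
        }
        return new ArrayList<>(byTopic.values());
    }

    // The reverse: rebuild a map keyed by topic-partition from the nested layout.
    static Map<TopicPartitionKey, PartitionEntry> toMap(List<TopicEntry> topics) {
        Map<TopicPartitionKey, PartitionEntry> flat = new LinkedHashMap<>();
        for (TopicEntry t : topics) {
            for (PartitionEntry p : t.partitions) {
                flat.put(new TopicPartitionKey(t.name, p.partition), p);
            }
        }
        return flat;
    }
}
```

Both loops are pure plumbing; a generator that emitted maps keyed by topic-partition and `Optional` accessors directly would make them unnecessary.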
   
   Overall, I believe that we could make it happen if we improve the auto-generated protocol. The substitution reads pretty well when the auto-generated data structure is a one-to-one replacement. This PR is a good example: https://github.com/apache/kafka/pull/9746.
   
   Assuming that we can make it feel more natural with the suggested improvements, do you believe that we should continue trying to use the auto-generated requests and responses directly?





[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r540030883



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -160,7 +161,9 @@ abstract class AbstractFetcherThread(name: String,
       if (state.isTruncating) {
         latestEpoch(tp) match {
           case Some(epoch) if isOffsetForLeaderEpochSupported =>
-            partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
+            partitionsWithEpochs += tp -> new EpochData()

Review comment:
       I have updated tests to catch this.







[GitHub] [kafka] chia7712 commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

chia7712 commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r539254641



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -160,7 +161,9 @@ abstract class AbstractFetcherThread(name: String,
       if (state.isTruncating) {
         latestEpoch(tp) match {
           case Some(epoch) if isOffsetForLeaderEpochSupported =>
-            partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
+            partitionsWithEpochs += tp -> new EpochData()

Review comment:
       Should it call `setPartition`? 
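The issue flagged here is that `new EpochData()` is created without calling its setters, so every field silently keeps its default. A minimal sketch, assuming a generated-style class with fluent setters (the class shape and the default values below are hypothetical, not the real generated schema): a factory that sets every field explicitly, and checks of the kind that would catch a missing `setPartition` call.

```java
// Hypothetical generated-style EpochData with fluent setters.
// The default values are assumptions chosen for illustration only.
final class EpochData {
    private int partition = 0;
    private int currentLeaderEpoch = -1;
    private int leaderEpoch = -1;

    EpochData setPartition(int v) { this.partition = v; return this; }
    EpochData setCurrentLeaderEpoch(int v) { this.currentLeaderEpoch = v; return this; }
    EpochData setLeaderEpoch(int v) { this.leaderEpoch = v; return this; }

    int partition() { return partition; }
    int currentLeaderEpoch() { return currentLeaderEpoch; }
    int leaderEpoch() { return leaderEpoch; }
}

final class EpochDataFactory {
    private EpochDataFactory() {}

    // Sets every field explicitly so none silently keeps its default.
    static EpochData of(int partition, int currentLeaderEpoch, int leaderEpoch) {
        return new EpochData()
            .setPartition(partition)
            .setCurrentLeaderEpoch(currentLeaderEpoch)
            .setLeaderEpoch(leaderEpoch);
    }
}
```

A test asserting that the built object reports the expected partition would fail for a bare `new EpochData()`, which in this sketch reports partition `0` regardless of the topic-partition it was created for.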







[GitHub] [kafka] dajac commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

dajac commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r540016662



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       We could but that would require two conversions instead of one:
   ```
   val currentLeaderEpoch = latestEpochsForPartitions.get(tp).flatMap { p =>
       RequestUtils.getLeaderEpoch(p.currentLeaderEpoch).asScala
   }.asJava
   ```
   I have a small preference for the current approach to avoid this.



