Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/09 22:42:22 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9689: KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol

hachikuji commented on a change in pull request #9689:
URL: https://github.com/apache/kafka/pull/9689#discussion_r539700926



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
##########
@@ -143,30 +127,24 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         return new OffsetsForLeaderEpochResponse(responseData);
     }
 
-    public static class PartitionData {
-        public final Optional<Integer> currentLeaderEpoch;
-        public final int leaderEpoch;
-
-        public PartitionData(Optional<Integer> currentLeaderEpoch, int leaderEpoch) {
-            this.currentLeaderEpoch = currentLeaderEpoch;
-            this.leaderEpoch = leaderEpoch;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch).
-                append(", leaderEpoch=").append(leaderEpoch).
-                append(")");
-            return bld.toString();
-        }
-    }
-
     /**
      * Check whether a broker allows Topic-level permissions in order to use the
      * OffsetForLeaderEpoch API. Old versions require Cluster permission.
      */
     public static boolean supportsTopicPermission(short latestUsableVersion) {
         return latestUsableVersion >= 3;
     }
+
+    /**
+     * Exposed `OffsetForLeaderPartition.currentLeaderEpoch` as an `java.util.Optional`.
+     *
+     * Classes auto-generated based on the protocol do not support `java.util.Optional` yet. This
+     * is a temporary workaround until that work is completed.
+     */
+    public static Optional<Integer> currentLeaderEpochOpt(OffsetForLeaderPartition offsetForLeaderPartition) {

Review comment:
       Could we use `RequestUtils.getLeaderEpoch`?
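       For context, a helper of this kind usually wraps a raw epoch field in an `Optional`, treating a sentinel value as "absent". The sketch below is illustrative only: the class `LeaderEpochSketch`, the method `toOptional`, and the sentinel value `-1` are assumptions made for the example, not the actual `RequestUtils` source.

```java
import java.util.Optional;

// Hypothetical illustration; not the actual Kafka RequestUtils code.
final class LeaderEpochSketch {
    // Assumed sentinel meaning "no leader epoch was provided".
    static final int NO_EPOCH = -1;

    // Wraps a raw epoch in an Optional, mapping the sentinel to empty.
    static Optional<Integer> toOptional(int rawEpoch) {
        return rawEpoch == NO_EPOCH ? Optional.empty() : Optional.of(rawEpoch);
    }

    public static void main(String[] args) {
        System.out.println(toOptional(5));        // a present epoch
        System.out.println(toOptional(NO_EPOCH)); // the sentinel maps to empty
    }
}
```

       If the existing helper behaves this way, the new `currentLeaderEpochOpt` method could delegate to it instead of repeating the sentinel check.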

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,11 +271,12 @@ abstract class AbstractFetcherThread(name: String,
             fetchOffsets.put(tp, offsetTruncationState)
 
         case Errors.FENCED_LEADER_EPOCH =>
-          if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
-            p =>
-              if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
-              else None
-          })) partitionsWithError += tp
+          val currentLeaderEpoch = latestEpochsForPartitions.get(tp) match {

Review comment:
       Maybe we can use `scala.compat.java8.OptionConverters._`?



