Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/18 18:01:29 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #9547: KAFKA-9630; Replace OffsetsForLeaderEpoch request/response with automated protocol

chia7712 commented on a change in pull request #9547:
URL: https://github.com/apache/kafka/pull/9547#discussion_r526301121



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
##########
@@ -51,169 +47,120 @@
      */
     public static final int DEBUGGING_REPLICA_ID = -2;
 
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
-            "An array of topics to get epochs for");
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
-            "An array of partitions to get epochs for");
-
-    private static final Field.Int32 LEADER_EPOCH = new Field.Int32("leader_epoch",
-            "The epoch to lookup an offset for.");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            LEADER_EPOCH);
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema(
-            TOPICS_V0);
-
-    // V1 request is the same as v0. Per-partition leader epoch has been added to response
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 = OFFSET_FOR_LEADER_EPOCH_REQUEST_V0;
-
-    // V2 adds the current leader epoch to support fencing and the addition of the throttle time in the response
-    private static final Field PARTITIONS_V2 = PARTITIONS.withFields(
-            PARTITION_ID,
-            CURRENT_LEADER_EPOCH,
-            LEADER_EPOCH);
-    private static final Field TOPICS_V2 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V2);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V2 = new Schema(
-            TOPICS_V2);
-
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V3 = new Schema(
-            REPLICA_ID,
-            TOPICS_V2);
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, OFFSET_FOR_LEADER_EPOCH_REQUEST_V1,
-            OFFSET_FOR_LEADER_EPOCH_REQUEST_V2, OFFSET_FOR_LEADER_EPOCH_REQUEST_V3};
-    }
-
-    private final Map<TopicPartition, PartitionData> epochsByPartition;
-
-    private final int replicaId;
-
-    public Map<TopicPartition, PartitionData> epochsByTopicPartition() {
-        return epochsByPartition;
-    }
-
-    public int replicaId() {
-        return replicaId;
-    }
+    private final OffsetForLeaderEpochRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
-        private final Map<TopicPartition, PartitionData> epochsByPartition;
-        private final int replicaId;
+        private final OffsetForLeaderEpochRequestData data;
 
-        Builder(short oldestAllowedVersion, short latestAllowedVersion, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
+        Builder(short oldestAllowedVersion, short latestAllowedVersion, OffsetForLeaderEpochRequestData data) {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, oldestAllowedVersion, latestAllowedVersion);
-            this.epochsByPartition = epochsByPartition;
-            this.replicaId = replicaId;
+            this.data = data;
         }
 
-        public static Builder forConsumer(Map<TopicPartition, PartitionData> epochsByPartition) {
+        public static Builder forConsumer(OffsetForLeaderTopicCollection epochsByPartition) {
             // Old versions of this API require CLUSTER permission which is not typically granted
             // to clients. Beginning with version 3, the broker requires only TOPIC Describe
             // permission for the topic of each requested partition. In order to ensure client
             // compatibility, we only send this request when we can guarantee the relaxed permissions.
-            return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
-                    epochsByPartition, CONSUMER_REPLICA_ID);
+            OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
+            data.setReplicaId(CONSUMER_REPLICA_ID);
+            data.setTopics(epochsByPartition);
+            return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
         }
 
         public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
-            return new Builder(version, version, epochsByPartition, replicaId);
+            OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
+            data.setReplicaId(replicaId);
+
+            epochsByPartition.forEach((partitionKey, partitionValue) -> {
+                OffsetForLeaderTopic topic = data.topics().find(partitionKey.topic());
+                if (topic == null) {
+                    topic = new OffsetForLeaderTopic().setTopic(partitionKey.topic());
+                    data.topics().add(topic);
+                }
+                topic.partitions().add(new OffsetForLeaderPartition()
+                    .setPartition(partitionKey.partition())
+                    .setLeaderEpoch(partitionValue.leaderEpoch)
+                    .setCurrentLeaderEpoch(partitionValue.currentLeaderEpoch
+                        .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                );
+            });
+            return new Builder(version, version, data);
         }
 
         @Override
         public OffsetsForLeaderEpochRequest build(short version) {
             if (version < oldestAllowedVersion() || version > latestAllowedVersion())
                 throw new UnsupportedVersionException("Cannot build " + this + " with version " + version);
-            return new OffsetsForLeaderEpochRequest(epochsByPartition, replicaId, version);
-        }
 
-        public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short version) {
-            return new OffsetsForLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest(version, buffer), version);
+            return new OffsetsForLeaderEpochRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("OffsetsForLeaderEpochRequest(").
-                    append("epochsByPartition=").append(epochsByPartition).
-                    append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    public OffsetsForLeaderEpochRequest(Map<TopicPartition, PartitionData> epochsByPartition, int replicaId, short version) {
+    public OffsetsForLeaderEpochRequest(OffsetForLeaderEpochRequestData data, short version) {
         super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
-        this.epochsByPartition = epochsByPartition;
-        this.replicaId = replicaId;
+        this.data = data;
     }
 
     public OffsetsForLeaderEpochRequest(Struct struct, short version) {
         super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
-        replicaId = struct.getOrElse(REPLICA_ID, DEBUGGING_REPLICA_ID);
-        epochsByPartition = new HashMap<>();
-        for (Object topicAndEpochsObj : struct.get(TOPICS)) {
-            Struct topicAndEpochs = (Struct) topicAndEpochsObj;
-            String topic = topicAndEpochs.get(TOPIC_NAME);
-            for (Object partitionAndEpochObj : topicAndEpochs.get(PARTITIONS)) {
-                Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
-                int partitionId = partitionAndEpoch.get(PARTITION_ID);
-                int leaderEpoch = partitionAndEpoch.get(LEADER_EPOCH);
-                Optional<Integer> currentEpoch = RequestUtils.getLeaderEpoch(partitionAndEpoch, CURRENT_LEADER_EPOCH);
-                TopicPartition tp = new TopicPartition(topic, partitionId);
-                epochsByPartition.put(tp, new PartitionData(currentEpoch, leaderEpoch));
-            }
-        }
+        this.data = new OffsetForLeaderEpochRequestData(struct, version);
     }
 
-    public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short versionId) {
-        return new OffsetsForLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest(versionId, buffer), versionId);
+    public OffsetForLeaderEpochRequestData data() {
+        return data;
+    }
+
+    public Map<TopicPartition, PartitionData> epochsByTopicPartition() {
+        Map<TopicPartition, PartitionData> epochsByTopicPartition = new HashMap<>();
+
+        data.topics().forEach(topic ->
+            topic.partitions().forEach(partition ->
+                epochsByTopicPartition.put(
+                    new TopicPartition(topic.topic(), partition.partition()),
+                    new PartitionData(
+                        RequestUtils.getLeaderEpoch(partition.currentLeaderEpoch()),
+                        partition.leaderEpoch()))));
+
+        return epochsByTopicPartition;
+    }
+
+    public int replicaId() {
+        return data.replicaId();
+    }
+
+    public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short version) {

Review comment:
       not sure why this kind of method still exists in each request. It is not used anymore.
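
For context, the earlier revision's `parse` (removed in this diff) delegated to `ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest` and the `Struct`-based constructor, which this diff keeps. A minimal sketch of the same helper in the new shape, assuming that delegation is unchanged (the PR's actual body may differ):

```java
// Sketch only: assumes the static parse(...) still goes through
// ApiKeys.parseRequest and the Struct-based constructor kept in this diff.
public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short version) {
    return new OffsetsForLeaderEpochRequest(
        ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest(version, buffer), version);
}
```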

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
##########
@@ -51,133 +41,82 @@
  * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
  */
 public class OffsetsForLeaderEpochResponse extends AbstractResponse {
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
-            "An array of topics for which we have leader offsets for some requested partition leader epoch");
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
-            "An array of offsets by partition");
-    private static final Field.Int64 END_OFFSET = new Field.Int64("end_offset", "The end offset");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            ERROR_CODE,
-            PARTITION_ID,
-            END_OFFSET);
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0 = new Schema(
-            TOPICS_V0);
-
-    // V1 added a per-partition leader epoch field which specifies which leader epoch the end offset belongs to
-    private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-            ERROR_CODE,
-            PARTITION_ID,
-            LEADER_EPOCH,
-            END_OFFSET);
-    private static final Field TOPICS_V1 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V1);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new Schema(
-            TOPICS_V1);
-
-    // V2 bumped for addition of current leader epoch to the request schema and the addition of the throttle
-    // time in the response
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 = new Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V1);
-
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3 = OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2;
-
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1,
-            OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3};
+
+    private final OffsetForLeaderEpochResponseData data;
+
+    public OffsetsForLeaderEpochResponse(OffsetForLeaderEpochResponseData data) {
+        this.data = data;
     }
 
-    private final int throttleTimeMs;
-    private final Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
-
-    public OffsetsForLeaderEpochResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        this.epochEndOffsetsByPartition = new HashMap<>();
-        for (Object topicAndEpocsObj : struct.get(TOPICS)) {
-            Struct topicAndEpochs = (Struct) topicAndEpocsObj;
-            String topic = topicAndEpochs.get(TOPIC_NAME);
-            for (Object partitionAndEpochObj : topicAndEpochs.get(PARTITIONS)) {
-                Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
-                Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE));
-                int partitionId = partitionAndEpoch.get(PARTITION_ID);
-                TopicPartition tp = new TopicPartition(topic, partitionId);
-                int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                long endOffset = partitionAndEpoch.get(END_OFFSET);
-                epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, leaderEpoch, endOffset));
-            }
-        }
+    public OffsetsForLeaderEpochResponse(Struct struct, short version) {
+        data = new OffsetForLeaderEpochResponseData(struct, version);
+    }
+
+    public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> offsets) {
+        this(0, offsets);
     }
 
-    public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> epochsByTopic) {
-        this(DEFAULT_THROTTLE_TIME, epochsByTopic);
+    public OffsetsForLeaderEpochResponse(int throttleTimeMs, Map<TopicPartition, EpochEndOffset> offsets) {
+        data = new OffsetForLeaderEpochResponseData();
+        data.setThrottleTimeMs(throttleTimeMs);
+
+        offsets.forEach((tp, offset) -> {
+            OffsetForLeaderTopicResult topic = data.topics().find(tp.topic());
+            if (topic == null) {
+                topic = new OffsetForLeaderTopicResult().setTopic(tp.topic());
+                data.topics().add(topic);
+            }
+            topic.partitions().add(new OffsetForLeaderPartitionResult()
+                .setPartition(tp.partition())
+                .setErrorCode(offset.error().code())
+                .setLeaderEpoch(offset.leaderEpoch())
+                .setEndOffset(offset.endOffset()));
+        });
     }
 
-    public OffsetsForLeaderEpochResponse(int throttleTimeMs, Map<TopicPartition, EpochEndOffset> epochsByTopic) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.epochEndOffsetsByPartition = epochsByTopic;
+    public OffsetForLeaderEpochResponseData data() {
+        return data;
     }
 
     public Map<TopicPartition, EpochEndOffset> responses() {
+        Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition = new HashMap<>();
+
+        data.topics().forEach(topic ->
+            topic.partitions().forEach(partition ->
+                epochEndOffsetsByPartition.put(
+                    new TopicPartition(topic.topic(), partition.partition()),
+                    new EpochEndOffset(
+                        Errors.forCode(partition.errorCode()),
+                        partition.leaderEpoch(),
+                        partition.endOffset()))));
+
         return epochEndOffsetsByPartition;
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
-        epochEndOffsetsByPartition.values().forEach(response ->
+        responses().values().forEach(response ->

Review comment:
       Is it worth using `data` rather than `responses()` to avoid the extra conversion?
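
For illustration, a minimal method-level sketch of the suggestion: count errors straight from `data`, so `responses()` never has to build the intermediate map (all names are taken from the diff above):

```java
// Sketch of iterating the generated data directly instead of responses();
// avoids allocating the TopicPartition -> EpochEndOffset map just to count errors.
@Override
public Map<Errors, Integer> errorCounts() {
    Map<Errors, Integer> errorCounts = new HashMap<>();
    data.topics().forEach(topic ->
        topic.partitions().forEach(partition ->
            errorCounts.merge(Errors.forCode(partition.errorCode()), 1, Integer::sum)));
    return errorCounts;
}
```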

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
##########
@@ -51,133 +41,82 @@
  * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
  */
 public class OffsetsForLeaderEpochResponse extends AbstractResponse {
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
-            "An array of topics for which we have leader offsets for some requested partition leader epoch");
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
-            "An array of offsets by partition");
-    private static final Field.Int64 END_OFFSET = new Field.Int64("end_offset", "The end offset");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            ERROR_CODE,
-            PARTITION_ID,
-            END_OFFSET);
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0 = new Schema(
-            TOPICS_V0);
-
-    // V1 added a per-partition leader epoch field which specifies which leader epoch the end offset belongs to
-    private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-            ERROR_CODE,
-            PARTITION_ID,
-            LEADER_EPOCH,
-            END_OFFSET);
-    private static final Field TOPICS_V1 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V1);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new Schema(
-            TOPICS_V1);
-
-    // V2 bumped for addition of current leader epoch to the request schema and the addition of the throttle
-    // time in the response
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 = new Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V1);
-
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3 = OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2;
-
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1,
-            OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3};
+
+    private final OffsetForLeaderEpochResponseData data;
+
+    public OffsetsForLeaderEpochResponse(OffsetForLeaderEpochResponseData data) {
+        this.data = data;
     }
 
-    private final int throttleTimeMs;
-    private final Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
-
-    public OffsetsForLeaderEpochResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        this.epochEndOffsetsByPartition = new HashMap<>();
-        for (Object topicAndEpocsObj : struct.get(TOPICS)) {
-            Struct topicAndEpochs = (Struct) topicAndEpocsObj;
-            String topic = topicAndEpochs.get(TOPIC_NAME);
-            for (Object partitionAndEpochObj : topicAndEpochs.get(PARTITIONS)) {
-                Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
-                Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE));
-                int partitionId = partitionAndEpoch.get(PARTITION_ID);
-                TopicPartition tp = new TopicPartition(topic, partitionId);
-                int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                long endOffset = partitionAndEpoch.get(END_OFFSET);
-                epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, leaderEpoch, endOffset));
-            }
-        }
+    public OffsetsForLeaderEpochResponse(Struct struct, short version) {
+        data = new OffsetForLeaderEpochResponseData(struct, version);
+    }
+
+    public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> offsets) {
+        this(0, offsets);
     }
 
-    public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> epochsByTopic) {
-        this(DEFAULT_THROTTLE_TIME, epochsByTopic);
+    public OffsetsForLeaderEpochResponse(int throttleTimeMs, Map<TopicPartition, EpochEndOffset> offsets) {
+        data = new OffsetForLeaderEpochResponseData();
+        data.setThrottleTimeMs(throttleTimeMs);
+
+        offsets.forEach((tp, offset) -> {
+            OffsetForLeaderTopicResult topic = data.topics().find(tp.topic());
+            if (topic == null) {
+                topic = new OffsetForLeaderTopicResult().setTopic(tp.topic());
+                data.topics().add(topic);
+            }
+            topic.partitions().add(new OffsetForLeaderPartitionResult()
+                .setPartition(tp.partition())
+                .setErrorCode(offset.error().code())
+                .setLeaderEpoch(offset.leaderEpoch())
+                .setEndOffset(offset.endOffset()));
+        });
     }
 
-    public OffsetsForLeaderEpochResponse(int throttleTimeMs, Map<TopicPartition, EpochEndOffset> epochsByTopic) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.epochEndOffsetsByPartition = epochsByTopic;
+    public OffsetForLeaderEpochResponseData data() {
+        return data;
     }
 
     public Map<TopicPartition, EpochEndOffset> responses() {

Review comment:
       I hope we can get rid of those conversions in the future :)
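
As a rough illustration of what dropping the conversion could look like, a caller that only needs one partition could read the generated collections directly. This is a sketch with a hypothetical caller: `response` (an `OffsetsForLeaderEpochResponse`) and `tp` (a `TopicPartition`) are assumed variables, and the accessor names mirror the setters used in the diff above.

```java
// Hypothetical caller-side lookup against data() instead of responses().
OffsetForLeaderTopicResult topicResult = response.data().topics().find(tp.topic());
if (topicResult != null) {
    for (OffsetForLeaderPartitionResult partition : topicResult.partitions()) {
        if (partition.partition() == tp.partition()) {
            long endOffset = partition.endOffset();      // was EpochEndOffset.endOffset()
            int leaderEpoch = partition.leaderEpoch();   // was EpochEndOffset.leaderEpoch()
            // handle Errors.forCode(partition.errorCode()) as before
        }
    }
}
```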




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org