You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/07 11:24:17 UTC

kafka git commit: MINOR: Use CollectionUtils.groupDataByTopic in OffsetsForLeaderEpochRequest/Response

Repository: kafka
Updated Branches:
  refs/heads/trunk 865d82af2 -> 359a68510


MINOR: Use CollectionUtils.groupDataByTopic in OffsetsForLeaderEpochRequest/Response

Author: Ben Stopford <be...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2821 from benstopford/kip-101-cleanup-group-by


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/359a6851
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/359a6851
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/359a6851

Branch: refs/heads/trunk
Commit: 359a68510801a22630a7af275c9935fb2d4c8dbf
Parents: 865d82a
Author: Ben Stopford <be...@gmail.com>
Authored: Fri Apr 7 12:11:25 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Apr 7 12:19:49 2017 +0100

----------------------------------------------------------------------
 .../requests/OffsetsForLeaderEpochRequest.java  | 77 +++++------------
 .../requests/OffsetsForLeaderEpochResponse.java | 90 ++++++--------------
 2 files changed, 46 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/359a6851/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 38fabf0..3c285f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -84,11 +85,11 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     public OffsetsForLeaderEpochRequest(Struct struct, short version) {
         super(version);
         epochsByPartition = new HashMap<>();
-        for (Object t : struct.getArray(TOPICS)) {
-            Struct topicAndEpochs = (Struct) t;
+        for (Object topicAndEpochsObj : struct.getArray(TOPICS)) {
+            Struct topicAndEpochs = (Struct) topicAndEpochsObj;
             String topic = topicAndEpochs.getString(TOPIC);
-            for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
-                Struct partitionAndEpoch = (Struct) e;
+            for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS)) {
+                Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
                 int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
                 int epoch = partitionAndEpoch.getInt(LEADER_EPOCH);
                 TopicPartition tp = new TopicPartition(topic, partitionId);
@@ -103,36 +104,26 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
+        Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
 
-        //Group by topic
-        Map<String, List<PartitionLeaderEpoch>> topicsToPartitionEpochs = new HashMap<>();
-        for (TopicPartition tp : epochsByPartition.keySet()) {
-            List<PartitionLeaderEpoch> partitionEndOffsets = topicsToPartitionEpochs.get(tp.topic());
-            if (partitionEndOffsets == null)
-                partitionEndOffsets = new ArrayList<>();
-            partitionEndOffsets.add(new PartitionLeaderEpoch(tp.partition(), epochsByPartition.get(tp)));
-            topicsToPartitionEpochs.put(tp.topic(), partitionEndOffsets);
-        }
+        Map<String, Map<Integer, Integer>> topicsToPartitionEpochs = CollectionUtils.groupDataByTopic(epochsByPartition);
 
         List<Struct> topics = new ArrayList<>();
-        for (Map.Entry<String, List<PartitionLeaderEpoch>> topicEpochs : topicsToPartitionEpochs.entrySet()) {
-            Struct partition = struct.instance(TOPICS);
-            String topic = topicEpochs.getKey();
-            partition.set(TOPIC, topic);
-            List<PartitionLeaderEpoch> partitionLeaderEpoches = topicEpochs.getValue();
-            List<Struct> partitions = new ArrayList<>(partitionLeaderEpoches.size());
-            for (PartitionLeaderEpoch partitionLeaderEpoch : partitionLeaderEpoches) {
-                Struct partitionRow = partition.instance(PARTITIONS);
-                partitionRow.set(PARTITION_ID, partitionLeaderEpoch.partitionId);
-                partitionRow.set(LEADER_EPOCH, partitionLeaderEpoch.epoch);
-                partitions.add(partitionRow);
+        for (Map.Entry<String, Map<Integer, Integer>> topicToEpochs : topicsToPartitionEpochs.entrySet()) {
+            Struct topicsStruct = requestStruct.instance(TOPICS);
+            topicsStruct.set(TOPIC, topicToEpochs.getKey());
+            List<Struct> partitions = new ArrayList<>();
+            for (Map.Entry<Integer, Integer> partitionEpoch : topicToEpochs.getValue().entrySet()) {
+                Struct partitionStruct = topicsStruct.instance(PARTITIONS);
+                partitionStruct.set(PARTITION_ID, partitionEpoch.getKey());
+                partitionStruct.set(LEADER_EPOCH, partitionEpoch.getValue());
+                partitions.add(partitionStruct);
             }
-            partition.set(PARTITIONS, partitions.toArray());
-            topics.add(partition);
+            topicsStruct.set(PARTITIONS, partitions.toArray());
+            topics.add(topicsStruct);
         }
-        struct.set(TOPICS, topics.toArray());
-        return struct;
+        requestStruct.set(TOPICS, topics.toArray());
+        return requestStruct;
     }
 
     @Override
@@ -144,32 +135,4 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
         }
         return new OffsetsForLeaderEpochResponse(errorResponse);
     }
-
-    private static class PartitionLeaderEpoch {
-        final int partitionId;
-        final int epoch;
-
-        public PartitionLeaderEpoch(int partitionId, int epoch) {
-            this.partitionId = partitionId;
-            this.epoch = epoch;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            PartitionLeaderEpoch other = (PartitionLeaderEpoch) o;
-
-            if (partitionId != other.partitionId) return false;
-            return epoch == other.epoch;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = partitionId;
-            result = 31 * result + epoch;
-            return result;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/359a6851/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 03f3069..4195b77 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -39,11 +40,11 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
 
     public OffsetsForLeaderEpochResponse(Struct struct) {
         epochEndOffsetsByPartition = new HashMap<>();
-        for (Object t : struct.getArray(TOPICS)) {
-            Struct topicAndEpochs = (Struct) t;
+        for (Object topicAndEpocsObj : struct.getArray(TOPICS)) {
+            Struct topicAndEpochs = (Struct) topicAndEpocsObj;
             String topic = topicAndEpochs.getString(TOPIC);
-            for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
-                Struct partitionAndEpoch = (Struct) e;
+            for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS)) {
+                Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
                 Errors error = Errors.forCode(partitionAndEpoch.getShort(ERROR_CODE));
                 int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
                 TopicPartition tp = new TopicPartition(topic, partitionId);
@@ -67,66 +68,27 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
-
-        //Group by topic
-        Map<String, List<PartitionEndOffset>> topicsToPartitionEndOffsets = new HashMap<>();
-        for (TopicPartition tp : epochEndOffsetsByPartition.keySet()) {
-            List<PartitionEndOffset> partitionEndOffsets = topicsToPartitionEndOffsets.get(tp.topic());
-            if (partitionEndOffsets == null)
-                partitionEndOffsets = new ArrayList<>();
-            partitionEndOffsets.add(new PartitionEndOffset(tp.partition(), epochEndOffsetsByPartition.get(tp)));
-            topicsToPartitionEndOffsets.put(tp.topic(), partitionEndOffsets);
-        }
-
-        //Write struct
-        List<Struct> topics = new ArrayList<>(topicsToPartitionEndOffsets.size());
-        for (Map.Entry<String, List<PartitionEndOffset>> topicEpochs : topicsToPartitionEndOffsets.entrySet()) {
-            Struct partition = struct.instance(TOPICS);
-            String topic = topicEpochs.getKey();
-            partition.set(TOPIC, topic);
-            List<PartitionEndOffset> paritionEpochs = topicEpochs.getValue();
-            List<Struct> paritions = new ArrayList<>(paritionEpochs.size());
-            for (PartitionEndOffset peo : paritionEpochs) {
-                Struct partitionRow = partition.instance(PARTITIONS);
-                partitionRow.set(ERROR_CODE, peo.epochEndOffset.error().code());
-                partitionRow.set(PARTITION_ID, peo.partition);
-                partitionRow.set(END_OFFSET, peo.epochEndOffset.endOffset());
-                paritions.add(partitionRow);
+        Struct responseStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
+
+        Map<String, Map<Integer, EpochEndOffset>> endOffsetsByTopic = CollectionUtils.groupDataByTopic(epochEndOffsetsByPartition);
+
+        List<Struct> topics = new ArrayList<>(endOffsetsByTopic.size());
+        for (Map.Entry<String, Map<Integer, EpochEndOffset>> topicToPartitionEpochs : endOffsetsByTopic.entrySet()) {
+            Struct topicStruct = responseStruct.instance(TOPICS);
+            topicStruct.set(TOPIC, topicToPartitionEpochs.getKey());
+            Map<Integer, EpochEndOffset> partitionEpochs = topicToPartitionEpochs.getValue();
+            List<Struct> partitions = new ArrayList<>();
+            for (Map.Entry<Integer, EpochEndOffset> partitionEndOffset : partitionEpochs.entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS);
+                partitionStruct.set(ERROR_CODE, partitionEndOffset.getValue().error().code());
+                partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey());
+                partitionStruct.set(END_OFFSET, partitionEndOffset.getValue().endOffset());
+                partitions.add(partitionStruct);
             }
-
-            partition.set(PARTITIONS, paritions.toArray());
-            topics.add(partition);
-        }
-        struct.set(TOPICS, topics.toArray());
-        return struct;
-    }
-
-    private class PartitionEndOffset {
-        private int partition;
-        private EpochEndOffset epochEndOffset;
-
-        PartitionEndOffset(int partition, EpochEndOffset epochEndOffset) {
-            this.partition = partition;
-            this.epochEndOffset = epochEndOffset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            PartitionEndOffset that = (PartitionEndOffset) o;
-
-            if (partition != that.partition) return false;
-            return epochEndOffset != null ? epochEndOffset.equals(that.epochEndOffset) : that.epochEndOffset == null;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = partition;
-            result = 31 * result + (epochEndOffset != null ? epochEndOffset.hashCode() : 0);
-            return result;
+            topicStruct.set(PARTITIONS, partitions.toArray());
+            topics.add(topicStruct);
         }
+        responseStruct.set(TOPICS, topics.toArray());
+        return responseStruct;
     }
-}
\ No newline at end of file
+}