You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/07 11:24:17 UTC
kafka git commit: MINOR: CollectionUtils.groupDataByTopic in
OffsetsForLeaderEpochRequest/Response
Repository: kafka
Updated Branches:
refs/heads/trunk 865d82af2 -> 359a68510
MINOR: CollectionUtils.groupDataByTopic in OffsetsForLeaderEpochRequest/Response
Author: Ben Stopford <be...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2821 from benstopford/kip-101-cleanup-group-by
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/359a6851
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/359a6851
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/359a6851
Branch: refs/heads/trunk
Commit: 359a68510801a22630a7af275c9935fb2d4c8dbf
Parents: 865d82a
Author: Ben Stopford <be...@gmail.com>
Authored: Fri Apr 7 12:11:25 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Apr 7 12:19:49 2017 +0100
----------------------------------------------------------------------
.../requests/OffsetsForLeaderEpochRequest.java | 77 +++++------------
.../requests/OffsetsForLeaderEpochResponse.java | 90 ++++++--------------
2 files changed, 46 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/359a6851/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 38fabf0..3c285f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -84,11 +85,11 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
public OffsetsForLeaderEpochRequest(Struct struct, short version) {
super(version);
epochsByPartition = new HashMap<>();
- for (Object t : struct.getArray(TOPICS)) {
- Struct topicAndEpochs = (Struct) t;
+ for (Object topicAndEpochsObj : struct.getArray(TOPICS)) {
+ Struct topicAndEpochs = (Struct) topicAndEpochsObj;
String topic = topicAndEpochs.getString(TOPIC);
- for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
- Struct partitionAndEpoch = (Struct) e;
+ for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS)) {
+ Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
int epoch = partitionAndEpoch.getInt(LEADER_EPOCH);
TopicPartition tp = new TopicPartition(topic, partitionId);
@@ -103,36 +104,26 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
+ Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
- //Group by topic
- Map<String, List<PartitionLeaderEpoch>> topicsToPartitionEpochs = new HashMap<>();
- for (TopicPartition tp : epochsByPartition.keySet()) {
- List<PartitionLeaderEpoch> partitionEndOffsets = topicsToPartitionEpochs.get(tp.topic());
- if (partitionEndOffsets == null)
- partitionEndOffsets = new ArrayList<>();
- partitionEndOffsets.add(new PartitionLeaderEpoch(tp.partition(), epochsByPartition.get(tp)));
- topicsToPartitionEpochs.put(tp.topic(), partitionEndOffsets);
- }
+ Map<String, Map<Integer, Integer>> topicsToPartitionEpochs = CollectionUtils.groupDataByTopic(epochsByPartition);
List<Struct> topics = new ArrayList<>();
- for (Map.Entry<String, List<PartitionLeaderEpoch>> topicEpochs : topicsToPartitionEpochs.entrySet()) {
- Struct partition = struct.instance(TOPICS);
- String topic = topicEpochs.getKey();
- partition.set(TOPIC, topic);
- List<PartitionLeaderEpoch> partitionLeaderEpoches = topicEpochs.getValue();
- List<Struct> partitions = new ArrayList<>(partitionLeaderEpoches.size());
- for (PartitionLeaderEpoch partitionLeaderEpoch : partitionLeaderEpoches) {
- Struct partitionRow = partition.instance(PARTITIONS);
- partitionRow.set(PARTITION_ID, partitionLeaderEpoch.partitionId);
- partitionRow.set(LEADER_EPOCH, partitionLeaderEpoch.epoch);
- partitions.add(partitionRow);
+ for (Map.Entry<String, Map<Integer, Integer>> topicToEpochs : topicsToPartitionEpochs.entrySet()) {
+ Struct topicsStruct = requestStruct.instance(TOPICS);
+ topicsStruct.set(TOPIC, topicToEpochs.getKey());
+ List<Struct> partitions = new ArrayList<>();
+ for (Map.Entry<Integer, Integer> partitionEpoch : topicToEpochs.getValue().entrySet()) {
+ Struct partitionStruct = topicsStruct.instance(PARTITIONS);
+ partitionStruct.set(PARTITION_ID, partitionEpoch.getKey());
+ partitionStruct.set(LEADER_EPOCH, partitionEpoch.getValue());
+ partitions.add(partitionStruct);
}
- partition.set(PARTITIONS, partitions.toArray());
- topics.add(partition);
+ topicsStruct.set(PARTITIONS, partitions.toArray());
+ topics.add(topicsStruct);
}
- struct.set(TOPICS, topics.toArray());
- return struct;
+ requestStruct.set(TOPICS, topics.toArray());
+ return requestStruct;
}
@Override
@@ -144,32 +135,4 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
}
return new OffsetsForLeaderEpochResponse(errorResponse);
}
-
- private static class PartitionLeaderEpoch {
- final int partitionId;
- final int epoch;
-
- public PartitionLeaderEpoch(int partitionId, int epoch) {
- this.partitionId = partitionId;
- this.epoch = epoch;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- PartitionLeaderEpoch other = (PartitionLeaderEpoch) o;
-
- if (partitionId != other.partitionId) return false;
- return epoch == other.epoch;
- }
-
- @Override
- public int hashCode() {
- int result = partitionId;
- result = 31 * result + epoch;
- return result;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/359a6851/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 03f3069..4195b77 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -39,11 +40,11 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
public OffsetsForLeaderEpochResponse(Struct struct) {
epochEndOffsetsByPartition = new HashMap<>();
- for (Object t : struct.getArray(TOPICS)) {
- Struct topicAndEpochs = (Struct) t;
+ for (Object topicAndEpocsObj : struct.getArray(TOPICS)) {
+ Struct topicAndEpochs = (Struct) topicAndEpocsObj;
String topic = topicAndEpochs.getString(TOPIC);
- for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
- Struct partitionAndEpoch = (Struct) e;
+ for (Object partitionAndEpochObj : topicAndEpochs.getArray(PARTITIONS)) {
+ Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
Errors error = Errors.forCode(partitionAndEpoch.getShort(ERROR_CODE));
int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
TopicPartition tp = new TopicPartition(topic, partitionId);
@@ -67,66 +68,27 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
-
- //Group by topic
- Map<String, List<PartitionEndOffset>> topicsToPartitionEndOffsets = new HashMap<>();
- for (TopicPartition tp : epochEndOffsetsByPartition.keySet()) {
- List<PartitionEndOffset> partitionEndOffsets = topicsToPartitionEndOffsets.get(tp.topic());
- if (partitionEndOffsets == null)
- partitionEndOffsets = new ArrayList<>();
- partitionEndOffsets.add(new PartitionEndOffset(tp.partition(), epochEndOffsetsByPartition.get(tp)));
- topicsToPartitionEndOffsets.put(tp.topic(), partitionEndOffsets);
- }
-
- //Write struct
- List<Struct> topics = new ArrayList<>(topicsToPartitionEndOffsets.size());
- for (Map.Entry<String, List<PartitionEndOffset>> topicEpochs : topicsToPartitionEndOffsets.entrySet()) {
- Struct partition = struct.instance(TOPICS);
- String topic = topicEpochs.getKey();
- partition.set(TOPIC, topic);
- List<PartitionEndOffset> paritionEpochs = topicEpochs.getValue();
- List<Struct> paritions = new ArrayList<>(paritionEpochs.size());
- for (PartitionEndOffset peo : paritionEpochs) {
- Struct partitionRow = partition.instance(PARTITIONS);
- partitionRow.set(ERROR_CODE, peo.epochEndOffset.error().code());
- partitionRow.set(PARTITION_ID, peo.partition);
- partitionRow.set(END_OFFSET, peo.epochEndOffset.endOffset());
- paritions.add(partitionRow);
+ Struct responseStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
+
+ Map<String, Map<Integer, EpochEndOffset>> endOffsetsByTopic = CollectionUtils.groupDataByTopic(epochEndOffsetsByPartition);
+
+ List<Struct> topics = new ArrayList<>(endOffsetsByTopic.size());
+ for (Map.Entry<String, Map<Integer, EpochEndOffset>> topicToPartitionEpochs : endOffsetsByTopic.entrySet()) {
+ Struct topicStruct = responseStruct.instance(TOPICS);
+ topicStruct.set(TOPIC, topicToPartitionEpochs.getKey());
+ Map<Integer, EpochEndOffset> partitionEpochs = topicToPartitionEpochs.getValue();
+ List<Struct> partitions = new ArrayList<>();
+ for (Map.Entry<Integer, EpochEndOffset> partitionEndOffset : partitionEpochs.entrySet()) {
+ Struct partitionStruct = topicStruct.instance(PARTITIONS);
+ partitionStruct.set(ERROR_CODE, partitionEndOffset.getValue().error().code());
+ partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey());
+ partitionStruct.set(END_OFFSET, partitionEndOffset.getValue().endOffset());
+ partitions.add(partitionStruct);
}
-
- partition.set(PARTITIONS, paritions.toArray());
- topics.add(partition);
- }
- struct.set(TOPICS, topics.toArray());
- return struct;
- }
-
- private class PartitionEndOffset {
- private int partition;
- private EpochEndOffset epochEndOffset;
-
- PartitionEndOffset(int partition, EpochEndOffset epochEndOffset) {
- this.partition = partition;
- this.epochEndOffset = epochEndOffset;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- PartitionEndOffset that = (PartitionEndOffset) o;
-
- if (partition != that.partition) return false;
- return epochEndOffset != null ? epochEndOffset.equals(that.epochEndOffset) : that.epochEndOffset == null;
- }
-
- @Override
- public int hashCode() {
- int result = partition;
- result = 31 * result + (epochEndOffset != null ? epochEndOffset.hashCode() : 0);
- return result;
+ topicStruct.set(PARTITIONS, partitions.toArray());
+ topics.add(topicStruct);
}
+ responseStruct.set(TOPICS, topics.toArray());
+ return responseStruct;
}
-}
\ No newline at end of file
+}