You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 10:27:58 UTC
kafka git commit: TRIVIAL: Remove redundant asMap utility in
ConsumerProtocol
Repository: kafka
Updated Branches:
refs/heads/trunk 9fa0d52ca -> 14a3d69d9
TRIVIAL: Remove redundant asMap utility in ConsumerProtocol
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3084 from hachikuji/trivial-remove-redundant-utility
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14a3d69d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14a3d69d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14a3d69d
Branch: refs/heads/trunk
Commit: 14a3d69d9463b2aa26776329610f8fc5eddd0202
Parents: 9fa0d52
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu May 18 11:27:56 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 11:27:56 2017 +0100
----------------------------------------------------------------------
.../consumer/internals/ConsumerProtocol.java | 20 +++-----------------
1 file changed, 3 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/14a3d69d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index f8be9a0..920c295 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -23,11 +23,10 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -124,7 +123,8 @@ public class ConsumerProtocol {
Struct struct = new Struct(ASSIGNMENT_V0);
struct.set(USER_DATA_KEY_NAME, assignment.userData());
List<Struct> topicAssignments = new ArrayList<>();
- for (Map.Entry<String, List<Integer>> topicEntry : asMap(assignment.partitions()).entrySet()) {
+ Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignment.partitions());
+ for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0);
topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
@@ -146,18 +146,4 @@ public class ConsumerProtocol {
// otherwise, assume versions can be parsed as V0
}
- private static Map<String, List<Integer>> asMap(Collection<TopicPartition> partitions) {
- Map<String, List<Integer>> partitionMap = new HashMap<>();
- for (TopicPartition partition : partitions) {
- String topic = partition.topic();
- List<Integer> topicPartitions = partitionMap.get(topic);
- if (topicPartitions == null) {
- topicPartitions = new ArrayList<>();
- partitionMap.put(topic, topicPartitions);
- }
- topicPartitions.add(partition.partition());
- }
- return partitionMap;
- }
-
}