You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 10:27:58 UTC

kafka git commit: TRIVIAL: Remove redundant asMap utility in ConsumerProtocol

Repository: kafka
Updated Branches:
  refs/heads/trunk 9fa0d52ca -> 14a3d69d9


TRIVIAL: Remove redundant asMap utility in ConsumerProtocol

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3084 from hachikuji/trivial-remove-redundant-utility


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14a3d69d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14a3d69d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14a3d69d

Branch: refs/heads/trunk
Commit: 14a3d69d9463b2aa26776329610f8fc5eddd0202
Parents: 9fa0d52
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu May 18 11:27:56 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 11:27:56 2017 +0100

----------------------------------------------------------------------
 .../consumer/internals/ConsumerProtocol.java    | 20 +++-----------------
 1 file changed, 3 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/14a3d69d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index f8be9a0..920c295 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -23,11 +23,10 @@ import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -124,7 +123,8 @@ public class ConsumerProtocol {
         Struct struct = new Struct(ASSIGNMENT_V0);
         struct.set(USER_DATA_KEY_NAME, assignment.userData());
         List<Struct> topicAssignments = new ArrayList<>();
-        for (Map.Entry<String, List<Integer>> topicEntry : asMap(assignment.partitions()).entrySet()) {
+        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignment.partitions());
+        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
             Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0);
             topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
             topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
@@ -146,18 +146,4 @@ public class ConsumerProtocol {
         // otherwise, assume versions can be parsed as V0
     }
 
-    private static Map<String, List<Integer>> asMap(Collection<TopicPartition> partitions) {
-        Map<String, List<Integer>> partitionMap = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            String topic = partition.topic();
-            List<Integer> topicPartitions = partitionMap.get(topic);
-            if (topicPartitions == null) {
-                topicPartitions = new ArrayList<>();
-                partitionMap.put(topic, topicPartitions);
-            }
-            topicPartitions.add(partition.partition());
-        }
-        return partitionMap;
-    }
-
 }