Posted to commits@kafka.apache.org by ch...@apache.org on 2021/02/17 04:16:05 UTC

[kafka] branch trunk updated: MINOR: use 'mapKey' to avoid unnecessary grouped data (#10082)

This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f5c2f60  MINOR: use 'mapKey' to avoid unnecessary grouped data (#10082)
f5c2f60 is described below

commit f5c2f608b05c8094a898c14a6c4a3238f6388df1
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Wed Feb 17 12:14:08 2021 +0800

    MINOR: use 'mapKey' to avoid unnecessary grouped data (#10082)
    
    1. add 'mapKey=true' to DescribeLogDirsRequest
    2. rename PartitionIndex to Partitions for DescribeLogDirsRequest
    3. add 'mapKey=true' to ElectLeadersRequest
    4. rename PartitionId to Partitions for ElectLeadersRequest
    5. add 'mapKey=true' to ConsumerProtocolAssignment
    6. add 'mapKey=true' to ConsumerProtocolSubscription
    
    Reviewers: David Jacot <dj...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 10 +++----
 .../consumer/internals/ConsumerProtocol.java       | 32 +++++++++++-----------
 .../kafka/common/requests/ElectLeadersRequest.java | 15 ++++++----
 .../common/message/ConsumerProtocolAssignment.json |  2 +-
 .../message/ConsumerProtocolSubscription.json      |  2 +-
 .../common/message/DescribeLogDirsRequest.json     |  2 +-
 .../common/message/ElectLeadersRequest.json        |  4 +--
 core/src/main/scala/kafka/api/package.scala        |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  2 +-
 11 files changed, 39 insertions(+), 36 deletions(-)
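
A minimal sketch of the find-or-add pattern this change enables, assuming the Kafka clients library is on the classpath; it is not part of the commit. The DescribeLogDirsRequestExample class and buildRequestData helper are illustrative names only, but the find(), add(), and setter calls mirror those in the diff below. With "mapKey": true on the Topic field, the generated DescribableLogDirTopicCollection is keyed by topic, so callers can look up an existing entry directly instead of first grouping a List<TopicPartition> with CollectionUtils.groupPartitionsByTopic.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.message.DescribeLogDirsRequestData;
    import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;

    public final class DescribeLogDirsRequestExample {

        // Hypothetical helper: builds one DescribableLogDirTopic per topic by using the
        // keyed collection's find() instead of pre-grouping the partitions by topic.
        static DescribeLogDirsRequestData buildRequestData(Collection<TopicPartition> replicas) {
            DescribeLogDirsRequestData requestData = new DescribeLogDirsRequestData();
            for (TopicPartition replica : replicas) {
                // find() is available because "Topic" is declared with "mapKey": true.
                DescribableLogDirTopic topic = requestData.topics().find(replica.topic());
                if (topic == null) {
                    topic = new DescribableLogDirTopic()
                        .setTopic(replica.topic())
                        .setPartitions(new ArrayList<>());
                    requestData.topics().add(topic);
                }
                topic.partitions().add(replica.partition());
            }
            return requestData;
        }

        public static void main(String[] args) {
            // Expect two entries: "topic-a" with partitions [0, 1] and "topic-b" with [0].
            System.out.println(buildRequestData(Arrays.asList(
                new TopicPartition("topic-a", 0),
                new TopicPartition("topic-a", 1),
                new TopicPartition("topic-b", 0))));
        }
    }

ConsumerProtocol and ElectLeadersRequest.Builder in the diff below apply the same shape to ConsumerProtocolSubscription/ConsumerProtocolAssignment and to ElectLeadersRequestData.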

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3cad328..3296fb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2578,13 +2578,13 @@ public class KafkaAdminClient extends AdminClient {
                 brokerId -> new DescribeLogDirsRequestData());
             DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic());
             if (describableLogDirTopic == null) {
-                List<Integer> partitionIndex = new ArrayList<>();
-                partitionIndex.add(replica.partition());
+                List<Integer> partitions = new ArrayList<>();
+                partitions.add(replica.partition());
                 describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic())
-                        .setPartitionIndex(partitionIndex);
+                        .setPartitions(partitions);
                 requestData.topics().add(describableLogDirTopic);
             } else {
-                describableLogDirTopic.partitionIndex().add(replica.partition());
+                describableLogDirTopic.partitions().add(replica.partition());
             }
         }
 
@@ -2594,7 +2594,7 @@ public class KafkaAdminClient extends AdminClient {
             final DescribeLogDirsRequestData topicPartitions = entry.getValue();
             final Map<TopicPartition, ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<>();
             for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) {
-                for (Integer partitionId : topicPartition.partitionIndex()) {
+                for (Integer partitionId : topicPartition.partitions()) {
                     replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo());
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index a05e871..a9c7430 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -25,12 +25,10 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /**
  * ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
@@ -74,13 +72,14 @@ public class ConsumerProtocol {
         ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
         data.setTopics(subscription.topics());
         data.setUserData(subscription.userData() != null ? subscription.userData().duplicate() : null);
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions());
-        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
-            data.ownedPartitions().add(new ConsumerProtocolSubscription.TopicPartition()
-                .setTopic(topicEntry.getKey())
-                .setPartitions(topicEntry.getValue()));
-        }
-
+        subscription.ownedPartitions().forEach(tp -> {
+            ConsumerProtocolSubscription.TopicPartition partition = data.ownedPartitions().find(tp.topic());
+            if (partition == null) {
+                partition = new ConsumerProtocolSubscription.TopicPartition().setTopic(tp.topic());
+                data.ownedPartitions().add(partition);
+            }
+            partition.partitions().add(tp.partition());
+        });
         return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
@@ -120,13 +119,14 @@ public class ConsumerProtocol {
 
         ConsumerProtocolAssignment data = new ConsumerProtocolAssignment();
         data.setUserData(assignment.userData() != null ? assignment.userData().duplicate() : null);
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(assignment.partitions());
-        for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
-            data.assignedPartitions().add(new ConsumerProtocolAssignment.TopicPartition()
-                .setTopic(topicEntry.getKey())
-                .setPartitions(topicEntry.getValue()));
-        }
-
+        assignment.partitions().forEach(tp -> {
+            ConsumerProtocolAssignment.TopicPartition partition = data.assignedPartitions().find(tp.topic());
+            if (partition == null) {
+                partition = new ConsumerProtocolAssignment.TopicPartition().setTopic(tp.topic());
+                data.assignedPartitions().add(partition);
+            }
+            partition.partitions().add(tp.partition());
+        });
         return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
index 92f6b45..1bc888a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -32,7 +31,6 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionR
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 public class ElectLeadersRequest extends AbstractRequest {
     public static class Builder extends AbstractRequest.Builder<ElectLeadersRequest> {
@@ -70,9 +68,14 @@ public class ElectLeadersRequest extends AbstractRequest {
                 .setTimeoutMs(timeoutMs);
 
             if (topicPartitions != null) {
-                for (Map.Entry<String, List<Integer>> tp : CollectionUtils.groupPartitionsByTopic(topicPartitions).entrySet()) {
-                    data.topicPartitions().add(new ElectLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue()));
-                }
+                topicPartitions.forEach(tp -> {
+                    ElectLeadersRequestData.TopicPartitions tps = data.topicPartitions().find(tp.topic());
+                    if (tps == null) {
+                        tps = new ElectLeadersRequestData.TopicPartitions().setTopic(tp.topic());
+                        data.topicPartitions().add(tps);
+                    }
+                    tps.partitions().add(tp.partition());
+                });
             } else {
                 data.setTopicPartitions(null);
             }
@@ -104,7 +107,7 @@ public class ElectLeadersRequest extends AbstractRequest {
             ReplicaElectionResult electionResult = new ReplicaElectionResult();
 
             electionResult.setTopic(topic.topic());
-            for (Integer partitionId : topic.partitionId()) {
+            for (Integer partitionId : topic.partitions()) {
                 PartitionResult partitionResult = new PartitionResult();
                 partitionResult.setPartitionId(partitionId);
                 partitionResult.setErrorCode(apiError.error().code());
diff --git a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
index 2ad373a..544db20 100644
--- a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
+++ b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
@@ -25,7 +25,7 @@
   "fields": [
     { "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+",
       "fields": [
-        { "name": "Topic", "type": "string", "versions": "0+" },
+        { "name": "Topic", "type": "string", "mapKey": true, "versions": "0+" },
         { "name": "Partitions", "type": "[]int32", "versions": "0+" }
       ]
     },
diff --git a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
index fa4c371..207dac7 100644
--- a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
+++ b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
@@ -28,7 +28,7 @@
       "default": "null", "zeroCopy": true },
     { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
       "fields": [
-        { "name": "Topic", "type": "string", "versions": "1+" },
+        { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+" },
         { "name": "Partitions", "type": "[]int32", "versions": "1+"}
       ]
     }
diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index 577f2eb..c498e0f 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -26,7 +26,7 @@
       "about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [
       { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
         "about": "The topic name" },
-      { "name": "PartitionIndex", "type": "[]int32", "versions": "0+",
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partition indxes." }
     ]}
   ]
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index 5b86c96..a2ba2bd 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -28,9 +28,9 @@
     { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
       "about": "The topic partitions to elect leaders.",
       "fields": [
-        { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
+        { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
           "about": "The name of a topic." },
-        { "name": "PartitionId", "type": "[]int32", "versions": "0+",
+        { "name": "Partitions", "type": "[]int32", "versions": "0+",
           "about": "The partitions of this topic whose leader should be elected." }
       ]
     },
diff --git a/core/src/main/scala/kafka/api/package.scala b/core/src/main/scala/kafka/api/package.scala
index 11a956d..e0678f8 100644
--- a/core/src/main/scala/kafka/api/package.scala
+++ b/core/src/main/scala/kafka/api/package.scala
@@ -28,7 +28,7 @@ package object api {
         Set.empty
       } else {
         self.data.topicPartitions.asScala.iterator.flatMap { topicPartition =>
-          topicPartition.partitionId.asScala.map { partitionId =>
+          topicPartition.partitions.asScala.map { partitionId =>
             new TopicPartition(topicPartition.topic, partitionId)
           }
         }.toSet
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5d71c8e..ffb1b8e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2772,7 +2772,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             replicaManager.logManager.allLogs.map(_.topicPartition).toSet
           else
             describeLogDirsDirRequest.data.topics.asScala.flatMap(
-              logDirTopic => logDirTopic.partitionIndex.asScala.map(partitionIndex =>
+              logDirTopic => logDirTopic.partitions.asScala.map(partitionIndex =>
                 new TopicPartition(logDirTopic.topic, partitionIndex))).toSet
 
         replicaManager.describeLogDirs(partitions)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fec75eb..66f25ee 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -609,7 +609,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(new DescribeLogDirsRequestData.DescribableLogDirTopicCollection(Collections.singleton(
-    new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitionIndex(Collections.singletonList(tp.partition))).iterator()))).build()
+    new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator()))).build()
 
   private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5b1363f..1924034 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -507,7 +507,7 @@ class RequestQuotaTest extends BaseRequestTest {
           val data = new DescribeLogDirsRequestData()
           data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic()
             .setTopic(tp.topic)
-            .setPartitionIndex(Collections.singletonList(tp.partition)))
+            .setPartitions(Collections.singletonList(tp.partition)))
           new DescribeLogDirsRequest.Builder(data)
 
         case ApiKeys.CREATE_PARTITIONS =>