You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2021/02/17 04:16:05 UTC
[kafka] branch trunk updated: MINOR: use 'mapKey' to avoid
unnecessary grouped data (#10082)
This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f5c2f60 MINOR: use 'mapKey' to avoid unnecessary grouped data (#10082)
f5c2f60 is described below
commit f5c2f608b05c8094a898c14a6c4a3238f6388df1
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Wed Feb 17 12:14:08 2021 +0800
MINOR: use 'mapKey' to avoid unnecessary grouped data (#10082)
1. add 'mapKey=true' to DescribeLogDirsRequest
2. rename PartitionIndex to Partitions for DescribeLogDirsRequest
3. add 'mapKey=true' to ElectLeadersRequest
4. rename PartitionId to Partitions for ElectLeadersRequest
5. add 'mapKey=true' to ConsumerProtocolAssignment
Reviewers: David Jacot <dj...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
.../kafka/clients/admin/KafkaAdminClient.java | 10 +++----
.../consumer/internals/ConsumerProtocol.java | 32 +++++++++++-----------
.../kafka/common/requests/ElectLeadersRequest.java | 15 ++++++----
.../common/message/ConsumerProtocolAssignment.json | 2 +-
.../message/ConsumerProtocolSubscription.json | 2 +-
.../common/message/DescribeLogDirsRequest.json | 2 +-
.../common/message/ElectLeadersRequest.json | 4 +--
core/src/main/scala/kafka/api/package.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
11 files changed, 39 insertions(+), 36 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3cad328..3296fb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2578,13 +2578,13 @@ public class KafkaAdminClient extends AdminClient {
brokerId -> new DescribeLogDirsRequestData());
DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic());
if (describableLogDirTopic == null) {
- List<Integer> partitionIndex = new ArrayList<>();
- partitionIndex.add(replica.partition());
+ List<Integer> partitions = new ArrayList<>();
+ partitions.add(replica.partition());
describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic())
- .setPartitionIndex(partitionIndex);
+ .setPartitions(partitions);
requestData.topics().add(describableLogDirTopic);
} else {
- describableLogDirTopic.partitionIndex().add(replica.partition());
+ describableLogDirTopic.partitions().add(replica.partition());
}
}
@@ -2594,7 +2594,7 @@ public class KafkaAdminClient extends AdminClient {
final DescribeLogDirsRequestData topicPartitions = entry.getValue();
final Map<TopicPartition, ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<>();
for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) {
- for (Integer partitionId : topicPartition.partitionIndex()) {
+ for (Integer partitionId : topicPartition.partitions()) {
replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index a05e871..a9c7430 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -25,12 +25,10 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/**
* ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
@@ -74,13 +72,14 @@ public class ConsumerProtocol {
ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
data.setTopics(subscription.topics());
data.setUserData(subscription.userData() != null ? subscription.userData().duplicate() : null);
- Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions());
- for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
- data.ownedPartitions().add(new ConsumerProtocolSubscription.TopicPartition()
- .setTopic(topicEntry.getKey())
- .setPartitions(topicEntry.getValue()));
- }
-
+ subscription.ownedPartitions().forEach(tp -> {
+ ConsumerProtocolSubscription.TopicPartition partition = data.ownedPartitions().find(tp.topic());
+ if (partition == null) {
+ partition = new ConsumerProtocolSubscription.TopicPartition().setTopic(tp.topic());
+ data.ownedPartitions().add(partition);
+ }
+ partition.partitions().add(tp.partition());
+ });
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
}
@@ -120,13 +119,14 @@ public class ConsumerProtocol {
ConsumerProtocolAssignment data = new ConsumerProtocolAssignment();
data.setUserData(assignment.userData() != null ? assignment.userData().duplicate() : null);
- Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupPartitionsByTopic(assignment.partitions());
- for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
- data.assignedPartitions().add(new ConsumerProtocolAssignment.TopicPartition()
- .setTopic(topicEntry.getKey())
- .setPartitions(topicEntry.getValue()));
- }
-
+ assignment.partitions().forEach(tp -> {
+ ConsumerProtocolAssignment.TopicPartition partition = data.assignedPartitions().find(tp.topic());
+ if (partition == null) {
+ partition = new ConsumerProtocolAssignment.TopicPartition().setTopic(tp.topic());
+ data.assignedPartitions().add(partition);
+ }
+ partition.partitions().add(tp.partition());
+ });
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
index 92f6b45..1bc888a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -32,7 +31,6 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionR
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
-import org.apache.kafka.common.utils.CollectionUtils;
public class ElectLeadersRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ElectLeadersRequest> {
@@ -70,9 +68,14 @@ public class ElectLeadersRequest extends AbstractRequest {
.setTimeoutMs(timeoutMs);
if (topicPartitions != null) {
- for (Map.Entry<String, List<Integer>> tp : CollectionUtils.groupPartitionsByTopic(topicPartitions).entrySet()) {
- data.topicPartitions().add(new ElectLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue()));
- }
+ topicPartitions.forEach(tp -> {
+ ElectLeadersRequestData.TopicPartitions tps = data.topicPartitions().find(tp.topic());
+ if (tps == null) {
+ tps = new ElectLeadersRequestData.TopicPartitions().setTopic(tp.topic());
+ data.topicPartitions().add(tps);
+ }
+ tps.partitions().add(tp.partition());
+ });
} else {
data.setTopicPartitions(null);
}
@@ -104,7 +107,7 @@ public class ElectLeadersRequest extends AbstractRequest {
ReplicaElectionResult electionResult = new ReplicaElectionResult();
electionResult.setTopic(topic.topic());
- for (Integer partitionId : topic.partitionId()) {
+ for (Integer partitionId : topic.partitions()) {
PartitionResult partitionResult = new PartitionResult();
partitionResult.setPartitionId(partitionId);
partitionResult.setErrorCode(apiError.error().code());
diff --git a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
index 2ad373a..544db20 100644
--- a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
+++ b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
@@ -25,7 +25,7 @@
"fields": [
{ "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+",
"fields": [
- { "name": "Topic", "type": "string", "versions": "0+" },
+ { "name": "Topic", "type": "string", "mapKey": true, "versions": "0+" },
{ "name": "Partitions", "type": "[]int32", "versions": "0+" }
]
},
diff --git a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
index fa4c371..207dac7 100644
--- a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
+++ b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
@@ -28,7 +28,7 @@
"default": "null", "zeroCopy": true },
{ "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
"fields": [
- { "name": "Topic", "type": "string", "versions": "1+" },
+ { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+" },
{ "name": "Partitions", "type": "[]int32", "versions": "1+"}
]
}
diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index 577f2eb..c498e0f 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -26,7 +26,7 @@
"about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name" },
- { "name": "PartitionIndex", "type": "[]int32", "versions": "0+",
+ { "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partition indxes." }
]}
]
diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json
index 5b86c96..a2ba2bd 100644
--- a/clients/src/main/resources/common/message/ElectLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -28,9 +28,9 @@
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
"about": "The topic partitions to elect leaders.",
"fields": [
- { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
+ { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The name of a topic." },
- { "name": "PartitionId", "type": "[]int32", "versions": "0+",
+ { "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of this topic whose leader should be elected." }
]
},
diff --git a/core/src/main/scala/kafka/api/package.scala b/core/src/main/scala/kafka/api/package.scala
index 11a956d..e0678f8 100644
--- a/core/src/main/scala/kafka/api/package.scala
+++ b/core/src/main/scala/kafka/api/package.scala
@@ -28,7 +28,7 @@ package object api {
Set.empty
} else {
self.data.topicPartitions.asScala.iterator.flatMap { topicPartition =>
- topicPartition.partitionId.asScala.map { partitionId =>
+ topicPartition.partitions.asScala.map { partitionId =>
new TopicPartition(topicPartition.topic, partitionId)
}
}.toSet
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5d71c8e..ffb1b8e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2772,7 +2772,7 @@ class KafkaApis(val requestChannel: RequestChannel,
replicaManager.logManager.allLogs.map(_.topicPartition).toSet
else
describeLogDirsDirRequest.data.topics.asScala.flatMap(
- logDirTopic => logDirTopic.partitionIndex.asScala.map(partitionIndex =>
+ logDirTopic => logDirTopic.partitions.asScala.map(partitionIndex =>
new TopicPartition(logDirTopic.topic, partitionIndex))).toSet
replicaManager.describeLogDirs(partitions)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fec75eb..66f25ee 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -609,7 +609,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(new DescribeLogDirsRequestData.DescribableLogDirTopicCollection(Collections.singleton(
- new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitionIndex(Collections.singletonList(tp.partition))).iterator()))).build()
+ new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator()))).build()
private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5b1363f..1924034 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -507,7 +507,7 @@ class RequestQuotaTest extends BaseRequestTest {
val data = new DescribeLogDirsRequestData()
data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic()
.setTopic(tp.topic)
- .setPartitionIndex(Collections.singletonList(tp.partition)))
+ .setPartitions(Collections.singletonList(tp.partition)))
new DescribeLogDirsRequest.Builder(data)
case ApiKeys.CREATE_PARTITIONS =>