You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/18 00:11:35 UTC
[kafka] branch trunk updated: MINOR: rename subscription
construction function (#6954)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c7db82b MINOR: rename subscription construction function (#6954)
c7db82b is described below
commit c7db82b59a145735fce227436c6191d1825fe384
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Jun 17 17:11:11 2019 -0700
MINOR: rename subscription construction function (#6954)
Per discussion on #6936, some nit fixes to the Subscription initialization path.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../clients/consumer/internals/ConsumerCoordinator.java | 4 ++--
.../clients/consumer/internals/ConsumerProtocol.java | 15 ++++++---------
.../apache/kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../consumer/internals/ConsumerCoordinatorTest.java | 2 +-
.../clients/consumer/internals/ConsumerProtocolTest.java | 12 ++++++------
5 files changed, 16 insertions(+), 19 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index de65a32..477f24b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -390,8 +390,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
- Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
- Optional.ofNullable(memberSubscription.groupInstanceId()));
+ Subscription subscription = ConsumerProtocol.buildSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
+ Optional.ofNullable(memberSubscription.groupInstanceId()));
subscriptions.put(memberSubscription.memberId(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index d3737f7..9e7961e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -179,8 +179,7 @@ public class ConsumerProtocol {
}
}
- public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer,
- Optional<String> groupInstanceId) {
+ public static PartitionAssignor.Subscription buildSubscriptionV0(ByteBuffer buffer, Optional<String> groupInstanceId) {
Struct struct = SUBSCRIPTION_V0.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
@@ -193,8 +192,7 @@ public class ConsumerProtocol {
groupInstanceId);
}
- public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer,
- Optional<String> groupInstanceId) {
+ public static PartitionAssignor.Subscription buildSubscriptionV1(ByteBuffer buffer, Optional<String> groupInstanceId) {
Struct struct = SUBSCRIPTION_V1.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
@@ -217,8 +215,7 @@ public class ConsumerProtocol {
groupInstanceId);
}
- public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer,
- Optional<String> groupInstanceId) {
+ public static PartitionAssignor.Subscription buildSubscription(ByteBuffer buffer, Optional<String> groupInstanceId) {
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
Short version = header.getShort(VERSION_KEY_NAME);
@@ -227,14 +224,14 @@ public class ConsumerProtocol {
switch (version) {
case CONSUMER_PROTOCOL_V0:
- return deserializeSubscriptionV0(buffer, groupInstanceId);
+ return buildSubscriptionV0(buffer, groupInstanceId);
case CONSUMER_PROTOCOL_V1:
- return deserializeSubscriptionV1(buffer, groupInstanceId);
+ return buildSubscriptionV1(buffer, groupInstanceId);
// assume all higher versions can be parsed as V1
default:
- return deserializeSubscriptionV1(buffer, groupInstanceId);
+ return buildSubscriptionV1(buffer, groupInstanceId);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index cff71c3..0f9b956 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1692,7 +1692,7 @@ public class KafkaConsumerTest {
assertTrue(protocolIterator.hasNext());
ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
- PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata, Optional.empty());
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(protocolMetadata, Optional.empty());
return subscribedTopics.equals(new HashSet<>(subscription.topics()));
}
}, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8aeec3c..f4b87d2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -600,7 +600,7 @@ public class ConsumerCoordinatorTest {
JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
- PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata, Optional.empty());
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(metadata, Optional.empty());
metadata.rewind();
return subscription.topics().containsAll(updatedSubscriptionSet);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 07eafa2..d89c5a7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -59,7 +59,7 @@ public class ConsumerProtocolTest {
public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
+ Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertEquals(0, parsedSubscription.userData().limit());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -70,7 +70,7 @@ public class ConsumerProtocolTest {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+ Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
assertEquals(subscription.topics(), parsedSubscription.topics());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@@ -79,7 +79,7 @@ public class ConsumerProtocolTest {
public void serializeDeserializeNullSubscriptionUserData() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
+ Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -89,7 +89,7 @@ public class ConsumerProtocolTest {
public void deserializeOldSubscriptionVersion() {
Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+ Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
@@ -106,7 +106,7 @@ public class ConsumerProtocolTest {
// ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
header.getShort(VERSION_KEY_NAME);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer, Optional.empty());
+ Subscription parsedSubscription = ConsumerProtocol.buildSubscriptionV0(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
@@ -141,7 +141,7 @@ public class ConsumerProtocolTest {
buffer.flip();
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+ Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
assertEquals(Collections.singletonList("topic"), parsedSubscription.topics());
assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());