You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/18 00:11:35 UTC

[kafka] branch trunk updated: MINOR: rename subscription construction function (#6954)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7db82b  MINOR: rename subscription construction function (#6954)
c7db82b is described below

commit c7db82b59a145735fce227436c6191d1825fe384
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Jun 17 17:11:11 2019 -0700

    MINOR: rename subscription construction function (#6954)
    
    Per discussion on #6936, some nit fixes to the Subscription initialization path.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../clients/consumer/internals/ConsumerCoordinator.java   |  4 ++--
 .../clients/consumer/internals/ConsumerProtocol.java      | 15 ++++++---------
 .../apache/kafka/clients/consumer/KafkaConsumerTest.java  |  2 +-
 .../consumer/internals/ConsumerCoordinatorTest.java       |  2 +-
 .../clients/consumer/internals/ConsumerProtocolTest.java  | 12 ++++++------
 5 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index de65a32..477f24b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -390,8 +390,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
         for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
-            Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
-                                                                                 Optional.ofNullable(memberSubscription.groupInstanceId()));
+            Subscription subscription = ConsumerProtocol.buildSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
+                                                                           Optional.ofNullable(memberSubscription.groupInstanceId()));
             subscriptions.put(memberSubscription.memberId(), subscription);
             allSubscribedTopics.addAll(subscription.topics());
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index d3737f7..9e7961e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -179,8 +179,7 @@ public class ConsumerProtocol {
         }
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer,
-                                                                           Optional<String> groupInstanceId) {
+    public static PartitionAssignor.Subscription buildSubscriptionV0(ByteBuffer buffer, Optional<String> groupInstanceId) {
         Struct struct = SUBSCRIPTION_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
@@ -193,8 +192,7 @@ public class ConsumerProtocol {
                                                   groupInstanceId);
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer,
-                                                                           Optional<String> groupInstanceId) {
+    public static PartitionAssignor.Subscription buildSubscriptionV1(ByteBuffer buffer, Optional<String> groupInstanceId) {
         Struct struct = SUBSCRIPTION_V1.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
@@ -217,8 +215,7 @@ public class ConsumerProtocol {
                                                   groupInstanceId);
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer,
-                                                                         Optional<String> groupInstanceId) {
+    public static PartitionAssignor.Subscription buildSubscription(ByteBuffer buffer, Optional<String> groupInstanceId) {
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
 
@@ -227,14 +224,14 @@ public class ConsumerProtocol {
 
         switch (version) {
             case CONSUMER_PROTOCOL_V0:
-                return deserializeSubscriptionV0(buffer, groupInstanceId);
+                return buildSubscriptionV0(buffer, groupInstanceId);
 
             case CONSUMER_PROTOCOL_V1:
-                return deserializeSubscriptionV1(buffer, groupInstanceId);
+                return buildSubscriptionV1(buffer, groupInstanceId);
 
             // assume all higher versions can be parsed as V1
             default:
-                return deserializeSubscriptionV1(buffer, groupInstanceId);
+                return buildSubscriptionV1(buffer, groupInstanceId);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index cff71c3..0f9b956 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1692,7 +1692,7 @@ public class KafkaConsumerTest {
                 assertTrue(protocolIterator.hasNext());
 
                 ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata, Optional.empty());
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(protocolMetadata, Optional.empty());
                 return subscribedTopics.equals(new HashSet<>(subscription.topics()));
             }
         }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8aeec3c..f4b87d2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -600,7 +600,7 @@ public class ConsumerCoordinatorTest {
                 JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
 
                 ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata, Optional.empty());
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(metadata, Optional.empty());
                 metadata.rewind();
                 return subscription.topics().containsAll(updatedSubscriptionSet);
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 07eafa2..d89c5a7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -59,7 +59,7 @@ public class ConsumerProtocolTest {
     public void serializeDeserializeMetadata() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
+        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertEquals(0, parsedSubscription.userData().limit());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -70,7 +70,7 @@ public class ConsumerProtocolTest {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
 
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
@@ -79,7 +79,7 @@ public class ConsumerProtocolTest {
     public void serializeDeserializeNullSubscriptionUserData() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
+        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -89,7 +89,7 @@ public class ConsumerProtocolTest {
     public void deserializeOldSubscriptionVersion() {
         Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
         assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
@@ -106,7 +106,7 @@ public class ConsumerProtocolTest {
         // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         header.getShort(VERSION_KEY_NAME);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer, Optional.empty());
+        Subscription parsedSubscription = ConsumerProtocol.buildSubscriptionV0(buffer, Optional.empty());
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
@@ -141,7 +141,7 @@ public class ConsumerProtocolTest {
 
         buffer.flip();
 
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
         assertEquals(Collections.singletonList("topic"), parsedSubscription.topics());
         assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions());
         assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());