You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/17 18:25:40 UTC

[kafka] branch trunk updated: KAFKA-8539; Add group.instance.id to Subscription (#6936)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47f908f  KAFKA-8539; Add group.instance.id to Subscription (#6936)
47f908f is described below

commit 47f908fa73fb7bbaec553635e75bffddd7a473f9
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Jun 17 11:25:22 2019 -0700

    KAFKA-8539; Add group.instance.id to Subscription (#6936)
    
    This PR is part of KIP-345's effort to utilize this new field for more stable topic partition assignment. We add the group instance id to the `Subscription` object to allow partition assignors to make stickier assignments. More details [here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances#KIP-345:Introducestaticmembershipprotocoltoreduceconsumerrebalances-ClientBehaviorChanges).
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    |  7 ++--
 .../consumer/internals/ConsumerProtocol.java       | 30 ++++++++++++------
 .../consumer/internals/PartitionAssignor.java      | 24 +++++++++++---
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +-
 .../internals/ConsumerCoordinatorTest.java         |  2 +-
 .../consumer/internals/ConsumerProtocolTest.java   | 37 +++++++++++++++++-----
 6 files changed, 75 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a590a1e..b2b6f96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -388,9 +388,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
-        for (JoinGroupResponseData.JoinGroupResponseMember memberSubScription : allSubscriptions) {
-            Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubScription.metadata()));
-            subscriptions.put(memberSubScription.memberId(), subscription);
+        for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
+            Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
+                                                                                 Optional.ofNullable(memberSubscription.groupInstanceId()));
+            subscriptions.put(memberSubscription.memberId(), subscription);
             allSubscribedTopics.addAll(subscription.topics());
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index b4ad451..d3737f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -27,8 +27,10 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 
@@ -177,17 +179,22 @@ public class ConsumerProtocol {
         }
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
+    public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer,
+                                                                           Optional<String> groupInstanceId) {
         Struct struct = SUBSCRIPTION_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
         for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
             topics.add((String) topicObj);
-
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData);
+        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0,
+                                                  topics,
+                                                  userData,
+                                                  Collections.emptyList(),
+                                                  groupInstanceId);
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
+    public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer,
+                                                                           Optional<String> groupInstanceId) {
         Struct struct = SUBSCRIPTION_V1.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
@@ -203,10 +210,15 @@ public class ConsumerProtocol {
             }
         }
 
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
+        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1,
+                                                  topics,
+                                                  userData,
+                                                  ownedPartitions,
+                                                  groupInstanceId);
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
+    public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer,
+                                                                         Optional<String> groupInstanceId) {
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
 
@@ -215,14 +227,14 @@ public class ConsumerProtocol {
 
         switch (version) {
             case CONSUMER_PROTOCOL_V0:
-                return deserializeSubscriptionV0(buffer);
+                return deserializeSubscriptionV0(buffer, groupInstanceId);
 
             case CONSUMER_PROTOCOL_V1:
-                return deserializeSubscriptionV1(buffer);
+                return deserializeSubscriptionV1(buffer, groupInstanceId);
 
             // assume all higher versions can be parsed as V1
             default:
-                return deserializeSubscriptionV1(buffer);
+                return deserializeSubscriptionV1(buffer, groupInstanceId);
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index 5c76fd6..921a55b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
@@ -130,12 +131,18 @@ public interface PartitionAssignor {
         private final List<String> topics;
         private final ByteBuffer userData;
         private final List<TopicPartition> ownedPartitions;
+        private final Optional<String> groupInstanceId;
 
-        Subscription(Short version, List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
+        Subscription(Short version,
+                     List<String> topics,
+                     ByteBuffer userData,
+                     List<TopicPartition> ownedPartitions,
+                     Optional<String> groupInstanceId) {
             this.version = version;
             this.topics = topics;
             this.userData = userData;
             this.ownedPartitions = ownedPartitions;
+            this.groupInstanceId = groupInstanceId;
 
             if (version < CONSUMER_PROTOCOL_V0)
                 throw new SchemaException("Unsupported subscription version: " + version);
@@ -145,11 +152,14 @@ public interface PartitionAssignor {
         }
 
         Subscription(Short version, List<String> topics, ByteBuffer userData) {
-            this(version, topics, userData, Collections.emptyList());
+            this(version, topics, userData, Collections.emptyList(), Optional.empty());
         }
 
-        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
+        public Subscription(List<String> topics,
+                            ByteBuffer userData,
+                            List<TopicPartition> ownedPartitions,
+                            Optional<String> groupInstanceId) {
+            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions, groupInstanceId);
         }
 
         public Subscription(List<String> topics, ByteBuffer userData) {
@@ -176,13 +186,17 @@ public interface PartitionAssignor {
             return userData;
         }
 
+        public Optional<String> groupInstanceId() {
+            return groupInstanceId;
+        }
+
         @Override
         public String toString() {
             return "Subscription(" +
                     "version=" + version +
                     ", topics=" + topics +
                     ", ownedPartitions=" + ownedPartitions +
-                    ')';
+                    ", group.instance.id=" + groupInstanceId + ")";
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c1adf19..cff71c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1692,7 +1692,7 @@ public class KafkaConsumerTest {
                 assertTrue(protocolIterator.hasNext());
 
                 ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata, Optional.empty());
                 return subscribedTopics.equals(new HashSet<>(subscription.topics()));
             }
         }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c54540e..8aeec3c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -600,7 +600,7 @@ public class ConsumerCoordinatorTest {
                 JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
 
                 ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata, Optional.empty());
                 metadata.rewind();
                 return subscription.topics().containsAll(updatedSubscriptionSet);
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 8a8ba0a..07eafa2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -33,6 +33,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA;
@@ -44,6 +45,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -51,46 +53,64 @@ public class ConsumerProtocolTest {
 
     private final TopicPartition tp1 = new TopicPartition("foo", 1);
     private final TopicPartition tp2 = new TopicPartition("bar", 2);
+    private final Optional<String> groupInstanceId = Optional.of("instance.id");
 
     @Test
     public void serializeDeserializeMetadata() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertEquals(0, parsedSubscription.userData().limit());
+        assertFalse(parsedSubscription.groupInstanceId().isPresent());
+    }
+
+    @Test
+    public void serializeDeserializeMetadataAndGroupInstanceId() {
+        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
+        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
+
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
 
     @Test
     public void serializeDeserializeNullSubscriptionUserData() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
+        assertFalse(parsedSubscription.groupInstanceId().isPresent());
     }
 
     @Test
     public void deserializeOldSubscriptionVersion() {
         Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
         assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
+        assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
 
     @Test
     public void deserializeNewSubscriptionWithOldVersion() {
-        Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
+        Subscription subscription = new Subscription((short) 1,
+                                                     Arrays.asList("foo", "bar"),
+                                                     null, Collections.singletonList(tp2),
+                                                     Optional.empty());
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
         // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         header.getShort(VERSION_KEY_NAME);
-        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer, Optional.empty());
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
+        assertFalse(parsedSubscription.groupInstanceId().isPresent());
     }
 
     @Test
@@ -121,9 +141,10 @@ public class ConsumerProtocolTest {
 
         buffer.flip();
 
-        Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
-        assertEquals(Collections.singletonList("topic"), subscription.topics());
-        assertEquals(Collections.singletonList(tp2), subscription.ownedPartitions());
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+        assertEquals(Collections.singletonList("topic"), parsedSubscription.topics());
+        assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions());
+        assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
 
     @Test