You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/17 18:25:40 UTC
[kafka] branch trunk updated: KAFKA-8539; Add group.instance.id to Subscription (#6936)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47f908f KAFKA-8539; Add group.instance.id to Subscription (#6936)
47f908f is described below
commit 47f908fa73fb7bbaec553635e75bffddd7a473f9
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Jun 17 11:25:22 2019 -0700
KAFKA-8539; Add group.instance.id to Subscription (#6936)
This PR is part of the KIP-345 (static membership) effort. It adds the group instance id (`group.instance.id`) to the `Subscription` object so that partition assignors can use it to make stickier, more stable topic partition assignments. More details [here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances#KIP-345:Introducestaticmembershipprotocoltoreduceconsumerrebalances-ClientBehaviorChanges).
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/ConsumerCoordinator.java | 7 ++--
.../consumer/internals/ConsumerProtocol.java | 30 ++++++++++++------
.../consumer/internals/PartitionAssignor.java | 24 +++++++++++---
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 2 +-
.../consumer/internals/ConsumerProtocolTest.java | 37 +++++++++++++++++-----
6 files changed, 75 insertions(+), 27 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a590a1e..b2b6f96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -388,9 +388,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
- for (JoinGroupResponseData.JoinGroupResponseMember memberSubScription : allSubscriptions) {
- Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubScription.metadata()));
- subscriptions.put(memberSubScription.memberId(), subscription);
+ for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
+ Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
+ Optional.ofNullable(memberSubscription.groupInstanceId()));
+ subscriptions.put(memberSubscription.memberId(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index b4ad451..d3737f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -27,8 +27,10 @@ import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -177,17 +179,22 @@ public class ConsumerProtocol {
}
}
- public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
+ public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer,
+ Optional<String> groupInstanceId) {
Struct struct = SUBSCRIPTION_V0.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
topics.add((String) topicObj);
-
- return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData);
+ return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0,
+ topics,
+ userData,
+ Collections.emptyList(),
+ groupInstanceId);
}
- public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
+ public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer,
+ Optional<String> groupInstanceId) {
Struct struct = SUBSCRIPTION_V1.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
@@ -203,10 +210,15 @@ public class ConsumerProtocol {
}
}
- return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
+ return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1,
+ topics,
+ userData,
+ ownedPartitions,
+ groupInstanceId);
}
- public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
+ public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer,
+ Optional<String> groupInstanceId) {
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
Short version = header.getShort(VERSION_KEY_NAME);
@@ -215,14 +227,14 @@ public class ConsumerProtocol {
switch (version) {
case CONSUMER_PROTOCOL_V0:
- return deserializeSubscriptionV0(buffer);
+ return deserializeSubscriptionV0(buffer, groupInstanceId);
case CONSUMER_PROTOCOL_V1:
- return deserializeSubscriptionV1(buffer);
+ return deserializeSubscriptionV1(buffer, groupInstanceId);
// assume all higher versions can be parsed as V1
default:
- return deserializeSubscriptionV1(buffer);
+ return deserializeSubscriptionV1(buffer, groupInstanceId);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index 5c76fd6..921a55b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
@@ -130,12 +131,18 @@ public interface PartitionAssignor {
private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
+ private final Optional<String> groupInstanceId;
- Subscription(Short version, List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
+ Subscription(Short version,
+ List<String> topics,
+ ByteBuffer userData,
+ List<TopicPartition> ownedPartitions,
+ Optional<String> groupInstanceId) {
this.version = version;
this.topics = topics;
this.userData = userData;
this.ownedPartitions = ownedPartitions;
+ this.groupInstanceId = groupInstanceId;
if (version < CONSUMER_PROTOCOL_V0)
throw new SchemaException("Unsupported subscription version: " + version);
@@ -145,11 +152,14 @@ public interface PartitionAssignor {
}
Subscription(Short version, List<String> topics, ByteBuffer userData) {
- this(version, topics, userData, Collections.emptyList());
+ this(version, topics, userData, Collections.emptyList(), Optional.empty());
}
- public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
- this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
+ public Subscription(List<String> topics,
+ ByteBuffer userData,
+ List<TopicPartition> ownedPartitions,
+ Optional<String> groupInstanceId) {
+ this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions, groupInstanceId);
}
public Subscription(List<String> topics, ByteBuffer userData) {
@@ -176,13 +186,17 @@ public interface PartitionAssignor {
return userData;
}
+ public Optional<String> groupInstanceId() {
+ return groupInstanceId;
+ }
+
@Override
public String toString() {
return "Subscription(" +
"version=" + version +
", topics=" + topics +
", ownedPartitions=" + ownedPartitions +
- ')';
+ ", group.instance.id=" + groupInstanceId + ")";
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c1adf19..cff71c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1692,7 +1692,7 @@ public class KafkaConsumerTest {
assertTrue(protocolIterator.hasNext());
ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
- PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata, Optional.empty());
return subscribedTopics.equals(new HashSet<>(subscription.topics()));
}
}, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c54540e..8aeec3c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -600,7 +600,7 @@ public class ConsumerCoordinatorTest {
JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
- PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata, Optional.empty());
metadata.rewind();
return subscription.topics().containsAll(updatedSubscriptionSet);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 8a8ba0a..07eafa2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA;
@@ -44,6 +45,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -51,46 +53,64 @@ public class ConsumerProtocolTest {
private final TopicPartition tp1 = new TopicPartition("foo", 1);
private final TopicPartition tp2 = new TopicPartition("bar", 2);
+ private final Optional<String> groupInstanceId = Optional.of("instance.id");
@Test
public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertEquals(0, parsedSubscription.userData().limit());
+ assertFalse(parsedSubscription.groupInstanceId().isPresent());
+ }
+
+ @Test
+ public void serializeDeserializeMetadataAndGroupInstanceId() {
+ Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
+ ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
+
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+ assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@Test
public void serializeDeserializeNullSubscriptionUserData() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
+ assertFalse(parsedSubscription.groupInstanceId().isPresent());
}
@Test
public void deserializeOldSubscriptionVersion() {
Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
+ assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@Test
public void deserializeNewSubscriptionWithOldVersion() {
- Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
+ Subscription subscription = new Subscription((short) 1,
+ Arrays.asList("foo", "bar"),
+ null, Collections.singletonList(tp2),
+ Optional.empty());
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
// ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
header.getShort(VERSION_KEY_NAME);
- Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer);
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
+ assertFalse(parsedSubscription.groupInstanceId().isPresent());
}
@Test
@@ -121,9 +141,10 @@ public class ConsumerProtocolTest {
buffer.flip();
- Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
- assertEquals(Collections.singletonList("topic"), subscription.topics());
- assertEquals(Collections.singletonList(tp2), subscription.ownedPartitions());
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
+ assertEquals(Collections.singletonList("topic"), parsedSubscription.topics());
+ assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions());
+ assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@Test