You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/28 20:56:00 UTC
[kafka] branch trunk updated: MINOR: Set default `group.instance.id` in JoinGroupResponse to null (#6831)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 901eb36 MINOR: Set default `group.instance.id` in JoinGroupResponse to null (#6831)
901eb36 is described below
commit 901eb3688337a0e173ef7fcff9b3384926520c08
Author: Boyang Chen <bc...@outlook.com>
AuthorDate: Tue May 28 13:55:38 2019 -0700
MINOR: Set default `group.instance.id` in JoinGroupResponse to null (#6831)
As we plan to add more supporting features for rebalancing under static membership, we need to make sure the behavior of `group.instance.id` is consistent throughout the whole stack. This patch ensures that its default value is null in the JoinGroup response.
Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../resources/common/message/JoinGroupResponse.json | 3 ++-
.../apache/kafka/common/message/MessageTest.java | 21 ++++++++++++++++++++-
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index 6ffa1bc..14e6b1a 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -40,7 +40,8 @@
{ "name": "Members", "type": "[]JoinGroupResponseMember", "versions": "0+", "fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The group member ID." },
- { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "5+",
+ "nullableVersions": "5+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "Metadata", "type": "bytes", "versions": "0+",
"about": "The group member metadata." }
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 7c9d335..5213ec2 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.message;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
@@ -37,6 +38,7 @@ import org.junit.rules.Timeout;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
@@ -121,7 +123,7 @@ public final class MessageTest {
}
@Test
- public void testJoinGroupVersions() throws Exception {
+ public void testJoinGroupRequestVersions() throws Exception {
Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData()
.setGroupId("groupId")
.setMemberId("memberId")
@@ -135,6 +137,23 @@ public final class MessageTest {
}
@Test
+ public void testJoinGroupResponseVersions() throws Exception {
+ String memberId = "memberId";
+ Supplier<JoinGroupResponseData> newResponse = () -> new JoinGroupResponseData()
+ .setMemberId(memberId)
+ .setLeader(memberId)
+ .setGenerationId(1)
+ .setMembers(Collections.singletonList(
+ new JoinGroupResponseMember()
+ .setMemberId(memberId)
+ ));
+ testAllMessageRoundTrips(newResponse.get());
+ testAllMessageRoundTripsFromVersion((short) 2, newResponse.get().setThrottleTimeMs(1000));
+ testAllMessageRoundTrips(newResponse.get().members().get(0).setGroupInstanceId(null));
+ testAllMessageRoundTripsFromVersion((short) 5, newResponse.get().members().get(0).setGroupInstanceId("instanceId"));
+ }
+
+ @Test
public void testSyncGroupDefaultGroupInstanceId() throws Exception {
Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData()
.setGroupId("groupId")