You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/28 20:56:00 UTC

[kafka] branch trunk updated: MINOR: Set default `group.instance.id` in JoinGroupResponse to null (#6831)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 901eb36  MINOR: Set default `group.instance.id` in JoinGroupResponse to null (#6831)
901eb36 is described below

commit 901eb3688337a0e173ef7fcff9b3384926520c08
Author: Boyang Chen <bc...@outlook.com>
AuthorDate: Tue May 28 13:55:38 2019 -0700

    MINOR: Set default `group.instance.id` in JoinGroupResponse to null (#6831)
    
    As we are planning to add on more supporting features for rebalancing under static membership, we need to make sure the behavior for `group.instance.id` is consistent throughout the whole stack. This patch ensures that the default value is null in the JoinGroup response.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../resources/common/message/JoinGroupResponse.json |  3 ++-
 .../apache/kafka/common/message/MessageTest.java    | 21 ++++++++++++++++++++-
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index 6ffa1bc..14e6b1a 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -40,7 +40,8 @@
     { "name": "Members", "type": "[]JoinGroupResponseMember", "versions": "0+", "fields": [
       { "name": "MemberId", "type": "string", "versions": "0+",
         "about": "The group member ID." },
-      { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+      { "name": "GroupInstanceId", "type": "string", "versions": "5+",
+        "nullableVersions": "5+", "default": "null",
         "about": "The unique identifier of the consumer instance provided by end user." },
       { "name": "Metadata", "type": "bytes", "versions": "0+",
         "about": "The group member metadata." }
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 7c9d335..5213ec2 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.message;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Message;
@@ -37,6 +38,7 @@ import org.junit.rules.Timeout;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
 
@@ -121,7 +123,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testJoinGroupVersions() throws Exception {
+    public void testJoinGroupRequestVersions() throws Exception {
         Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData()
                 .setGroupId("groupId")
                 .setMemberId("memberId")
@@ -135,6 +137,23 @@ public final class MessageTest {
     }
 
     @Test
+    public void testJoinGroupResponseVersions() throws Exception {
+        String memberId = "memberId";
+        Supplier<JoinGroupResponseData> newResponse = () -> new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setLeader(memberId)
+                .setGenerationId(1)
+                .setMembers(Collections.singletonList(
+                        new JoinGroupResponseMember()
+                                .setMemberId(memberId)
+                ));
+        testAllMessageRoundTrips(newResponse.get());
+        testAllMessageRoundTripsFromVersion((short) 2, newResponse.get().setThrottleTimeMs(1000));
+        testAllMessageRoundTrips(newResponse.get().members().get(0).setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion((short) 5, newResponse.get().members().get(0).setGroupInstanceId("instanceId"));
+    }
+
+    @Test
     public void testSyncGroupDefaultGroupInstanceId() throws Exception {
         Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData()
                 .setGroupId("groupId")