You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/22 05:58:59 UTC

[kafka] 02/02: HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 42aaccec8b86fc26dc6da5ea3f1c3c7fd7a496a5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 21 22:51:56 2019 -0700

    HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)
    
    KIP-345 and KIP-392 introduced a couple breaking changes for old versions of bumped protocols. This patch fixes them.
    
    Reviewers: Colin Patrick McCabe <cm...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Boyang Chen <bc...@outlook.com>, Guozhang Wang <wa...@gmail.com>
---
 .../requests/OffsetsForLeaderEpochRequest.java     |   2 +-
 .../resources/common/message/HeartbeatRequest.json |   3 +-
 .../resources/common/message/JoinGroupRequest.json |   3 +-
 .../common/message/OffsetCommitRequest.json        |   3 +-
 .../resources/common/message/SyncGroupRequest.json |   3 +-
 .../apache/kafka/common/message/MessageTest.java   | 196 +++++++++++++++------
 .../kafka/common/requests/RequestResponseTest.java |   9 +-
 .../apache/kafka/message/MessageDataGenerator.java |   6 +-
 8 files changed, 165 insertions(+), 60 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 6599a70..d3df6cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -166,7 +166,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
-        requestStruct.set(REPLICA_ID, replicaId);
+        requestStruct.setIfExists(REPLICA_ID, replicaId);
 
         Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs = CollectionUtils.groupPartitionDataByTopic(epochsByPartition);
 
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json
index 148e661..aa7f337 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -27,7 +27,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member ID." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "3+",
+      "nullableVersions": "3+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 6db24da..8e58b87 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -34,7 +34,8 @@
       "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member id assigned by the group coordinator." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "5+", 
+      "nullableVersions": "5+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "ProtocolType", "type": "string", "versions": "0+",
       "about": "The unique name the for class of protocols implemented by the group we want to join." },
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 0ad7565..adda079 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -35,7 +35,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true,
       "about": "The member ID assigned by the group coordinator." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "7+", "ignorable": true, "nullableVersions": "7+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "7+",
+      "nullableVersions": "7+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "RetentionTimeMs", "type": "int64", "versions": "2-4", "default": "-1", "ignorable": true,
       "about": "The time period in ms to retain the offset." },
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json
index 282cb2a..3ace70d 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -27,7 +27,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member ID assigned by the group." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "3+", 
+      "nullableVersions": "3+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+",
       "about": "Each assignment.", "fields": [
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 1780a71..547dbc4 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.common.message;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Message;
@@ -25,22 +27,20 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.BoundField;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
-import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -49,50 +49,127 @@ public final class MessageTest {
     @Rule
     final public Timeout globalTimeout = Timeout.millis(120000);
 
-    /**
-     * Test serializing and deserializing some messages.
-     */
     @Test
-    public void testRoundTrips() throws Exception {
-        testMessageRoundTrips(new MetadataRequestData().setTopics(
-            Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
-                new MetadataRequestData.MetadataRequestTopic().setName("bar")
-            )), (short) 6);
-        testMessageRoundTrips(new AddOffsetsToTxnRequestData().
-            setTransactionalId("foobar").
-            setProducerId(0xbadcafebadcafeL).
-            setProducerEpoch((short) 123).
-            setGroupId("baaz"), (short) 1);
-        testMessageRoundTrips(new AddOffsetsToTxnResponseData().
-            setThrottleTimeMs(42).
-            setErrorCode((short) 0), (short) 0);
-        testMessageRoundTrips(new AddPartitionsToTxnRequestData().
-            setTransactionalId("blah").
-            setProducerId(0xbadcafebadcafeL).
-            setProducerEpoch((short) 30000).
-            setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
-                new AddPartitionsToTxnTopic().
-                    setName("Topic").
-                    setPartitions(Collections.singletonList(1))).iterator())));
-        testMessageRoundTrips(new CreateTopicsRequestData().
-            setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
-        testMessageRoundTrips(new DescribeAclsRequestData().
-            setResourceType((byte) 42).
-            setResourceNameFilter(null).
-            setResourcePatternType((byte) 3).
-            setPrincipalFilter("abc").
-            setHostFilter(null).
-            setOperation((byte) 0).
-            setPermissionType((byte) 0), (short) 0);
-        testMessageRoundTrips(new MetadataRequestData().
-            setTopics(null).
-            setAllowAutoTopicCreation(false).
-            setIncludeClusterAuthorizedOperations(false).
-            setIncludeTopicAuthorizedOperations(false));
+    public void testAddOffsetsToTxnVersions() throws Exception {
+        testAllMessageRoundTrips(new AddOffsetsToTxnRequestData().
+                setTransactionalId("foobar").
+                setProducerId(0xbadcafebadcafeL).
+                setProducerEpoch((short) 123).
+                setGroupId("baaz"));
+        testAllMessageRoundTrips(new AddOffsetsToTxnResponseData().
+                setThrottleTimeMs(42).
+                setErrorCode((short) 0));
+    }
+
+    @Test
+    public void testAddPartitionsToTxnVersions() throws Exception {
+        testAllMessageRoundTrips(new AddPartitionsToTxnRequestData().
+                setTransactionalId("blah").
+                setProducerId(0xbadcafebadcafeL).
+                setProducerEpoch((short) 30000).
+                setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
+                        new AddPartitionsToTxnTopic().
+                                setName("Topic").
+                                setPartitions(Collections.singletonList(1))).iterator())));
+    }
+
+    @Test
+    public void testCreateTopicsVersions() throws Exception {
+        testAllMessageRoundTrips(new CreateTopicsRequestData().
+                setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
     }
 
-    private void testMessageRoundTrips(Message message) throws Exception {
-        testMessageRoundTrips(message, message.highestSupportedVersion());
+    @Test
+    public void testDescribeAclsRequest() throws Exception {
+        testAllMessageRoundTrips(new DescribeAclsRequestData().
+                setResourceType((byte) 42).
+                setResourceNameFilter(null).
+                setResourcePatternType((byte) 3).
+                setPrincipalFilter("abc").
+                setHostFilter(null).
+                setOperation((byte) 0).
+                setPermissionType((byte) 0));
+    }
+
+    @Test
+    public void testMetadataVersions() throws Exception {
+        testAllMessageRoundTrips(new MetadataRequestData().setTopics(
+                Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
+                        new MetadataRequestData.MetadataRequestTopic().setName("bar")
+                )));
+        testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+                setTopics(null).
+                setAllowAutoTopicCreation(true).
+                setIncludeClusterAuthorizedOperations(false).
+                setIncludeTopicAuthorizedOperations(false), (short) 1);
+        testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+                setTopics(null).
+                setAllowAutoTopicCreation(false).
+                setIncludeClusterAuthorizedOperations(false).
+                setIncludeTopicAuthorizedOperations(false), (short) 4);
+    }
+
+    @Test
+    public void testHeartbeatVersions() throws Exception {
+        Supplier<HeartbeatRequestData> newRequest = () -> new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setGenerationId(15);
+        testAllMessageRoundTrips(newRequest.get());
+        testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 3);
+    }
+
+    @Test
+    public void testJoinGroupVersions() throws Exception {
+        Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setProtocolType("consumer")
+                .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection())
+                .setSessionTimeoutMs(10000);
+        testAllMessageRoundTrips(newRequest.get());
+        testAllMessageRoundTripsFromVersion(newRequest.get().setRebalanceTimeoutMs(20000), (short) 1);
+        testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 5);
+    }
+
+    @Test
+    public void testSyncGroupDefaultGroupInstanceId() throws Exception {
+        Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setGenerationId(15)
+                .setAssignments(new ArrayList<>());
+        testAllMessageRoundTrips(request.get());
+        testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 3);
+    }
+
+    @Test
+    public void testOffsetCommitDefaultGroupInstanceId() throws Exception {
+        testAllMessageRoundTrips(new OffsetCommitRequestData()
+                .setTopics(new ArrayList<>())
+                .setGroupId("groupId"));
+
+        Supplier<OffsetCommitRequestData> request = () -> new OffsetCommitRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setTopics(new ArrayList<>())
+                .setGenerationId(15);
+        testAllMessageRoundTripsFromVersion(request.get(), (short) 1);
+        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId(null), (short) 1);
+        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 7);
+    }
+
+    private void testAllMessageRoundTrips(Message message) throws Exception {
+        testAllMessageRoundTripsFromVersion(message, message.lowestSupportedVersion());
+    }
+
+    private void testAllMessageRoundTripsFromVersion(Message message, short fromVersion) throws Exception {
+        for (short version = fromVersion; version <= message.highestSupportedVersion(); version++) { // inclusive bound: the highest supported version must be round-tripped too
+            testMessageRoundTrips(message, version);
+        }
     }
 
     private void testMessageRoundTrips(Message message, short version) throws Exception {
@@ -294,6 +371,25 @@ public final class MessageTest {
                 new FetchRequestData.ForgottenTopic().setName("foo"))));
     }
 
+    @Test
+    public void testNonIgnorableFieldWithDefaultNull() throws Exception {
+        // Test non-ignorable string field `groupInstanceId` with default null
+        verifySizeRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setGenerationId(15)
+                .setMemberId("memberId")
+                .setGroupInstanceId("instanceId"));
+        verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setGenerationId(15)
+                .setMemberId("memberId")
+                .setGroupInstanceId(null));
+        verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setGenerationId(15)
+                .setMemberId("memberId"));
+    }
+
     private void verifySizeRaisesUve(short version, String problemFieldName,
                                      Message message) throws Exception {
         try {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d7e9223..e8f349f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -290,9 +290,10 @@ public class RequestResponseTest {
         checkRequest(createListOffsetRequest(0), true);
         checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true);
         checkResponse(createListOffsetResponse(0), 0, true);
-        checkRequest(createLeaderEpochRequest(), true);
+        checkRequest(createLeaderEpochRequest(0), true);
+        checkRequest(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), true);
         checkResponse(createLeaderEpochResponse(), 0, true);
-        checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), new UnknownServerException(), true);
         checkRequest(createAddPartitionsToTxnRequest(), true);
         checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
         checkResponse(createAddPartitionsToTxnResponse(), 0, true);
@@ -1250,7 +1251,7 @@ public class RequestResponseTest {
         return new InitProducerIdResponse(responseData);
     }
 
-    private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
+    private OffsetsForLeaderEpochRequest createLeaderEpochRequest(int version) {
         Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
 
         epochs.put(new TopicPartition("topic1", 0),
@@ -1260,7 +1261,7 @@ public class RequestResponseTest {
         epochs.put(new TopicPartition("topic2", 2),
                 new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
 
-        return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer((short) version, epochs).build();
     }
 
     private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index d6cd5f3..ad00137 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -928,7 +928,11 @@ public final class MessageDataGenerator {
         } else if (field.type().isBytes()) {
             buffer.printf("if (%s.length != 0) {%n", field.camelCaseName());
         } else if (field.type().isString()) {
-            buffer.printf("if (%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
+            if (fieldDefault(field).equals("null")) {
+                buffer.printf("if (%s != null) {%n", field.camelCaseName());
+            } else {
+                buffer.printf("if (!%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
+            }
         } else if (field.type() instanceof FieldType.BoolFieldType) {
             buffer.printf("if (%s%s) {%n",
                 fieldDefault(field).equals("true") ? "!" : "",