You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/22 05:58:59 UTC
[kafka] 02/02: HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 42aaccec8b86fc26dc6da5ea3f1c3c7fd7a496a5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 21 22:51:56 2019 -0700
HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)
KIP-345 and KIP-392 introduced a couple of breaking changes for old versions of bumped protocols. This patch fixes them.
Reviewers: Colin Patrick McCabe <cm...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Boyang Chen <bc...@outlook.com>, Guozhang Wang <wa...@gmail.com>
---
.../requests/OffsetsForLeaderEpochRequest.java | 2 +-
.../resources/common/message/HeartbeatRequest.json | 3 +-
.../resources/common/message/JoinGroupRequest.json | 3 +-
.../common/message/OffsetCommitRequest.json | 3 +-
.../resources/common/message/SyncGroupRequest.json | 3 +-
.../apache/kafka/common/message/MessageTest.java | 196 +++++++++++++++------
.../kafka/common/requests/RequestResponseTest.java | 9 +-
.../apache/kafka/message/MessageDataGenerator.java | 6 +-
8 files changed, 165 insertions(+), 60 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 6599a70..d3df6cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -166,7 +166,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
- requestStruct.set(REPLICA_ID, replicaId);
+ requestStruct.setIfExists(REPLICA_ID, replicaId);
Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs = CollectionUtils.groupPartitionDataByTopic(epochsByPartition);
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json
index 148e661..aa7f337 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -27,7 +27,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID." },
- { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "3+",
+ "nullableVersions": "3+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." }
]
}
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 6db24da..8e58b87 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -34,7 +34,8 @@
"about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id assigned by the group coordinator." },
- { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "5+",
+ "nullableVersions": "5+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "ProtocolType", "type": "string", "versions": "0+",
"about": "The unique name the for class of protocols implemented by the group we want to join." },
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 0ad7565..adda079 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -35,7 +35,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true,
"about": "The member ID assigned by the group coordinator." },
- { "name": "GroupInstanceId", "type": "string", "versions": "7+", "ignorable": true, "nullableVersions": "7+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "7+",
+ "nullableVersions": "7+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "RetentionTimeMs", "type": "int64", "versions": "2-4", "default": "-1", "ignorable": true,
"about": "The time period in ms to retain the offset." },
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json
index 282cb2a..3ace70d 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -27,7 +27,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID assigned by the group." },
- { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "3+",
+ "nullableVersions": "3+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+",
"about": "Each assignment.", "fields": [
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 1780a71..547dbc4 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.common.message;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
@@ -25,22 +27,20 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
-import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -49,50 +49,127 @@ public final class MessageTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
- /**
- * Test serializing and deserializing some messages.
- */
@Test
- public void testRoundTrips() throws Exception {
- testMessageRoundTrips(new MetadataRequestData().setTopics(
- Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
- new MetadataRequestData.MetadataRequestTopic().setName("bar")
- )), (short) 6);
- testMessageRoundTrips(new AddOffsetsToTxnRequestData().
- setTransactionalId("foobar").
- setProducerId(0xbadcafebadcafeL).
- setProducerEpoch((short) 123).
- setGroupId("baaz"), (short) 1);
- testMessageRoundTrips(new AddOffsetsToTxnResponseData().
- setThrottleTimeMs(42).
- setErrorCode((short) 0), (short) 0);
- testMessageRoundTrips(new AddPartitionsToTxnRequestData().
- setTransactionalId("blah").
- setProducerId(0xbadcafebadcafeL).
- setProducerEpoch((short) 30000).
- setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
- new AddPartitionsToTxnTopic().
- setName("Topic").
- setPartitions(Collections.singletonList(1))).iterator())));
- testMessageRoundTrips(new CreateTopicsRequestData().
- setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
- testMessageRoundTrips(new DescribeAclsRequestData().
- setResourceType((byte) 42).
- setResourceNameFilter(null).
- setResourcePatternType((byte) 3).
- setPrincipalFilter("abc").
- setHostFilter(null).
- setOperation((byte) 0).
- setPermissionType((byte) 0), (short) 0);
- testMessageRoundTrips(new MetadataRequestData().
- setTopics(null).
- setAllowAutoTopicCreation(false).
- setIncludeClusterAuthorizedOperations(false).
- setIncludeTopicAuthorizedOperations(false));
+ public void testAddOffsetsToTxnVersions() throws Exception {
+ testAllMessageRoundTrips(new AddOffsetsToTxnRequestData().
+ setTransactionalId("foobar").
+ setProducerId(0xbadcafebadcafeL).
+ setProducerEpoch((short) 123).
+ setGroupId("baaz"));
+ testAllMessageRoundTrips(new AddOffsetsToTxnResponseData().
+ setThrottleTimeMs(42).
+ setErrorCode((short) 0));
+ }
+
+ @Test
+ public void testAddPartitionsToTxnVersions() throws Exception {
+ testAllMessageRoundTrips(new AddPartitionsToTxnRequestData().
+ setTransactionalId("blah").
+ setProducerId(0xbadcafebadcafeL).
+ setProducerEpoch((short) 30000).
+ setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
+ new AddPartitionsToTxnTopic().
+ setName("Topic").
+ setPartitions(Collections.singletonList(1))).iterator())));
+ }
+
+ @Test
+ public void testCreateTopicsVersions() throws Exception {
+ testAllMessageRoundTrips(new CreateTopicsRequestData().
+ setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
}
- private void testMessageRoundTrips(Message message) throws Exception {
- testMessageRoundTrips(message, message.highestSupportedVersion());
+ @Test
+ public void testDescribeAclsRequest() throws Exception {
+ testAllMessageRoundTrips(new DescribeAclsRequestData().
+ setResourceType((byte) 42).
+ setResourceNameFilter(null).
+ setResourcePatternType((byte) 3).
+ setPrincipalFilter("abc").
+ setHostFilter(null).
+ setOperation((byte) 0).
+ setPermissionType((byte) 0));
+ }
+
+ @Test
+ public void testMetadataVersions() throws Exception {
+ testAllMessageRoundTrips(new MetadataRequestData().setTopics(
+ Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
+ new MetadataRequestData.MetadataRequestTopic().setName("bar")
+ )));
+ testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+ setTopics(null).
+ setAllowAutoTopicCreation(true).
+ setIncludeClusterAuthorizedOperations(false).
+ setIncludeTopicAuthorizedOperations(false), (short) 1);
+ testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+ setTopics(null).
+ setAllowAutoTopicCreation(false).
+ setIncludeClusterAuthorizedOperations(false).
+ setIncludeTopicAuthorizedOperations(false), (short) 4);
+ }
+
+ @Test
+ public void testHeartbeatVersions() throws Exception {
+ Supplier<HeartbeatRequestData> newRequest = () -> new HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setGenerationId(15);
+ testAllMessageRoundTrips(newRequest.get());
+ testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+ testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 3);
+ }
+
+ @Test
+ public void testJoinGroupVersions() throws Exception {
+ Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setProtocolType("consumer")
+ .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection())
+ .setSessionTimeoutMs(10000);
+ testAllMessageRoundTrips(newRequest.get());
+ testAllMessageRoundTripsFromVersion(newRequest.get().setRebalanceTimeoutMs(20000), (short) 1);
+ testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+ testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 5);
+ }
+
+ @Test
+ public void testSyncGroupDefaultGroupInstanceId() throws Exception {
+ Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setGenerationId(15)
+ .setAssignments(new ArrayList<>());
+ testAllMessageRoundTrips(request.get());
+ testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
+ testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 3);
+ }
+
+ @Test
+ public void testOffsetCommitDefaultGroupInstanceId() throws Exception {
+ testAllMessageRoundTrips(new OffsetCommitRequestData()
+ .setTopics(new ArrayList<>())
+ .setGroupId("groupId"));
+
+ Supplier<OffsetCommitRequestData> request = () -> new OffsetCommitRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setTopics(new ArrayList<>())
+ .setGenerationId(15);
+ testAllMessageRoundTripsFromVersion(request.get(), (short) 1);
+ testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId(null), (short) 1);
+ testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 7);
+ }
+
+ private void testAllMessageRoundTrips(Message message) throws Exception {
+ testAllMessageRoundTripsFromVersion(message, message.lowestSupportedVersion());
+ }
+
+ private void testAllMessageRoundTripsFromVersion(Message message, short fromVersion) throws Exception {
+ for (short version = fromVersion; version <= message.highestSupportedVersion(); version++) { // inclusive bound: with '<' the newest version was never round-tripped, and a fromVersion equal to it ran zero iterations
+ testMessageRoundTrips(message, version);
+ }
+ }
private void testMessageRoundTrips(Message message, short version) throws Exception {
@@ -294,6 +371,25 @@ public final class MessageTest {
new FetchRequestData.ForgottenTopic().setName("foo"))));
}
+ @Test
+ public void testNonIgnorableFieldWithDefaultNull() throws Exception {
+ // Test non-ignorable string field `groupInstanceId` with default null
+ verifySizeRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setGenerationId(15)
+ .setMemberId("memberId")
+ .setGroupInstanceId("instanceId"));
+ verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setGenerationId(15)
+ .setMemberId("memberId")
+ .setGroupInstanceId(null));
+ verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setGenerationId(15)
+ .setMemberId("memberId"));
+ }
+
private void verifySizeRaisesUve(short version, String problemFieldName,
Message message) throws Exception {
try {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d7e9223..e8f349f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -290,9 +290,10 @@ public class RequestResponseTest {
checkRequest(createListOffsetRequest(0), true);
checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true);
checkResponse(createListOffsetResponse(0), 0, true);
- checkRequest(createLeaderEpochRequest(), true);
+ checkRequest(createLeaderEpochRequest(0), true);
+ checkRequest(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), true);
checkResponse(createLeaderEpochResponse(), 0, true);
- checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), new UnknownServerException(), true);
checkRequest(createAddPartitionsToTxnRequest(), true);
checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
checkResponse(createAddPartitionsToTxnResponse(), 0, true);
@@ -1250,7 +1251,7 @@ public class RequestResponseTest {
return new InitProducerIdResponse(responseData);
}
- private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
+ private OffsetsForLeaderEpochRequest createLeaderEpochRequest(int version) {
Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
epochs.put(new TopicPartition("topic1", 0),
@@ -1260,7 +1261,7 @@ public class RequestResponseTest {
epochs.put(new TopicPartition("topic2", 2),
new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
- return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
+ return OffsetsForLeaderEpochRequest.Builder.forConsumer((short) version, epochs).build();
}
private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index d6cd5f3..ad00137 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -928,7 +928,11 @@ public final class MessageDataGenerator {
} else if (field.type().isBytes()) {
buffer.printf("if (%s.length != 0) {%n", field.camelCaseName());
} else if (field.type().isString()) {
- buffer.printf("if (%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
+ if (fieldDefault(field).equals("null")) {
+ buffer.printf("if (%s != null) {%n", field.camelCaseName());
+ } else {
+ buffer.printf("if (!%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
+ }
} else if (field.type() instanceof FieldType.BoolFieldType) {
buffer.printf("if (%s%s) {%n",
fieldDefault(field).equals("true") ? "!" : "",