You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/03 22:27:06 UTC
[kafka] branch 3.4 updated: MINOR: Allow tagged fields with version subset of flexible version range (#13551)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 69fbf4c46a5 MINOR: Allow tagged fields with version subset of flexible version range (#13551)
69fbf4c46a5 is described below
commit 69fbf4c46a50348e1040641158057553d909f8e3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed May 3 15:25:32 2023 -0700
MINOR: Allow tagged fields with version subset of flexible version range (#13551)
The generated message types are missing a range check for the case when the tagged version range is a subset of
the flexible version range. This causes the tagged field count, which is computed correctly, to conflict with the
number of tags serialized.
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../common/message/SimpleExampleMessageTest.java | 28 ++++++++++++++++++++--
.../common/message/SimpleExampleMessage.json | 2 ++
.../apache/kafka/message/MessageDataGenerator.java | 4 ++--
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index b904eed2721..c69e56525a7 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -339,9 +339,8 @@ public class SimpleExampleMessageTest {
Consumer<SimpleExampleMessageData> validator,
short version) {
validator.accept(message);
- ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
- SimpleExampleMessageData message2 = deserialize(buf.duplicate(), version);
+ SimpleExampleMessageData message2 = roundTripSerde(message, version);
validator.accept(message2);
assertEquals(message, message2);
assertEquals(message.hashCode(), message2.hashCode());
@@ -354,6 +353,30 @@ public class SimpleExampleMessageTest {
assertEquals(message.hashCode(), messageFromJson.hashCode());
}
+ private SimpleExampleMessageData roundTripSerde(
+ SimpleExampleMessageData message,
+ short version
+ ) {
+ ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
+ return deserialize(buf.duplicate(), version);
+ }
+
+ @Test
+ public void testTaggedFieldsShouldSupportFlexibleVersionSubset() {
+ SimpleExampleMessageData message = new SimpleExampleMessageData()
+ .setTaggedLongFlexibleVersionSubset(15L);
+
+ testRoundTrip(
+ message,
+ msg -> assertEquals(15, msg.taggedLongFlexibleVersionSubset),
+ (short) 2
+ );
+
+ SimpleExampleMessageData deserialized = roundTripSerde(message, (short) 1);
+ assertEquals(new SimpleExampleMessageData(), deserialized);
+ assertEquals(0, deserialized.taggedLongFlexibleVersionSubset);
+ }
+
@Test
public void testToString() {
SimpleExampleMessageData message = new SimpleExampleMessageData();
@@ -372,6 +395,7 @@ public class SimpleExampleMessageTest {
"nullableZeroCopyByteBuffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0], " +
"myStruct=MyStruct(structId=0, arrayInStruct=[]), " +
"myTaggedStruct=TaggedStruct(structId=''), " +
+ "taggedLongFlexibleVersionSubset=0, " +
"myCommonStruct=TestCommonStruct(foo=123, bar=123), " +
"myOtherCommonStruct=TestCommonStruct(foo=123, bar=123), " +
"myUint16=65535, " +
diff --git a/clients/src/test/resources/common/message/SimpleExampleMessage.json b/clients/src/test/resources/common/message/SimpleExampleMessage.json
index 9b9c049593a..b1bad6aeac9 100644
--- a/clients/src/test/resources/common/message/SimpleExampleMessage.json
+++ b/clients/src/test/resources/common/message/SimpleExampleMessage.json
@@ -50,6 +50,8 @@
"fields": [
{ "name": "structId", "type": "string", "versions": "2+", "about": "String field in struct"}
]},
+ { "name": "taggedLongFlexibleVersionSubset", "type": "int64", "default": "0", "ignorable": true,
+ "taggedVersions": "2+", "tag": 9 },
{ "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
{ "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
{ "name": "myUint16", "type": "uint16", "versions": "1+", "default": "33000" },
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 45188fa8109..6c477ef41a7 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -747,7 +747,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
ifNotMember(__ -> {
generateCheckForUnsupportedNumTaggedFields("_numTaggedFields > 0");
}).
- ifMember(__ -> {
+ ifMember(flexibleVersions -> {
buffer.printf("_writable.writeUnsignedVarint(_numTaggedFields);%n");
int prevTag = -1;
for (FieldSpec field : taggedFields.values()) {
@@ -755,7 +755,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
buffer.printf("_rawWriter.writeRawTags(_writable, %d);%n", field.tag().get());
}
VersionConditional.
- forVersions(field.versions(), field.taggedVersions().intersect(field.versions())).
+ forVersions(field.taggedVersions().intersect(field.versions()), flexibleVersions).
allowMembershipCheckAlwaysFalse(false).
ifMember(presentAndTaggedVersions -> {
IsNullConditional cond = IsNullConditional.forName(field.camelCaseName()).