You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/03 22:27:06 UTC

[kafka] branch 3.4 updated: MINOR: Allow tagged fields with version subset of flexible version range (#13551)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 69fbf4c46a5 MINOR: Allow tagged fields with version subset of flexible version range (#13551)
69fbf4c46a5 is described below

commit 69fbf4c46a50348e1040641158057553d909f8e3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed May 3 15:25:32 2023 -0700

    MINOR: Allow tagged fields with version subset of flexible version range (#13551)
    
    The generated message types are missing a range check for the case where a field's tagged version range is a
    subset of the message's flexible version range. As a result, the tagged field count, which is computed
    correctly, can disagree with the number of tagged fields that are actually serialized.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../common/message/SimpleExampleMessageTest.java   | 28 ++++++++++++++++++++--
 .../common/message/SimpleExampleMessage.json       |  2 ++
 .../apache/kafka/message/MessageDataGenerator.java |  4 ++--
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index b904eed2721..c69e56525a7 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -339,9 +339,8 @@ public class SimpleExampleMessageTest {
                                Consumer<SimpleExampleMessageData> validator,
                                short version) {
         validator.accept(message);
-        ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
 
-        SimpleExampleMessageData message2 = deserialize(buf.duplicate(), version);
+        SimpleExampleMessageData message2 = roundTripSerde(message, version);
         validator.accept(message2);
         assertEquals(message, message2);
         assertEquals(message.hashCode(), message2.hashCode());
@@ -354,6 +353,30 @@ public class SimpleExampleMessageTest {
         assertEquals(message.hashCode(), messageFromJson.hashCode());
     }
 
+    private SimpleExampleMessageData roundTripSerde(
+        SimpleExampleMessageData message,
+        short version
+    ) {
+        ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
+        return deserialize(buf.duplicate(), version);
+    }
+
+    @Test
+    public void testTaggedFieldsShouldSupportFlexibleVersionSubset() {
+        SimpleExampleMessageData message = new SimpleExampleMessageData()
+            .setTaggedLongFlexibleVersionSubset(15L);
+
+        testRoundTrip(
+            message,
+            msg -> assertEquals(15, msg.taggedLongFlexibleVersionSubset),
+            (short) 2
+        );
+
+        SimpleExampleMessageData deserialized = roundTripSerde(message, (short) 1);
+        assertEquals(new SimpleExampleMessageData(), deserialized);
+        assertEquals(0, deserialized.taggedLongFlexibleVersionSubset);
+    }
+
     @Test
     public void testToString() {
         SimpleExampleMessageData message = new SimpleExampleMessageData();
@@ -372,6 +395,7 @@ public class SimpleExampleMessageTest {
                 "nullableZeroCopyByteBuffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0], " +
                 "myStruct=MyStruct(structId=0, arrayInStruct=[]), " +
                 "myTaggedStruct=TaggedStruct(structId=''), " +
+                "taggedLongFlexibleVersionSubset=0, " +
                 "myCommonStruct=TestCommonStruct(foo=123, bar=123), " +
                 "myOtherCommonStruct=TestCommonStruct(foo=123, bar=123), " +
                 "myUint16=65535, " +
diff --git a/clients/src/test/resources/common/message/SimpleExampleMessage.json b/clients/src/test/resources/common/message/SimpleExampleMessage.json
index 9b9c049593a..b1bad6aeac9 100644
--- a/clients/src/test/resources/common/message/SimpleExampleMessage.json
+++ b/clients/src/test/resources/common/message/SimpleExampleMessage.json
@@ -50,6 +50,8 @@
       "fields": [
         { "name": "structId", "type": "string", "versions": "2+", "about": "String field in struct"}
     ]},
+    { "name": "taggedLongFlexibleVersionSubset", "type": "int64", "default": "0", "ignorable": true,
+      "taggedVersions": "2+", "tag": 9 },
     { "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
     { "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
     { "name": "myUint16", "type": "uint16", "versions": "1+", "default": "33000" },
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 45188fa8109..6c477ef41a7 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -747,7 +747,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
             ifNotMember(__ -> {
                 generateCheckForUnsupportedNumTaggedFields("_numTaggedFields > 0");
             }).
-            ifMember(__ -> {
+            ifMember(flexibleVersions -> {
                 buffer.printf("_writable.writeUnsignedVarint(_numTaggedFields);%n");
                 int prevTag = -1;
                 for (FieldSpec field : taggedFields.values()) {
@@ -755,7 +755,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                         buffer.printf("_rawWriter.writeRawTags(_writable, %d);%n", field.tag().get());
                     }
                     VersionConditional.
-                        forVersions(field.versions(), field.taggedVersions().intersect(field.versions())).
+                        forVersions(field.taggedVersions().intersect(field.versions()), flexibleVersions).
                         allowMembershipCheckAlwaysFalse(false).
                         ifMember(presentAndTaggedVersions -> {
                             IsNullConditional cond = IsNullConditional.forName(field.camelCaseName()).