Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/15 05:25:33 UTC

[GitHub] [kafka] dengziming opened a new pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

dengziming opened a new pull request #9754:
URL: https://github.com/apache/kafka/pull/9754


   *More detailed description of your change*
   Convert the sticky assignor userData schemas to use the generated protocol.
   
   *Summary of testing strategy (including rationale)*
   Unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r546640844



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                    .setTopic(topicEntry.getKey())
+                    .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+                .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
-    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
-        Struct struct;
-        ByteBuffer copy = buffer.duplicate();
+    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) {
+        StickyAssignorUserData data;
         try {
-            struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
+            data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version);

Review comment:
       Do you think we need flexible version support here? @chia7712 







[GitHub] [kafka] dengziming commented on pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#issuecomment-745707291


   Hi, @ableegoldman @guozhangwang PTAL.





[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r544818885



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                    .setTopic(topicEntry.getKey())
+                    .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+                .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
-    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
-        Struct struct;
-        ByteBuffer copy = buffer.duplicate();
+    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) {
+        StickyAssignorUserData data;
         try {
-            struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
+            data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version);

Review comment:
       Yes, the previous serialization and deserialization carry no version, so the code isn't graceful. I think we needn't keep this backward compatibility, since this code is only executed on the client.







[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r545026872



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                    .setTopic(topicEntry.getKey())
+                    .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+                .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
-    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
-        Struct struct;
-        ByteBuffer copy = buffer.duplicate();
+    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) {
+        StickyAssignorUserData data;
         try {
-            struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
+            data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version);

Review comment:
       @chia7712 Thank you. I removed the version when serializing and added a loop when deserializing. Do you think that's right?
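
For readers following the thread, a deserialization loop of this kind can be sketched as below. This is a self-contained illustration, not the code from the PR: `ProbedUserData`, `VersionProbe`, `decode`, and `decodeAnyVersion` are hypothetical names, and the wire layout (an int32 count, the partitions, and a trailing int32 generation from version 1 on) is a simplified stand-in for the real `StickyAssignorUserData` schema. Because the payload carries no version, the sketch tries the highest version first and falls back to lower ones when the bytes do not fit.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical holder for the decoded payload; not a Kafka class.
final class ProbedUserData {
    final List<Integer> partitions;
    final Optional<Integer> generation;

    ProbedUserData(List<Integer> partitions, Optional<Integer> generation) {
        this.partitions = partitions;
        this.generation = generation;
    }
}

final class VersionProbe {
    static final short HIGHEST_VERSION = 1;

    // Assumed layout: int32 count, then that many int32 partitions,
    // and (version 1 and later only) a trailing int32 generation.
    static ProbedUserData decode(ByteBuffer buffer, short version) {
        int count = buffer.getInt();
        if (count < 0 || count > buffer.remaining() / Integer.BYTES) {
            throw new IllegalArgumentException("Implausible partition count: " + count);
        }
        List<Integer> partitions = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            partitions.add(buffer.getInt());
        }
        Optional<Integer> generation =
            version >= 1 ? Optional.of(buffer.getInt()) : Optional.empty();
        return new ProbedUserData(partitions, generation);
    }

    // Tries each version from highest to lowest. Every attempt reads from a
    // duplicate so a failed attempt does not move the caller's position.
    static Optional<ProbedUserData> decodeAnyVersion(ByteBuffer buffer) {
        for (short version = HIGHEST_VERSION; version >= 0; version--) {
            try {
                return Optional.of(decode(buffer.duplicate(), version));
            } catch (BufferUnderflowException | IllegalArgumentException e) {
                // The bytes do not match this version; try the next lower one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ByteBuffer v0 = ByteBuffer.allocate(12);
        v0.putInt(2).putInt(0).putInt(3);
        v0.flip();
        System.out.println(decodeAnyVersion(v0).map(d -> d.partitions + " " + d.generation));
    }
}
```

The order of the attempts matters in this sketch: a version 1 payload would also parse as version 0 with the generation silently dropped, so the higher version has to be tried first.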










[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r544811722



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {

Review comment:
       Because `topicAssignments` puts all partitions of a topic in a single field. I think the name `TopicPartition` is misleading, so I changed it to `TopicPartitions`. The `TopicPartition` in `ConsumerProtocolAssignment.json` and `ConsumerProtocolSubscription.json` is misleading in the same way; we could create a new PR to alter them.
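
The grouping step discussed here can be illustrated with a small self-contained sketch. It is not Kafka's `CollectionUtils.groupPartitionsByTopic`; `TopicGrouping`, `Tp`, and `groupByTopic` are hypothetical names used only to show how a flat list of (topic, partition) pairs turns into one entry per topic that holds all of that topic's partitions, which is the shape the schema field expects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopicGrouping {
    // Hypothetical minimal (topic, partition) pair standing in for Kafka's TopicPartition.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Collects the partition numbers of each topic into one list per topic,
    // preserving the order in which topics and partitions first appear.
    static Map<String, List<Integer>> groupByTopic(List<Tp> partitions) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (Tp tp : partitions) {
            grouped.computeIfAbsent(tp.topic, t -> new ArrayList<>()).add(tp.partition);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Tp> flat = Arrays.asList(new Tp("orders", 0), new Tp("payments", 1), new Tp("orders", 2));
        System.out.println(groupByTopic(flat));
    }
}
```

Each map entry corresponds to one element of the serialized array, which is why a plural name such as `TopicPartitions` describes the element better than `TopicPartition`.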










[GitHub] [kafka] chia7712 commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r544131964



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                    .setTopic(topicEntry.getKey())
+                    .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+                .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {

Review comment:
       How about using ```"ignorable": true``` to eliminate this if-else?
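
As background for this suggestion, the sketch below models what an ignorable field is understood to mean when writing at a version that lacks the field. It is an assumption-laden illustration rather than generated Kafka code: `IgnorableFieldSketch`, `write`, and the two-field layout are hypothetical, and `IllegalStateException` stands in for whatever exception the generated classes raise. A non-default value for a field that the target version does not carry is dropped silently when the field is ignorable and rejected otherwise.

```java
import java.nio.ByteBuffer;

final class IgnorableFieldSketch {
    static final int DEFAULT_GENERATION = -1;

    // Writes `generation` only for version 1 and later. At version 0 a
    // non-default value is dropped if the field is ignorable, otherwise rejected.
    static ByteBuffer write(short version, int partitionCount, int generation, boolean ignorable) {
        ByteBuffer out = ByteBuffer.allocate(2 * Integer.BYTES);
        out.putInt(partitionCount);
        if (version >= 1) {
            out.putInt(generation);
        } else if (generation != DEFAULT_GENERATION && !ignorable) {
            throw new IllegalStateException(
                "Cannot write non-default generation at version " + version);
        }
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        // With an ignorable field the caller needs no version check of its own.
        System.out.println(write((short) 0, 3, 7, true).remaining());
    }
}
```

Under this model, marking the field ignorable lets the caller set the generation unconditionally, so the explicit `if (version >= 1)` guard in the serializer becomes unnecessary.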

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {

Review comment:
       Why do we need ```CollectionUtils.groupPartitionsByTopic(memberData.partitions)``` here?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                    .setTopic(topicEntry.getKey())
+                    .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+                .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
-    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
-        Struct struct;
-        ByteBuffer copy = buffer.duplicate();
+    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) {
+        StickyAssignorUserData data;
         try {
-            struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
+            data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version);

Review comment:
       It seems to me that the previous serialization does not carry a version field, so we have to use this ugly code to handle the different versions.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                    .setTopic(topicEntry.getKey())
+                    .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+                .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);

Review comment:
       Does the previous serialization have a version field?










[GitHub] [kafka] chia7712 commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9754:
URL: https://github.com/apache/kafka/pull/9754#discussion_r544844791



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
##########
@@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) {
         return deserializeTopicPartitionAssignment(userData);
     }
 
-    // visible for testing
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
-        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
-        List<Struct> topicAssignments = new ArrayList<>();
+        return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION);
+    }
+
+    // visible for testing
+    static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) {
+
+        List<StickyAssignorUserData.TopicPartition> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
-            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
-            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
-            topicAssignments.add(topicAssignment);
+            StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition()
+                    .setTopic(topicEntry.getKey())
+                    .setPartitions(topicEntry.getValue());
+            topicAssignments.add(topicPartition);
+        }
+        StickyAssignorUserData data = new StickyAssignorUserData()
+                .setPreviousAssignment(topicAssignments);
+        if (version >= 1) {
+            memberData.generation.ifPresent(data::setGeneration);
         }
-        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        if (memberData.generation.isPresent())
-            struct.set(GENERATION_KEY_NAME, memberData.generation.get());
-        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
-        STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+        return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
-    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
-        Struct struct;
-        ByteBuffer copy = buffer.duplicate();
+    private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) {
+        StickyAssignorUserData data;
         try {
-            struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
+            data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version);

Review comment:
       > I think we needn't keep this backward compatibility, since this code is only executed on the client.
   
   It seems to me that this is not allowed, since (consumer) clients running different (Kafka) versions should be able to work together. Please take a look at https://cwiki.apache.org/confluence/display/KAFKA/KIP-341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol to see how Kafka considers backward/forward compatibility.



