Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/04 09:22:04 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription

dajac commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1013722215


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -117,30 +117,41 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 isAllSubscriptionsEqual = false;
             }
 
-            MemberData memberData = memberData(subscription);
+            Optional<Integer> generation;
+            List<TopicPartition> ownedPartitionsInMetadata;
+            if (!subscription.ownedPartitions().isEmpty() && subscription.generationId() != DEFAULT_GENERATION) {
+                // In ConsumerProtocolSubscription v2 or higher, we don't need to deserialize the byte buffer
+                // and take from fields directly
+                ownedPartitionsInMetadata = subscription.ownedPartitions();
+                generation = Optional.of(subscription.generationId());
+            } else {
+                MemberData memberData = memberData(subscription);
+                ownedPartitionsInMetadata = memberData.partitions;
+                generation = memberData.generation;
+            }

Review Comment:
   It seems that we also rely on the generation in `prepopulateCurrentAssignments`, so I suppose we need similar logic over there in order to be consistent. Do we?
   
   Knowing this, I wonder if we should update `memberData` to get the correct generation from the subscription. That would ensure that all usages of `memberData` get the correct view. What do you think?
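
   A minimal sketch of that idea, to make the suggestion concrete. It does not use the real Kafka classes: `SketchSubscription`, `SketchMemberData` and `MemberDataSketch` are hypothetical stand-ins, partitions are plain strings, and `legacyUserData` stands for whatever decoding the user data byte buffer would yield. The `-1` default is taken from the `GenerationId` field definition in this PR.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for MemberData; not the Kafka class.
final class SketchMemberData {
    final List<String> partitions;
    final Optional<Integer> generation;

    SketchMemberData(List<String> partitions, Optional<Integer> generation) {
        this.partitions = partitions;
        this.generation = generation;
    }
}

// Hypothetical stand-in for Subscription; not the Kafka class.
final class SketchSubscription {
    static final int DEFAULT_GENERATION = -1;

    final List<String> ownedPartitions;     // read from the subscription fields
    final int generationId;                 // read from the subscription fields
    final SketchMemberData legacyUserData;  // result of decoding the user data buffer

    SketchSubscription(List<String> ownedPartitions, int generationId, SketchMemberData legacyUserData) {
        this.ownedPartitions = ownedPartitions;
        this.generationId = generationId;
        this.legacyUserData = legacyUserData;
    }
}

final class MemberDataSketch {
    // Single entry point: every caller gets the same view of partitions and generation.
    static SketchMemberData memberData(SketchSubscription subscription) {
        boolean hasFields = !subscription.ownedPartitions.isEmpty()
            && subscription.generationId != SketchSubscription.DEFAULT_GENERATION;
        if (hasFields) {
            // Newer subscriptions: take the values from the fields directly.
            return new SketchMemberData(subscription.ownedPartitions,
                Optional.of(subscription.generationId));
        }
        // Older subscriptions: fall back to the data decoded from the user data buffer.
        return subscription.legacyUserData;
    }
}
```

   With the branching inside `memberData`, callers such as `allSubscriptionsEqual` and `prepopulateCurrentAssignments` would not each need their own copy of the check.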



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java:
##########
@@ -138,6 +146,10 @@ public Optional<String> groupInstanceId() {
             return groupInstanceId;
         }
 
+        public int generationId() {
+            return generationId;
+        }
+
         @Override
         public String toString() {

Review Comment:
   nit: We need to update this one to include the generation.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java:
##########
@@ -102,20 +104,26 @@ final class Subscription {
         private final ByteBuffer userData;
         private final List<TopicPartition> ownedPartitions;
         private Optional<String> groupInstanceId;
+        private final int generationId;

Review Comment:
   Did you consider using an Optional? That may be better than relying on `DEFAULT_GENERATION`.
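
   For illustration, a rough sketch of what the `Optional` variant could look like. `OptionalGenerationSketch` and `fromWireValue` are hypothetical names, only the generation is modelled, and the `-1` sentinel is assumed from the field's default in the JSON schema.

```java
import java.util.Optional;

// Hypothetical, trimmed-down stand-in for Subscription; not the Kafka class.
final class OptionalGenerationSketch {
    private final Optional<Integer> generationId;

    OptionalGenerationSketch(Optional<Integer> generationId) {
        this.generationId = generationId;
    }

    // Assumption: -1 on the wire means that no generation was sent.
    static OptionalGenerationSketch fromWireValue(int wireValue) {
        return new OptionalGenerationSketch(
            wireValue == -1 ? Optional.empty() : Optional.of(wireValue));
    }

    Optional<Integer> generationId() {
        return generationId;
    }
}
```

   Callers would then write `subscription.generationId().isPresent()` instead of comparing against `DEFAULT_GENERATION`, and the sentinel stays confined to the serialization boundary.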



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java:
##########
@@ -247,4 +245,42 @@ public void deserializeFutureAssignmentVersion() {
         Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions()));
     }
+
+    private ByteBuffer generateFutureSubscriptionVersionData(boolean hasGenerationId) {
+        // verify that a new version which adds a field is still parseable
+        short version = 100;
+
+        Schema subscriptionSchemaV100 = new Schema(
+            new Field("topics", new ArrayOf(Type.STRING)),
+            new Field("user_data", Type.NULLABLE_BYTES),
+            new Field("owned_partitions", new ArrayOf(
+                ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
+            hasGenerationId ? new Field("generation_id", Type.INT32) : new Field("foo", Type.STRING),

Review Comment:
   This looks weird to me. If we are testing with version 100, `generation_id` should always be there, no? The latest version of the subscription has it. `ConsumerProtocol.deserializeSubscription()` will actually use the latest version, which includes the generation id, to decode the payload here regardless of this. If you don't put it, the generation may get initialized from the bytes of the `foo` field (set to `bar`).
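
   To illustrate the concern, here is a small standalone sketch that does not use the Kafka protocol classes. It assumes that a `Type.STRING` value is written as a 2-byte length followed by the UTF-8 bytes, and shows what comes out when those bytes are read back as an INT32. `MisreadFieldSketch` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MisreadFieldSketch {
    // Encodes a string as a 2-byte big-endian length followed by its UTF-8 bytes
    // (assumed to match how a STRING field is laid out on the wire).
    static ByteBuffer encodeString(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + bytes.length);
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
        buffer.flip();
        return buffer;
    }

    // Reads the first four bytes as if an INT32 field started at this position.
    static int readAsInt32(ByteBuffer buffer) {
        return buffer.getInt(0);
    }
}
```

   For `"bar"` the encoded bytes are `00 03 62 61 72`, so `readAsInt32(encodeString("bar"))` yields `0x00036261`, that is 221793: a plausible-looking but meaningless generation.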



##########
clients/src/main/resources/common/message/ConsumerProtocolAssignment.json:
##########
@@ -21,7 +21,9 @@
   // The current implementation assumes that future versions will not break compatibility. When
   // it encounters a newer version, it parses it using the current format. This basically means
   // that new versions cannot remove or reorder any of the existing fields.
-  "validVersions": "0-1",
+  //
+  // version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription.

Review Comment:
   nit: `Version 2...`



##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -78,6 +80,26 @@ public void testDecodeGeneration() {
         assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
     }
 
+    @Test

Review Comment:
   All the tests use `buildSubscriptionWithGeneration` and `buildSubscription`, which rely on the old method to encode the partitions and the generation. I wonder if we should update those to use the latest mechanism. What do you think? Then we should have a few tests to ensure backward compatibility.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java:
##########
@@ -146,43 +165,22 @@ public void deserializeNewSubscriptionWithOldVersion() {
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
+        assertEquals(DEFAULT_GENERATION, parsedSubscription.generationId());
     }
 
-    @Test
-    public void deserializeFutureSubscriptionVersion() {
-        // verify that a new version which adds a field is still parseable
-        short version = 100;
-
-        Schema subscriptionSchemaV100 = new Schema(
-            new Field("topics", new ArrayOf(Type.STRING)),
-            new Field("user_data", Type.NULLABLE_BYTES),
-            new Field("owned_partitions", new ArrayOf(
-                ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
-            new Field("foo", Type.STRING));
-
-        Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
-        subscriptionV100.set("topics", new Object[]{"topic"});
-        subscriptionV100.set("user_data", ByteBuffer.wrap(new byte[0]));
-        subscriptionV100.set("owned_partitions", new Object[]{new Struct(
-            ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)
-            .set("topic", tp2.topic())
-            .set("partitions", new Object[]{tp2.partition()})});
-        subscriptionV100.set("foo", "bar");
-
-        Struct headerV100 = new Struct(new Schema(new Field("version", Type.INT16)));
-        headerV100.set("version", version);
-
-        ByteBuffer buffer = ByteBuffer.allocate(subscriptionV100.sizeOf() + headerV100.sizeOf());
-        headerV100.writeTo(buffer);
-        subscriptionV100.writeTo(buffer);
-
-        buffer.flip();
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void deserializeFutureSubscriptionVersion(boolean hasGenerationId) {
+        ByteBuffer buffer = generateFutureSubscriptionVersionData(hasGenerationId);
 
         Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
         subscription.setGroupInstanceId(groupInstanceId);
         assertEquals(Collections.singleton("topic"), toSet(subscription.topics()));
         assertEquals(Collections.singleton(tp2), toSet(subscription.ownedPartitions()));
         assertEquals(groupInstanceId, subscription.groupInstanceId());
+        if (hasGenerationId) {
+            assertEquals(generationId, subscription.generationId());
+        }

Review Comment:
   Out of curiosity, does the test pass if you add `assertEquals(DEFAULT_GENERATION, subscription.generationId())` in an `else` branch?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -78,6 +80,26 @@ public void testDecodeGeneration() {
         assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
     }
 
+    @Test
+    public void testCooperativeStickyAssignorHonorSubscriptionUserdataIfNoGenerationIdInField() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        int higherGenerationId = 2;
+        int lowerGenerationId = 1;
+
+        assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions(tp1)), new ConsumerGroupMetadata(groupId, higherGenerationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic)));

Review Comment:
   All the other tests rely on `buildSubscriptionWithGeneration` and `buildSubscription`; I wonder if we should do the same here. Continuing on my previous comment, we could perhaps extend those helpers or add new versions of them to differentiate the old way from the new way.



##########
clients/src/main/resources/common/message/ConsumerProtocolSubscription.json:
##########
@@ -21,7 +21,9 @@
   // The current implementation assumes that future versions will not break compatibility. When
   // it encounters a newer version, it parses it using the current format. This basically means
   // that new versions cannot remove or reorder any of the existing fields.
-  "validVersions": "0-1",
+
+  // Starting from version 2, a new field "GenerationId" is added to indicate if the member has out-of-date ownedPartitions.

Review Comment:
   While we are here, could we add a comment for version 1 as well?



##########
clients/src/main/resources/common/message/ConsumerProtocolSubscription.json:
##########
@@ -32,6 +34,7 @@
         { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName" },
         { "name": "Partitions", "type": "[]int32", "versions": "1+"}
       ]
-    }
+    },
+    { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1", "ignorable": true}

Review Comment:
   nit: A space is missing before `}`.


