You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/06/24 07:58:38 UTC

[kafka] branch trunk updated: KAFKA-12898; Owned partitions in the subscription must be sorted (#10878)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3866702  KAFKA-12898; Owned partitions in the subscription must be sorted (#10878)
3866702 is described below

commit 386670227b2e5ef7633c74033e2b99222b35d7cc
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Jun 24 09:56:27 2021 +0200

    KAFKA-12898; Owned partitions in the subscription must be sorted (#10878)
    
    The group coordinator compares the provided subscription with the store subscription based on their bytes representation. So if the subscribed partitions are not in the same order, the group coordinator would consider that they are different and rebalance the group. This patch ensures that the topics and the owned partitions are sorted.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../consumer/internals/ConsumerProtocol.java       | 22 ++++++++----
 .../consumer/internals/ConsumerProtocolTest.java   | 40 +++++++++++++++++-----
 2 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index a9c7430..7df0fe8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.nio.BufferUnderflowException;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
@@ -27,7 +26,10 @@ import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.protocol.types.SchemaException;
 
 import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 /**
@@ -70,16 +72,24 @@ public class ConsumerProtocol {
         version = checkSubscriptionVersion(version);
 
         ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
-        data.setTopics(subscription.topics());
+
+        List<String> topics = new ArrayList<>(subscription.topics());
+        Collections.sort(topics);
+        data.setTopics(topics);
+
         data.setUserData(subscription.userData() != null ? subscription.userData().duplicate() : null);
-        subscription.ownedPartitions().forEach(tp -> {
-            ConsumerProtocolSubscription.TopicPartition partition = data.ownedPartitions().find(tp.topic());
-            if (partition == null) {
+
+        List<TopicPartition> ownedPartitions = new ArrayList<>(subscription.ownedPartitions());
+        ownedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+        ConsumerProtocolSubscription.TopicPartition partition = null;
+        for (TopicPartition tp : ownedPartitions) {
+            if (partition == null || !partition.topic().equals(tp.topic())) {
                 partition = new ConsumerProtocolSubscription.TopicPartition().setTopic(tp.topic());
                 data.ownedPartitions().add(partition);
             }
             partition.partitions().add(tp.partition());
-        });
+        }
+
         return MessageUtil.toVersionPrefixedByteBuffer(version, data);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 35a4a6f..a2d5120 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -58,7 +58,7 @@ public class ConsumerProtocolTest {
             ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription, version);
             Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
 
-            assertEquals(subscription.topics(), parsedSubscription.topics());
+            assertEquals(toSet(subscription.topics()), toSet(parsedSubscription.topics()));
             assertEquals(subscription.userData(), parsedSubscription.userData());
             assertFalse(parsedSubscription.groupInstanceId().isPresent());
 
@@ -75,7 +75,7 @@ public class ConsumerProtocolTest {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0]));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), toSet(parsedSubscription.topics()));
         assertEquals(0, parsedSubscription.userData().limit());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
     }
@@ -87,7 +87,7 @@ public class ConsumerProtocolTest {
 
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         parsedSubscription.setGroupInstanceId(groupInstanceId);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), toSet(parsedSubscription.topics()));
         assertEquals(0, parsedSubscription.userData().limit());
         assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
@@ -97,16 +97,40 @@ public class ConsumerProtocolTest {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), toSet(parsedSubscription.topics()));
         assertNull(parsedSubscription.userData());
     }
 
     @Test
+    public void serializeSubscriptionShouldOrderTopics() {
+        assertEquals(
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("foo", "bar"), null, Arrays.asList(tp1, tp2))
+            ),
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("bar", "foo"), null, Arrays.asList(tp1, tp2))
+            )
+        );
+    }
+
+    @Test
+    public void serializeSubscriptionShouldOrderOwnedPartitions() {
+        assertEquals(
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("foo", "bar"), null, Arrays.asList(tp1, tp2))
+            ),
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("foo", "bar"), null, Arrays.asList(tp2, tp1))
+            )
+        );
+    }
+
+    @Test
     public void deserializeOldSubscriptionVersion() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription, (short) 0);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
-        assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(parsedSubscription.topics()), toSet(parsedSubscription.topics()));
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
     }
@@ -118,7 +142,7 @@ public class ConsumerProtocolTest {
         // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
         ConsumerProtocol.deserializeVersion(buffer);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, (short) 0);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), toSet(parsedSubscription.topics()));
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -156,8 +180,8 @@ public class ConsumerProtocolTest {
 
         Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
         subscription.setGroupInstanceId(groupInstanceId);
-        assertEquals(Collections.singletonList("topic"), subscription.topics());
-        assertEquals(Collections.singletonList(tp2), subscription.ownedPartitions());
+        assertEquals(Collections.singleton("topic"), toSet(subscription.topics()));
+        assertEquals(Collections.singleton(tp2), toSet(subscription.ownedPartitions()));
         assertEquals(groupInstanceId, subscription.groupInstanceId());
     }