You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/11/16 09:30:05 UTC

[kafka] branch 3.1 updated: KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor (#11439)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new dee8245  KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor (#11439)
dee8245 is described below

commit dee8245a87753190ecb7866336c947aaa80b4284
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Tue Nov 16 10:57:03 2021 +0800

    KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor (#11439)
    
    This fix skips the assignment validation for the built-in cooperative sticky assignor, because (a) we know the assignment is valid, since the cooperative sticky assignor already performs essentially the same check itself, and (b) the check is broken anyway, because the claimed `ownedPartitions` may be incorrect.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../consumer/internals/ConsumerCoordinator.java    |  10 +-
 .../internals/ConsumerCoordinatorTest.java         | 103 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bbdc434..fad7f92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -82,6 +82,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
+import static org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
 
 /**
  * This class manages the coordination process with the consumer coordinator.
@@ -612,6 +613,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+        String assignorName = assignor.name();
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
@@ -633,15 +635,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         isLeader = true;
 
-        log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);
+        log.debug("Performing assignment using strategy {} with subscriptions {}", assignorName, subscriptions);
 
         Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
 
-        if (protocol == RebalanceProtocol.COOPERATIVE) {
+        // skip the validation for the built-in cooperative sticky assignor, since the assignor
+        // itself already takes the "generation" of the ownedPartitions into account
+        if (protocol == RebalanceProtocol.COOPERATIVE && !assignorName.equals(COOPERATIVE_STICKY_ASSIGNOR_NAME)) {
             validateCooperativeAssignment(ownedPartitions, assignments);
         }
 
-        maybeUpdateGroupSubscription(assignor.name(), assignments, allSubscribedTopics);
+        maybeUpdateGroupSubscription(assignorName, assignments, allSubscribedTopics);
 
         assignmentSnapshot = metadataSnapshot;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a1dbf3a..96aaf8b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -56,6 +56,10 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -103,12 +107,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
 import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
+import static org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.TestUtils.toSet;
@@ -143,6 +149,7 @@ public abstract class ConsumerCoordinatorTest {
     private final List<ConsumerPartitionAssignor> assignors;
     private final Map<String, MockPartitionAssignor> assignorMap;
     private final String consumerId = "consumer";
+    private final String consumerId2 = "consumer2";
 
     private MockClient client;
     private MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
@@ -319,6 +326,102 @@ public abstract class ConsumerCoordinatorTest {
         }
     }
 
+    public ByteBuffer subscriptionUserData(int generation) {
+        final String generationKeyName = "generation";
+        final Schema cooperativeStickyAssignorUserDataV0 = new Schema(
+            new Field(generationKeyName, Type.INT32));
+        Struct struct = new Struct(cooperativeStickyAssignorUserDataV0);
+
+        struct.set(generationKeyName, generation);
+        ByteBuffer buffer = ByteBuffer.allocate(cooperativeStickyAssignorUserDataV0.sizeOf(struct));
+        cooperativeStickyAssignorUserDataV0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    private List<JoinGroupResponseData.JoinGroupResponseMember> validateCooperativeAssignmentTestSetup() {
+        // consumer1 and consumer2 subscribed to "topic1" with 2 partitions: t1p, t2p
+        Map<String, List<String>> memberSubscriptions = new HashMap<>();
+        List<String> subscribedTopics = singletonList(topic1);
+        memberSubscriptions.put(consumerId, subscribedTopics);
+        memberSubscriptions.put(consumerId2, subscribedTopics);
+
+        // the ownedPartition for consumer1 is t1p, t2p
+        ConsumerPartitionAssignor.Subscription subscriptionConsumer1 = new ConsumerPartitionAssignor.Subscription(
+            subscribedTopics, subscriptionUserData(1), Arrays.asList(t1p, t2p));
+
+        // the ownedPartition for consumer2 is empty
+        ConsumerPartitionAssignor.Subscription subscriptionConsumer2 = new ConsumerPartitionAssignor.Subscription(
+            subscribedTopics, subscriptionUserData(1), emptyList());
+
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : memberSubscriptions.entrySet()) {
+            ByteBuffer buf = null;
+            if (subscriptionEntry.getKey().equals(consumerId)) {
+                buf = ConsumerProtocol.serializeSubscription(subscriptionConsumer1);
+            } else {
+                buf = ConsumerProtocol.serializeSubscription(subscriptionConsumer2);
+            }
+
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(subscriptionEntry.getKey())
+                .setMetadata(buf.array()));
+        }
+
+        return metadata;
+    }
+
+    @Test
+    public void testPerformAssignmentShouldValidateCooperativeAssignment() {
+        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = validateCooperativeAssignmentTestSetup();
+
+        // simulate a custom cooperative assignor that assigns a partition to another consumer without revoking it first
+        Map<String, List<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(consumerId, Arrays.asList(t1p));
+        assignment.put(consumerId2, Arrays.asList(t2p));
+        partitionAssignor.prepare(assignment);
+
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, mockSubscriptionState)) {
+            if (protocol == COOPERATIVE) {
+                // in cooperative protocol, we should throw exception when validating cooperative assignment
+                Exception e = assertThrows(IllegalStateException.class,
+                    () -> coordinator.performAssignment("1", partitionAssignor.name(), metadata));
+                assertTrue(e.getMessage().contains("Assignor supporting the COOPERATIVE protocol violates its requirements"));
+            } else {
+                // in eager protocol, we should not validate assignment
+                coordinator.performAssignment("1", partitionAssignor.name(), metadata);
+            }
+        }
+    }
+
+    @Test
+    public void testPerformAssignmentShouldSkipValidateCooperativeAssignmentForBuiltInCooperativeStickyAssignor() {
+        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = validateCooperativeAssignmentTestSetup();
+
+        List<ConsumerPartitionAssignor> assignorsWithCooperativeStickyAssignor = new ArrayList<>(assignors);
+        // create a mockPartitionAssignor with the same name as cooperative sticky assignor
+        MockPartitionAssignor mockCooperativeStickyAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)) {
+            @Override
+            public String name() {
+                return COOPERATIVE_STICKY_ASSIGNOR_NAME;
+            }
+        };
+        assignorsWithCooperativeStickyAssignor.add(mockCooperativeStickyAssignor);
+
+        // simulate the cooperative sticky assignor performing an assignment when the claimed ownedPartitions are out of date
+        Map<String, List<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(consumerId, Arrays.asList(t1p));
+        assignment.put(consumerId2, Arrays.asList(t2p));
+        mockCooperativeStickyAssignor.prepare(assignment);
+
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignorsWithCooperativeStickyAssignor, false, mockSubscriptionState)) {
+            // should not validate assignment for built-in cooperative sticky assignor
+            coordinator.performAssignment("1", mockCooperativeStickyAssignor.name(), metadata);
+        }
+    }
+
     @Test
     public void testSelectRebalanceProtcol() {
         List<ConsumerPartitionAssignor> assignors = new ArrayList<>();