You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/23 18:38:27 UTC

[kafka] branch trunk updated: MINOR: Remove checking on original joined subscription within handleAssignmentMismatch (#6782)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4574b24  MINOR: Remove checking on original joined subscription within handleAssignmentMismatch (#6782)
4574b24 is described below

commit 4574b2438a82115c5ec3ca3c381270b06fc2a8e3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu May 23 11:38:10 2019 -0700

    MINOR: Remove checking on original joined subscription within handleAssignmentMismatch (#6782)
    
    When consumer coordinator realize the subscription may have changed, today we check again against the joinedSubscription within handleAssignmentMismatch. This checking however is a bit fishy and over-kill as well. It's better just simplifying it to always request re-join.
    
    The joinedSubscription object itself however still need to be maintained for potential augment to avoid extra re-joining the group.
    
    Since testOutdatedCoordinatorAssignment already cover the normal case we also remove the other invalidAssignment test case.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    | 50 +++++++---------------
 .../consumer/internals/SubscriptionState.java      | 26 +++++++++++
 .../internals/ConsumerCoordinatorTest.java         | 26 -----------
 3 files changed, 42 insertions(+), 60 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b03af74..6af36e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -209,19 +208,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return null;
     }
 
-    private void handleAssignmentMismatch(Assignment assignment) {
-        // We received an assignment that doesn't match our current subscription. If the subscription changed,
-        // we can ignore the assignment and rebalance. Otherwise we raise an error.
-        Set<TopicPartition> invalidAssignments = assignment.partitions().stream().filter(topicPartition ->
-                !joinedSubscription.contains(topicPartition.topic())).collect(Collectors.toSet());
-        if (invalidAssignments.size() > 0) {
-            throw new IllegalStateException("Consumer was assigned partitions " + invalidAssignments +
-                    " which didn't correspond to subscription request " + joinedSubscription);
-        }
-
-        requestRejoin();
-    }
-
     private void maybeUpdateJoinedSubscription(Set<TopicPartition> assignedPartitions) {
         // Check if the assignment contains some topics that were not in the original
         // subscription, if yes we will obey what leader has decided and add these topics
@@ -261,7 +247,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
         if (!subscriptions.assignFromSubscribed(assignment.partitions())) {
-            handleAssignmentMismatch(assignment);
+            log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
+                "that the subscription has changed since we joined the group. Will try re-join the group with current subscription",
+                assignment.partitions(), subscriptions.prettyString());
+
+            requestRejoin();
+
             return;
         }
 
@@ -727,20 +718,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
         log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
 
-        commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
-            @Override
-            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                if (exception != null) {
-                    if (exception instanceof RetriableException) {
-                        log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
-                                exception);
-                        nextAutoCommitTimer.updateAndReset(retryBackoffMs);
-                    } else {
-                        log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
-                    }
+        commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
+            if (exception != null) {
+                if (exception instanceof RetriableException) {
+                    log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
+                        exception);
+                    nextAutoCommitTimer.updateAndReset(retryBackoffMs);
                 } else {
-                    log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
+                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                 }
+            } else {
+                log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
             }
         });
     }
@@ -1004,13 +992,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 "The max time taken for a commit request"), new Max());
             this.commitLatency.add(createMeter(metrics, metricGrpName, "commit", "commit calls"));
 
-            Measurable numParts =
-                new Measurable() {
-                    public double measure(MetricConfig config, long now) {
-                        // Get the number of assigned partitions in a thread safe manner
-                        return subscriptions.numAssignedPartitions();
-                    }
-                };
+            Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions();
             metrics.addMetric(metrics.metricName("assigned-partitions",
                 this.metricGrpName,
                 "The number of partitions currently assigned to this consumer"), numParts);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 87d1a35..c48bd74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -101,6 +101,32 @@ public class SubscriptionState {
 
     private int assignmentId = 0;
 
+    @Override
+    public String toString() {
+        return "SubscriptionState{" +
+            "type=" + subscriptionType +
+            ", subscribedPattern=" + subscribedPattern +
+            ", subscription=" + String.join(",", subscription) +
+            ", groupSubscription=" + String.join(",", groupSubscription) +
+            ", defaultResetStrategy=" + defaultResetStrategy +
+            ", assignment=" + assignment.partitionStateValues() + " (id=" + assignmentId + ")}";
+    }
+
+    public String prettyString() {
+        switch (subscriptionType) {
+            case NONE:
+                return "None";
+            case AUTO_TOPICS:
+                return "Subscribe(" + String.join(",", subscription) + ")";
+            case AUTO_PATTERN:
+                return "Subscribe(" + subscribedPattern + ")";
+            case USER_ASSIGNED:
+                return "Assign(" + assignedPartitions() + " , id=" + assignmentId + ")";
+            default:
+                throw new IllegalStateException("Unrecognized subscription type: " + subscriptionType);
+        }
+    }
+
     public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy) {
         this.log = logContext.logger(this.getClass());
         this.defaultResetStrategy = defaultResetStrategy;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 86032c4..6f62bbf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -495,32 +495,6 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testInvalidCoordinatorAssignment() {
-        final String consumerId = "invalid_assignment";
-
-        subscriptions.subscribe(singleton(topic1), rebalanceListener);
-
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
-
-        // normal join group
-        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic2));
-        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t2p)));
-
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.data.memberId().equals(consumerId) &&
-                        sync.data.generationId() == 1 &&
-                        sync.groupAssignments().containsKey(consumerId);
-            }
-        }, syncGroupResponse(singletonList(t2p), Errors.NONE));
-        assertThrows(IllegalStateException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
-    }
-
-    @Test
     public void testPatternJoinGroupLeader() {
         final String consumerId = "leader";