You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/23 18:38:27 UTC
[kafka] branch trunk updated: MINOR: Remove checking on original
joined subscription within handleAssignmentMismatch (#6782)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4574b24 MINOR: Remove checking on original joined subscription within handleAssignmentMismatch (#6782)
4574b24 is described below
commit 4574b2438a82115c5ec3ca3c381270b06fc2a8e3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu May 23 11:38:10 2019 -0700
MINOR: Remove checking on original joined subscription within handleAssignmentMismatch (#6782)
When the consumer coordinator realizes the subscription may have changed, today we check again against the joinedSubscription within handleAssignmentMismatch. This check, however, is a bit fishy and overkill as well. It is better to simplify it to always request a re-join.
The joinedSubscription object itself, however, still needs to be maintained for potential augmentation, to avoid extra re-joins of the group.
Since testOutdatedCoordinatorAssignment already covers the normal case, we also remove the other invalidAssignment test case.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/ConsumerCoordinator.java | 50 +++++++---------------
.../consumer/internals/SubscriptionState.java | 26 +++++++++++
.../internals/ConsumerCoordinatorTest.java | 26 -----------
3 files changed, 42 insertions(+), 60 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b03af74..6af36e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -209,19 +208,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return null;
}
- private void handleAssignmentMismatch(Assignment assignment) {
- // We received an assignment that doesn't match our current subscription. If the subscription changed,
- // we can ignore the assignment and rebalance. Otherwise we raise an error.
- Set<TopicPartition> invalidAssignments = assignment.partitions().stream().filter(topicPartition ->
- !joinedSubscription.contains(topicPartition.topic())).collect(Collectors.toSet());
- if (invalidAssignments.size() > 0) {
- throw new IllegalStateException("Consumer was assigned partitions " + invalidAssignments +
- " which didn't correspond to subscription request " + joinedSubscription);
- }
-
- requestRejoin();
- }
-
private void maybeUpdateJoinedSubscription(Set<TopicPartition> assignedPartitions) {
// Check if the assignment contains some topics that were not in the original
// subscription, if yes we will obey what leader has decided and add these topics
@@ -261,7 +247,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
if (!subscriptions.assignFromSubscribed(assignment.partitions())) {
- handleAssignmentMismatch(assignment);
+ log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
+ "that the subscription has changed since we joined the group. Will try re-join the group with current subscription",
+ assignment.partitions(), subscriptions.prettyString());
+
+ requestRejoin();
+
return;
}
@@ -727,20 +718,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
- commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- if (exception != null) {
- if (exception instanceof RetriableException) {
- log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
- exception);
- nextAutoCommitTimer.updateAndReset(retryBackoffMs);
- } else {
- log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
- }
+ commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
+ if (exception != null) {
+ if (exception instanceof RetriableException) {
+ log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
+ exception);
+ nextAutoCommitTimer.updateAndReset(retryBackoffMs);
} else {
- log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
+ log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
}
+ } else {
+ log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
}
});
}
@@ -1004,13 +992,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
"The max time taken for a commit request"), new Max());
this.commitLatency.add(createMeter(metrics, metricGrpName, "commit", "commit calls"));
- Measurable numParts =
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- // Get the number of assigned partitions in a thread safe manner
- return subscriptions.numAssignedPartitions();
- }
- };
+ Measurable numParts = (config, now) -> subscriptions.numAssignedPartitions();
metrics.addMetric(metrics.metricName("assigned-partitions",
this.metricGrpName,
"The number of partitions currently assigned to this consumer"), numParts);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 87d1a35..c48bd74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -101,6 +101,32 @@ public class SubscriptionState {
private int assignmentId = 0;
+ @Override
+ public String toString() {
+ return "SubscriptionState{" +
+ "type=" + subscriptionType +
+ ", subscribedPattern=" + subscribedPattern +
+ ", subscription=" + String.join(",", subscription) +
+ ", groupSubscription=" + String.join(",", groupSubscription) +
+ ", defaultResetStrategy=" + defaultResetStrategy +
+ ", assignment=" + assignment.partitionStateValues() + " (id=" + assignmentId + ")}";
+ }
+
+ public String prettyString() {
+ switch (subscriptionType) {
+ case NONE:
+ return "None";
+ case AUTO_TOPICS:
+ return "Subscribe(" + String.join(",", subscription) + ")";
+ case AUTO_PATTERN:
+ return "Subscribe(" + subscribedPattern + ")";
+ case USER_ASSIGNED:
+ return "Assign(" + assignedPartitions() + " , id=" + assignmentId + ")";
+ default:
+ throw new IllegalStateException("Unrecognized subscription type: " + subscriptionType);
+ }
+ }
+
public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy) {
this.log = logContext.logger(this.getClass());
this.defaultResetStrategy = defaultResetStrategy;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 86032c4..6f62bbf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -495,32 +495,6 @@ public class ConsumerCoordinatorTest {
}
@Test
- public void testInvalidCoordinatorAssignment() {
- final String consumerId = "invalid_assignment";
-
- subscriptions.subscribe(singleton(topic1), rebalanceListener);
-
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
- coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
-
- // normal join group
- Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic2));
- partitionAssignor.prepare(singletonMap(consumerId, singletonList(t2p)));
-
- client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().containsKey(consumerId);
- }
- }, syncGroupResponse(singletonList(t2p), Errors.NONE));
- assertThrows(IllegalStateException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
- }
-
- @Test
public void testPatternJoinGroupLeader() {
final String consumerId = "leader";