You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/21 02:08:03 UTC
kafka git commit: KAFKA-3117: handle metadata updates during consumer
rebalance
Repository: kafka
Updated Branches:
refs/heads/trunk c9485b78a -> 8a863ecee
KAFKA-3117: handle metadata updates during consumer rebalance
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1247 from hachikuji/KAFKA-3117
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8a863ece
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8a863ece
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8a863ece
Branch: refs/heads/trunk
Commit: 8a863ecee7e5dcdaf66a55b91040a7893ffdbf66
Parents: c9485b7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Apr 20 17:08:00 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 20 17:08:00 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 14 +++--
.../consumer/internals/AbstractCoordinator.java | 4 +-
.../consumer/internals/ConsumerCoordinator.java | 49 +++++++++++++----
.../internals/ConsumerNetworkClient.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 56 ++++++++++++++++++++
.../java/org/apache/kafka/test/TestUtils.java | 21 ++++++--
6 files changed, 125 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 73a9f33..322ae0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -108,6 +108,14 @@ public final class Metadata {
}
/**
+ * Check whether an update has been explicitly requested.
+ * @return true if an update was requested, false otherwise
+ */
+ public synchronized boolean updateRequested() {
+ return this.needUpdate;
+ }
+
+ /**
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
@@ -203,10 +211,10 @@ public final class Metadata {
/**
* Set state to indicate if metadata for all topics in Kafka cluster is required or not.
- * @param needMetadaForAllTopics boolean indicating need for metadata of all topics in cluster.
+ * @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.
*/
- public synchronized void needMetadataForAllTopics(boolean needMetadaForAllTopics) {
- this.needMetadataForAllTopics = needMetadaForAllTopics;
+ public synchronized void needMetadataForAllTopics(boolean needMetadataForAllTopics) {
+ this.needMetadataForAllTopics = needMetadataForAllTopics;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 496a114..15185d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -410,10 +410,10 @@ public abstract class AbstractCoordinator implements Closeable {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
- .compose(new SyncGroupRequestHandler());
+ .compose(new SyncGroupResponseHandler());
}
- private class SyncGroupRequestHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
+ private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public SyncGroupResponse parse(ClientResponse response) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 86b60d0..887f47c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -63,7 +63,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final List<PartitionAssignor> assignors;
private final org.apache.kafka.clients.Metadata metadata;
- private final MetadataSnapshot metadataSnapshot;
private final ConsumerCoordinatorMetrics sensors;
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
@@ -72,6 +71,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final ConsumerInterceptors<?, ?> interceptors;
private final boolean excludeInternalTopics;
+ private MetadataSnapshot metadataSnapshot;
+ private MetadataSnapshot assignmentSnapshot;
+
/**
* Initialize the coordination manager.
*/
@@ -102,7 +104,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.metadata = metadata;
this.metadata.requestUpdate();
- this.metadataSnapshot = new MetadataSnapshot();
+ this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
this.autoCommitEnabled = autoCommitEnabled;
@@ -159,8 +161,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
// check if there are any changes to the metadata which should trigger a rebalance
- if (metadataSnapshot.update(subscriptions, cluster) && subscriptions.partitionsAutoAssigned())
- subscriptions.needReassignment();
+ if (subscriptions.partitionsAutoAssigned()) {
+ MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
+ if (!snapshot.equals(metadataSnapshot)) {
+ metadataSnapshot = snapshot;
+ subscriptions.needReassignment();
+ }
+ }
+
}
});
}
@@ -178,6 +186,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
+ // if we were the assignor, then we need to make sure that there have been no metadata updates
+ // since the rebalance began. Otherwise, we won't rebalance again until the next metadata change
+ if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
+ subscriptions.needReassignment();
+ return;
+ }
+
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -231,7 +246,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// which ensures that all metadata changes will eventually be seen
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());
+
+ // update metadata (if needed) and keep track of the metadata used for assignment so that
+ // we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata();
+ assignmentSnapshot = metadataSnapshot;
log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
groupId, assignor.name(), subscriptions);
@@ -267,6 +286,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
listener.getClass().getName(), groupId, e);
}
+ assignmentSnapshot = null;
subscriptions.needReassignment();
}
@@ -669,19 +689,26 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
private static class MetadataSnapshot {
- private Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ private final Map<String, Integer> partitionsPerTopic;
- public boolean update(SubscriptionState subscription, Cluster cluster) {
+ public MetadataSnapshot(SubscriptionState subscription, Cluster cluster) {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
for (String topic : subscription.groupSubscription())
partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
+ this.partitionsPerTopic = partitionsPerTopic;
+ }
- if (!partitionsPerTopic.equals(this.partitionsPerTopic)) {
- this.partitionsPerTopic = partitionsPerTopic;
- return true;
- }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ MetadataSnapshot that = (MetadataSnapshot) o;
+ return partitionsPerTopic != null ? partitionsPerTopic.equals(that.partitionsPerTopic) : that.partitionsPerTopic == null;
+ }
- return false;
+ @Override
+ public int hashCode() {
+ return partitionsPerTopic != null ? partitionsPerTopic.hashCode() : 0;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4119954..d4c2656 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -138,7 +138,7 @@ public class ConsumerNetworkClient implements Closeable {
* until it has completed).
*/
public void ensureFreshMetadata() {
- if (this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
+ if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
awaitMetadataUpdate();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b864d69..5a174db 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -63,6 +63,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -575,6 +576,61 @@ public class ConsumerCoordinatorTest {
assertTrue(subscriptions.partitionAssignmentNeeded());
}
+
+ @Test
+ public void testUpdateMetadataDuringRebalance() {
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
+ TopicPartition tp1 = new TopicPartition(topic1, 0);
+ TopicPartition tp2 = new TopicPartition(topic2, 0);
+ final String consumerId = "leader";
+
+ List<String> topics = Arrays.asList(topic1, topic2);
+
+ subscriptions.subscribe(topics, rebalanceListener);
+ metadata.setTopics(topics);
+ subscriptions.needReassignment();
+
+ // we only have metadata for one topic initially
+ metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // prepare initial rebalance
+ Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, topics);
+ partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp1)));
+
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ if (sync.memberId().equals(consumerId) &&
+ sync.generationId() == 1 &&
+ sync.groupAssignment().containsKey(consumerId)) {
+ // trigger the metadata update including both topics after the sync group request has been sent
+ Map<String, Integer> topicPartitionCounts = new HashMap<>();
+ topicPartitionCounts.put(topic1, 1);
+ topicPartitionCounts.put(topic2, 1);
+ metadata.update(TestUtils.singletonCluster(topicPartitionCounts), time.milliseconds());
+ return true;
+ }
+ return false;
+ }
+ }, syncGroupResponse(Arrays.asList(tp1), Errors.NONE.code()));
+
+ // the metadata update should trigger a second rebalance
+ client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE.code()));
+
+ coordinator.ensurePartitionAssignment();
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
+ }
+
+
@Test
public void testExcludeInternalTopicsConfigOption() {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 7ffc54a..027221e 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.kafka.common.Cluster;
@@ -45,20 +46,32 @@ public class TestUtils {
public static final Random SEEDED_RANDOM = new Random(192348092834L);
public static final Random RANDOM = new Random();
+ public static Cluster singletonCluster(Map<String, Integer> topicPartitionCounts) {
+ return clusterWith(1, topicPartitionCounts);
+ }
+
public static Cluster singletonCluster(String topic, int partitions) {
return clusterWith(1, topic, partitions);
}
- public static Cluster clusterWith(int nodes, String topic, int partitions) {
+ public static Cluster clusterWith(int nodes, Map<String, Integer> topicPartitionCounts) {
Node[] ns = new Node[nodes];
for (int i = 0; i < nodes; i++)
ns[i] = new Node(i, "localhost", 1969);
- List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
- for (int i = 0; i < partitions; i++)
- parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+ List<PartitionInfo> parts = new ArrayList<>();
+ for (Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) {
+ String topic = topicPartition.getKey();
+ int partitions = topicPartition.getValue();
+ for (int i = 0; i < partitions; i++)
+ parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+ }
return new Cluster(asList(ns), parts, Collections.<String>emptySet());
}
+ public static Cluster clusterWith(int nodes, String topic, int partitions) {
+ return clusterWith(nodes, Collections.singletonMap(topic, partitions));
+ }
+
/**
* Generate an array of random bytes
*