Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/21 02:08:03 UTC

kafka git commit: KAFKA-3117: handle metadata updates during consumer rebalance

Repository: kafka
Updated Branches:
  refs/heads/trunk c9485b78a -> 8a863ecee


KAFKA-3117: handle metadata updates during consumer rebalance

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1247 from hachikuji/KAFKA-3117


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8a863ece
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8a863ece
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8a863ece

Branch: refs/heads/trunk
Commit: 8a863ecee7e5dcdaf66a55b91040a7893ffdbf66
Parents: c9485b7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Apr 20 17:08:00 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 20 17:08:00 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java | 14 +++--
 .../consumer/internals/AbstractCoordinator.java |  4 +-
 .../consumer/internals/ConsumerCoordinator.java | 49 +++++++++++++----
 .../internals/ConsumerNetworkClient.java        |  2 +-
 .../internals/ConsumerCoordinatorTest.java      | 56 ++++++++++++++++++++
 .../java/org/apache/kafka/test/TestUtils.java   | 21 ++++++--
 6 files changed, 125 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 73a9f33..322ae0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -108,6 +108,14 @@ public final class Metadata {
     }
 
     /**
+     * Check whether an update has been explicitly requested.
+     * @return true if an update was requested, false otherwise
+     */
+    public synchronized boolean updateRequested() {
+        return this.needUpdate;
+    }
+
+    /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
     public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
@@ -203,10 +211,10 @@ public final class Metadata {
 
     /**
      * Set state to indicate if metadata for all topics in Kafka cluster is required or not.
-     * @param needMetadaForAllTopics boolean indicating need for metadata of all topics in cluster.
+     * @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.
      */
-    public synchronized void needMetadataForAllTopics(boolean needMetadaForAllTopics) {
-        this.needMetadataForAllTopics = needMetadaForAllTopics;
+    public synchronized void needMetadataForAllTopics(boolean needMetadataForAllTopics) {
+        this.needMetadataForAllTopics = needMetadataForAllTopics;
     }
 
     /**

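The new updateRequested() accessor exposes the existing needUpdate flag, so callers can tell an explicitly requested refresh apart from one that is merely due by age; the ensureFreshMetadata() change further down relies on this to block in either case. A minimal standalone sketch of the idiom, using a simplified stand-in class rather than Kafka's actual Metadata implementation:

    import java.util.concurrent.TimeoutException;

    // Simplified, hypothetical stand-in for the client Metadata class: a version
    // counter plus an explicit "update requested" flag, guarded by the monitor.
    public class SimpleMetadata {
        private boolean needUpdate = false;
        private int version = 0;

        // Mark the metadata as needing a refresh (mirrors requestUpdate()).
        public synchronized void requestUpdate() {
            this.needUpdate = true;
        }

        // Check whether a refresh was explicitly requested (mirrors updateRequested()).
        public synchronized boolean updateRequested() {
            return this.needUpdate;
        }

        // Record a completed refresh: clear the flag, bump the version, wake waiters.
        public synchronized void update() {
            this.needUpdate = false;
            this.version++;
            notifyAll();
        }

        // Block until the version advances past lastVersion or the timeout expires
        // (mirrors awaitUpdate(lastVersion, maxWaitMs)).
        public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
                throws InterruptedException, TimeoutException {
            long deadline = System.currentTimeMillis() + maxWaitMs;
            while (this.version <= lastVersion) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                    throw new TimeoutException("metadata did not update in time");
                wait(remaining);
            }
        }
    }
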
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 496a114..15185d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -410,10 +410,10 @@ public abstract class AbstractCoordinator implements Closeable {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
         return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
-                .compose(new SyncGroupRequestHandler());
+                .compose(new SyncGroupResponseHandler());
     }
 
-    private class SyncGroupRequestHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
+    private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
 
         @Override
         public SyncGroupResponse parse(ClientResponse response) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 86b60d0..887f47c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -63,7 +63,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private final List<PartitionAssignor> assignors;
     private final org.apache.kafka.clients.Metadata metadata;
-    private final MetadataSnapshot metadataSnapshot;
     private final ConsumerCoordinatorMetrics sensors;
     private final SubscriptionState subscriptions;
     private final OffsetCommitCallback defaultOffsetCommitCallback;
@@ -72,6 +71,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final ConsumerInterceptors<?, ?> interceptors;
     private final boolean excludeInternalTopics;
 
+    private MetadataSnapshot metadataSnapshot;
+    private MetadataSnapshot assignmentSnapshot;
+
     /**
      * Initialize the coordination manager.
      */
@@ -102,7 +104,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.metadata = metadata;
 
         this.metadata.requestUpdate();
-        this.metadataSnapshot = new MetadataSnapshot();
+        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
         this.autoCommitEnabled = autoCommitEnabled;
@@ -159,8 +161,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 }
 
                 // check if there are any changes to the metadata which should trigger a rebalance
-                if (metadataSnapshot.update(subscriptions, cluster) && subscriptions.partitionsAutoAssigned())
-                    subscriptions.needReassignment();
+                if (subscriptions.partitionsAutoAssigned()) {
+                    MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
+                    if (!snapshot.equals(metadataSnapshot)) {
+                        metadataSnapshot = snapshot;
+                        subscriptions.needReassignment();
+                    }
+                }
+
             }
         });
     }
@@ -178,6 +186,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                   String memberId,
                                   String assignmentStrategy,
                                   ByteBuffer assignmentBuffer) {
+        // if we were the assignor, then we need to make sure that there have been no metadata updates
+        // since the rebalance began. Otherwise, we won't rebalance again until the next metadata change
+        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
+            subscriptions.needReassignment();
+            return;
+        }
+
         PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -231,7 +246,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // which ensures that all metadata changes will eventually be seen
         this.subscriptions.groupSubscribe(allSubscribedTopics);
         metadata.setTopics(this.subscriptions.groupSubscription());
+
+        // update metadata (if needed) and keep track of the metadata used for assignment so that
+        // we can check after rebalance completion whether anything has changed
         client.ensureFreshMetadata();
+        assignmentSnapshot = metadataSnapshot;
 
         log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
                 groupId, assignor.name(), subscriptions);
@@ -267,6 +286,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     listener.getClass().getName(), groupId, e);
         }
 
+        assignmentSnapshot = null;
         subscriptions.needReassignment();
     }
 
@@ -669,19 +689,26 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     private static class MetadataSnapshot {
-        private Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        private final Map<String, Integer> partitionsPerTopic;
 
-        public boolean update(SubscriptionState subscription, Cluster cluster) {
+        public MetadataSnapshot(SubscriptionState subscription, Cluster cluster) {
             Map<String, Integer> partitionsPerTopic = new HashMap<>();
             for (String topic : subscription.groupSubscription())
                 partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
+            this.partitionsPerTopic = partitionsPerTopic;
+        }
 
-            if (!partitionsPerTopic.equals(this.partitionsPerTopic)) {
-                this.partitionsPerTopic = partitionsPerTopic;
-                return true;
-            }
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            MetadataSnapshot that = (MetadataSnapshot) o;
+            return partitionsPerTopic != null ? partitionsPerTopic.equals(that.partitionsPerTopic) : that.partitionsPerTopic == null;
+        }
 
-            return false;
+        @Override
+        public int hashCode() {
+            return partitionsPerTopic != null ? partitionsPerTopic.hashCode() : 0;
         }
     }
 

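The core of the fix: MetadataSnapshot becomes an immutable value object with equals()/hashCode(), the metadata listener replaces the whole snapshot whenever the cluster view changes, and the leader records the snapshot it assigned from (assignmentSnapshot) so that onJoinComplete can detect a metadata change that raced with the rebalance and schedule another one. A minimal standalone sketch of that capture-and-compare idiom, with hypothetical names (Snapshot, beforeAssignment, needAnotherRebalance) that are not part of the patch:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, simplified illustration of the snapshot-compare idiom:
    // an immutable map of topic -> partition count, captured before assignment
    // and compared again once the rebalance completes.
    public class SnapshotCheckExample {

        // Immutable view of the partition counts the assignor worked from.
        static final class Snapshot {
            private final Map<String, Integer> partitionsPerTopic;

            Snapshot(Map<String, Integer> partitionsPerTopic) {
                this.partitionsPerTopic =
                        Collections.unmodifiableMap(new HashMap<>(partitionsPerTopic));
            }

            @Override
            public boolean equals(Object o) {
                return o instanceof Snapshot
                        && partitionsPerTopic.equals(((Snapshot) o).partitionsPerTopic);
            }

            @Override
            public int hashCode() {
                return partitionsPerTopic.hashCode();
            }
        }

        private Snapshot current;            // refreshed on every metadata update
        private Snapshot usedForAssignment;  // captured when the leader assigns

        // Called from the metadata listener: remember the latest cluster view.
        void onMetadataUpdate(Map<String, Integer> partitionCounts) {
            current = new Snapshot(partitionCounts);
        }

        // Called by the leader just before computing the assignment.
        void beforeAssignment() {
            usedForAssignment = current;
        }

        // Called once the rebalance completes: if the metadata moved on while the
        // assignment was in flight, another rebalance is needed.
        boolean needAnotherRebalance() {
            return usedForAssignment != null && !usedForAssignment.equals(current);
        }
    }
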
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4119954..d4c2656 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -138,7 +138,7 @@ public class ConsumerNetworkClient implements Closeable {
      * until it has completed).
      */
     public void ensureFreshMetadata() {
-        if (this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
+        if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
             awaitMetadataUpdate();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b864d69..5a174db 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -63,6 +63,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -575,6 +576,61 @@ public class ConsumerCoordinatorTest {
         assertTrue(subscriptions.partitionAssignmentNeeded());
     }
 
+
+    @Test
+    public void testUpdateMetadataDuringRebalance() {
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
+        TopicPartition tp1 = new TopicPartition(topic1, 0);
+        TopicPartition tp2 = new TopicPartition(topic2, 0);
+        final String consumerId = "leader";
+
+        List<String> topics = Arrays.asList(topic1, topic2);
+
+        subscriptions.subscribe(topics, rebalanceListener);
+        metadata.setTopics(topics);
+        subscriptions.needReassignment();
+
+        // we only have metadata for one topic initially
+        metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // prepare initial rebalance
+        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, topics);
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp1)));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                if (sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().containsKey(consumerId)) {
+                    // trigger the metadata update including both topics after the sync group request has been sent
+                    Map<String, Integer> topicPartitionCounts = new HashMap<>();
+                    topicPartitionCounts.put(topic1, 1);
+                    topicPartitionCounts.put(topic2, 1);
+                    metadata.update(TestUtils.singletonCluster(topicPartitionCounts), time.milliseconds());
+                    return true;
+                }
+                return false;
+            }
+        }, syncGroupResponse(Arrays.asList(tp1), Errors.NONE.code()));
+
+        // the metadata update should trigger a second rebalance
+        client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE.code()));
+
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
+    }
+
+
     @Test
     public void testExcludeInternalTopicsConfigOption() { 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a863ece/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 7ffc54a..027221e 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.kafka.common.Cluster;
@@ -45,20 +46,32 @@ public class TestUtils {
     public static final Random SEEDED_RANDOM = new Random(192348092834L);
     public static final Random RANDOM = new Random();
 
+    public static Cluster singletonCluster(Map<String, Integer> topicPartitionCounts) {
+        return clusterWith(1, topicPartitionCounts);
+    }
+
     public static Cluster singletonCluster(String topic, int partitions) {
         return clusterWith(1, topic, partitions);
     }
 
-    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+    public static Cluster clusterWith(int nodes, Map<String, Integer> topicPartitionCounts) {
         Node[] ns = new Node[nodes];
         for (int i = 0; i < nodes; i++)
             ns[i] = new Node(i, "localhost", 1969);
-        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
-        for (int i = 0; i < partitions; i++)
-            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+        List<PartitionInfo> parts = new ArrayList<>();
+        for (Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) {
+            String topic = topicPartition.getKey();
+            int partitions = topicPartition.getValue();
+            for (int i = 0; i < partitions; i++)
+                parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+        }
         return new Cluster(asList(ns), parts, Collections.<String>emptySet());
     }
 
+    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+        return clusterWith(nodes, Collections.singletonMap(topic, partitions));
+    }
+
     /**
      * Generate an array of random bytes
      *
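
The map-based clusterWith/singletonCluster overloads let a test describe several topics in a single call; the new ConsumerCoordinatorTest case uses them to push a two-topic cluster into the metadata mid-rebalance. A small usage sketch, with a made-up fixture class name:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.test.TestUtils;

    // Illustrative fixture: build a single-node test Cluster that knows about two
    // topics, using the map-based overload added in this patch.
    public class ExampleClusterFixture {
        public static Cluster twoTopicCluster() {
            Map<String, Integer> topicPartitionCounts = new HashMap<>();
            topicPartitionCounts.put("topic1", 1);  // topic1 with one partition
            topicPartitionCounts.put("topic2", 1);  // topic2 with one partition
            return TestUtils.singletonCluster(topicPartitionCounts);
        }
    }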