You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/08 21:55:36 UTC

kafka git commit: KAFKA-3528: handle wakeups while rebalancing more gracefully

Repository: kafka
Updated Branches:
  refs/heads/trunk cbdd70ec0 -> 2a8fa2878


KAFKA-3528: handle wakeups while rebalancing more gracefully

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1201 from hachikuji/KAFKA-3528


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a8fa287
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a8fa287
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a8fa287

Branch: refs/heads/trunk
Commit: 2a8fa287862912491d1e118c52a2d194d8480075
Parents: cbdd70e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Apr 8 12:55:21 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 8 12:55:21 2016 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 22 ++++++++---
 .../internals/ConsumerCoordinatorTest.java      | 40 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a8fa287/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1e6757e..496a114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -218,13 +218,25 @@ public abstract class AbstractCoordinator implements Closeable {
             }
 
             RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+            future.addListener(new RequestFutureListener<ByteBuffer>() {
+                @Override
+                public void onSuccess(ByteBuffer value) {
+                    // handle join completion in the callback so that the callback will be invoked
+                    // even if the consumer is woken up before finishing the rebalance
+                    onJoinComplete(generation, memberId, protocol, value);
+                    needsJoinPrepare = true;
+                    heartbeatTask.reset();
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    // we handle failures below after the request finishes. if the join completes
+                    // after having been woken up, the exception is ignored and we will rejoin
+                }
+            });
             client.poll(future);
 
-            if (future.succeeded()) {
-                onJoinComplete(generation, memberId, protocol, future.value());
-                needsJoinPrepare = true;
-                heartbeatTask.reset();
-            } else {
+            if (future.failed()) {
                 RuntimeException exception = future.exception();
                 if (exception instanceof UnknownMemberIdException ||
                         exception instanceof RebalanceInProgressException ||

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a8fa287/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2189c30..b864d69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.TopicConstants;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -323,6 +324,45 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testWakeupDuringJoin() {
+        final String consumerId = "leader";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        // ensure metadata is up-to-date for leader
+        metadata.setTopics(Arrays.asList(topicName));
+        metadata.update(cluster, time.milliseconds());
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+
+        // prepare only the first half of the join and then trigger the wakeup
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        consumerClient.wakeup();
+
+        try {
+            coordinator.ensurePartitionAssignment();
+        } catch (WakeupException e) {
+            // ignore
+        }
+
+        // now complete the second half
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+    }
+
+    @Test
     public void testNormalJoinGroupFollower() {
         final String consumerId = "consumer";