You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:59 UTC
[43/50] [abbrv] kafka git commit: KAFKA-3528: handle wakeups while rebalancing more gracefully
KAFKA-3528: handle wakeups while rebalancing more gracefully
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1201 from hachikuji/KAFKA-3528
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a8fa287
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a8fa287
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a8fa287
Branch: refs/heads/0.10.0
Commit: 2a8fa287862912491d1e118c52a2d194d8480075
Parents: cbdd70e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Apr 8 12:55:21 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 8 12:55:21 2016 -0700
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 22 ++++++++---
.../internals/ConsumerCoordinatorTest.java | 40 ++++++++++++++++++++
2 files changed, 57 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2a8fa287/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1e6757e..496a114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -218,13 +218,25 @@ public abstract class AbstractCoordinator implements Closeable {
}
RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+ future.addListener(new RequestFutureListener<ByteBuffer>() {
+ @Override
+ public void onSuccess(ByteBuffer value) {
+ // handle join completion in the callback so that the callback will be invoked
+ // even if the consumer is woken up before finishing the rebalance
+ onJoinComplete(generation, memberId, protocol, value);
+ needsJoinPrepare = true;
+ heartbeatTask.reset();
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ // we handle failures below after the request finishes. if the join completes
+ // after having been woken up, the exception is ignored and we will rejoin
+ }
+ });
client.poll(future);
- if (future.succeeded()) {
- onJoinComplete(generation, memberId, protocol, future.value());
- needsJoinPrepare = true;
- heartbeatTask.reset();
- } else {
+ if (future.failed()) {
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
http://git-wip-us.apache.org/repos/asf/kafka/blob/2a8fa287/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2189c30..b864d69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -323,6 +324,45 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testWakeupDuringJoin() {
+ final String consumerId = "leader";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ // ensure metadata is up-to-date for leader
+ metadata.setTopics(Arrays.asList(topicName));
+ metadata.update(cluster, time.milliseconds());
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+ partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+
+ // prepare only the first half of the join and then trigger the wakeup
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+ consumerClient.wakeup();
+
+ try {
+ coordinator.ensurePartitionAssignment();
+ } catch (WakeupException e) {
+ // ignore
+ }
+
+ // now complete the second half
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+ assertEquals(1, rebalanceListener.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ }
+
+ @Test
public void testNormalJoinGroupFollower() {
final String consumerId = "consumer";