You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/04 23:58:21 UTC
kafka git commit: KAFKA-2697: client-side support for leave group
Repository: kafka
Updated Branches:
refs/heads/trunk 8de62253a -> ef5d168cc
KAFKA-2697: client-side support for leave group
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava, Guozhang Wang
Closes #414 from hachikuji/KAFKA-2697
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef5d168c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef5d168c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef5d168c
Branch: refs/heads/trunk
Commit: ef5d168cc8f10ad4f0efe9df4cbe849a4b35496e
Parents: 8de6225
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 4 15:04:03 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 4 15:04:03 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 12 ++--
.../kafka/clients/consumer/KafkaConsumer.java | 18 +++--
.../kafka/clients/consumer/MockConsumer.java | 2 +-
.../consumer/internals/AbstractCoordinator.java | 76 +++++++++++++++++---
.../consumer/internals/ConsumerCoordinator.java | 28 ++++----
.../internals/ConsumerNetworkClient.java | 15 ++--
.../apache/kafka/common/protocol/Protocol.java | 10 +--
.../common/requests/LeaveGroupRequest.java | 16 ++---
.../internals/ConsumerCoordinatorTest.java | 66 ++++++++++++++++-
.../runtime/distributed/WorkerCoordinator.java | 9 +--
.../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
11 files changed, 190 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index a3d8776..c9f114d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -12,18 +12,18 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
/**
* @see KafkaConsumer
* @see MockConsumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a6be519..f3d2e15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -727,7 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
log.debug("Unsubscribed all topics or patterns and assigned partitions");
this.subscriptions.unsubscribe();
- this.coordinator.resetGeneration();
+ this.coordinator.maybeLeaveGroup(false);
this.metadata.needMetadataForAllTopics(false);
} finally {
release();
@@ -790,11 +790,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
throw new IllegalArgumentException("Timeout must not be negative");
// poll for new data until the timeout expires
+ long start = time.milliseconds();
long remaining = timeout;
- while (remaining >= 0) {
- long start = time.milliseconds();
+ do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
-
if (!records.isEmpty()) {
// if data is available, then return it, but first send off the
// next round of fetches to enable pipelining while the user is
@@ -804,8 +803,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
return new ConsumerRecords<>(records);
}
- remaining -= time.milliseconds() - start;
- }
+ long elapsed = time.milliseconds() - start;
+ remaining = timeout - elapsed;
+ } while (remaining > 0);
return ConsumerRecords.empty();
} finally {
@@ -1157,6 +1157,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
+ /**
+ * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is
+ * enabled, this will commit the current offsets.
+ */
@Override
public void close() {
acquire();
@@ -1179,7 +1183,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private void close(boolean swallowException) {
log.trace("Closing the Kafka consumer.");
- AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+ AtomicReference<Throwable> firstException = new AtomicReference<>();
this.closed = true;
ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 25c0c2c..894bc93 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -31,8 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.regex.Pattern;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
/**
* A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 781ff78..e9af6c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -45,6 +47,7 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -77,7 +80,7 @@ import java.util.concurrent.TimeUnit;
* {@link #onJoinComplete(int, String, String, ByteBuffer)}.
*
*/
-public abstract class AbstractCoordinator {
+public abstract class AbstractCoordinator implements Closeable {
private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
@@ -196,15 +199,6 @@ public abstract class AbstractCoordinator {
}
/**
- * Reset the generation/memberId tracked by this member
- */
- public void resetGeneration() {
- this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
- this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
- rejoinNeeded = true;
- }
-
- /**
* Ensure that the group is active (i.e. joined and synced)
*/
public void ensureActiveGroup() {
@@ -514,7 +508,6 @@ public abstract class AbstractCoordinator {
return false;
}
-
/**
* Mark the current coordinator as dead.
*/
@@ -526,6 +519,67 @@ public abstract class AbstractCoordinator {
}
/**
+ * Close the coordinator, waiting if needed to send LeaveGroup.
+ */
+ @Override
+ public void close() {
+ maybeLeaveGroup(true);
+ }
+
+ /**
+ * Leave the current group and reset local generation/memberId.
+ */
+ public void maybeLeaveGroup(boolean awaitResponse) {
+ if (!coordinatorUnknown() && generation > 0) {
+ // this is a minimal effort attempt to leave the group. we do not
+ // attempt any resending if the request fails or times out.
+ sendLeaveGroupRequest(awaitResponse);
+ }
+
+ this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+ this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+ rejoinNeeded = true;
+ }
+
+ private void sendLeaveGroupRequest(boolean awaitResponse) {
+ LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId);
+ RequestFuture<Void> future = client.send(coordinator, ApiKeys.LEAVE_GROUP, request)
+ .compose(new LeaveGroupResponseHandler());
+
+ future.addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {}
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ log.info("LeaveGroup request failed with error", e);
+ }
+ });
+
+ if (awaitResponse)
+ client.poll(future);
+ else
+ client.poll(future, 0);
+ }
+
+ private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
+ @Override
+ public LeaveGroupResponse parse(ClientResponse response) {
+ return new LeaveGroupResponse(response.responseBody());
+ }
+
+ @Override
+ public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
+ // process the response
+ short errorCode = leaveResponse.errorCode();
+ if (errorCode == Errors.NONE.code())
+ future.complete(null);
+ else
+ future.raise(Errors.forCode(errorCode));
+ }
+ }
+
+ /**
* Send a heartbeat request now (visible only for testing).
*/
public RequestFuture<Void> sendHeartbeatRequest() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index c7323cb..25d389c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -15,9 +15,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
@@ -26,6 +23,9 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -45,7 +45,6 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -58,7 +57,7 @@ import java.util.Set;
/**
* This class manages the coordination process with the consumer coordinator.
*/
-public final class ConsumerCoordinator extends AbstractCoordinator implements Closeable {
+public final class ConsumerCoordinator extends AbstractCoordinator {
private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
@@ -305,15 +304,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
@Override
public void close() {
- // commit offsets prior to closing if auto-commit enabled
- while (true) {
- try {
- maybeAutoCommitOffsetsSync();
- return;
- } catch (WakeupException e) {
- // ignore wakeups while closing to ensure we have a chance to commit
- continue;
+ try {
+ while (true) {
+ try {
+ maybeAutoCommitOffsetsSync();
+ return;
+ } catch (WakeupException e) {
+ // ignore wakeups while closing to ensure we have a chance to commit
+ continue;
+ }
}
+ } finally {
+ super.close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4757fc4..f1f1cc7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -17,8 +17,8 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
-import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;
@@ -162,12 +162,15 @@ public class ConsumerNetworkClient implements Closeable {
* @throws WakeupException if {@link #wakeup()} is called from another thread
*/
public boolean poll(RequestFuture<?> future, long timeout) {
- long now = time.milliseconds();
- long deadline = now + timeout;
- while (!future.isDone() && now < deadline) {
- poll(deadline - now, now);
+ long begin = time.milliseconds();
+ long remaining = timeout;
+ long now = begin;
+ do {
+ poll(remaining, now);
now = time.milliseconds();
- }
+ long elapsed = now - begin;
+ remaining = timeout - elapsed;
+ } while (!future.isDone() && remaining > 0);
return future.isDone();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 00560db..ff844e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -528,10 +528,10 @@ public class Protocol {
public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0};
/* Heartbeat api */
- public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
+ public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
new Field("group_generation_id",
INT32,
- "The generation of the consumer group."),
+ "The generation of the group."),
new Field("member_id",
STRING,
"The member id assigned by the group coordinator."));
@@ -542,10 +542,10 @@ public class Protocol {
public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
/* Leave group api */
- public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
- new Field("consumer_id",
+ public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."),
+ new Field("member_id",
STRING,
- "The consumer id assigned by the group coordinator."));
+ "The member id assigned by the group coordinator."));
public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index fcc056a..05bdf90 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -23,23 +23,23 @@ public class LeaveGroupRequest extends AbstractRequest {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEAVE_GROUP.id);
private static final String GROUP_ID_KEY_NAME = "group_id";
- private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String MEMBER_ID_KEY_NAME = "member_id";
private final String groupId;
- private final String consumerId;
+ private final String memberId;
- public LeaveGroupRequest(String groupId, String consumerId) {
+ public LeaveGroupRequest(String groupId, String memberId) {
super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
- struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ struct.set(MEMBER_ID_KEY_NAME, memberId);
this.groupId = groupId;
- this.consumerId = consumerId;
+ this.memberId = memberId;
}
public LeaveGroupRequest(Struct struct) {
super(struct);
groupId = struct.getString(GROUP_ID_KEY_NAME);
- consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ memberId = struct.getString(MEMBER_ID_KEY_NAME);
}
@Override
@@ -57,8 +57,8 @@ public class LeaveGroupRequest extends AbstractRequest {
return groupId;
}
- public String consumerId() {
- return consumerId;
+ public String memberId() {
+ return memberId;
}
public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7fd6d88..391f719 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -35,7 +35,10 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -362,6 +365,64 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testLeaveGroupOnClose() {
+ final String consumerId = "consumer";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ final AtomicBoolean received = new AtomicBoolean(false);
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ received.set(true);
+ LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+ return leaveRequest.memberId().equals(consumerId) &&
+ leaveRequest.groupId().equals(groupId);
+ }
+ }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+ coordinator.close();
+ assertTrue(received.get());
+ }
+
+ @Test
+ public void testMaybeLeaveGroup() {
+ final String consumerId = "consumer";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ final AtomicBoolean received = new AtomicBoolean(false);
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ received.set(true);
+ LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+ return leaveRequest.memberId().equals(consumerId) &&
+ leaveRequest.groupId().equals(groupId);
+ }
+ }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+ coordinator.maybeLeaveGroup(false);
+ assertTrue(received.get());
+ assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId);
+ assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation);
+ }
+
+ @Test
public void testRebalanceInProgressOnSyncGroup() {
final String consumerId = "consumer";
@@ -543,7 +604,7 @@ public class ConsumerCoordinatorTest {
}
@Test
- public void testResetGeneration() {
+ public void testCommitAfterLeaveGroup() {
// enable auto-assignment
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@@ -555,8 +616,9 @@ public class ConsumerCoordinatorTest {
coordinator.ensurePartitionAssignment();
// now switch to manual assignment
+ client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
subscriptions.unsubscribe();
- coordinator.resetGeneration();
+ coordinator.maybeLeaveGroup(false);
subscriptions.assignFromUser(Arrays.asList(tp));
// the client should not reuse generation/memberId from auto-subscribed generation
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
index d11165c..c748971 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -242,14 +242,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
}
- @Override
- public void close() {
- }
-
public String memberId() {
return this.memberId;
}
+ @Override
+ public void close() {
+ super.close();
+ }
+
private class CopycatWorkerCoordinatorMetrics {
public final Metrics metrics;
public final String metricGrpName;
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 21434f7..df064e4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -895,7 +895,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// let the coordinator to handle leave-group
coordinator.handleLeaveGroup(
leaveGroupRequest.groupId(),
- leaveGroupRequest.consumerId(),
+ leaveGroupRequest.memberId(),
sendResponseCallback)
}
}