You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/17 02:41:35 UTC
kafka git commit: KAFKA-2397: add leave group request to force
coordinator trigger rebalance
Repository: kafka
Updated Branches:
refs/heads/trunk e2e5c8914 -> 636e14a99
KAFKA-2397: add leave group request to force coordinator trigger rebalance
Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst-case time to stabilize the group is 2 × s (s to detect the consumer failure + s for the rebalance window). If a consumer can instead declare that it is leaving the group, the worst-case time to stabilize the group is just the s associated with the rebalance window.
This is a low priority optimization!
Author: Onur Karaman <ok...@linkedin.com>
Reviewers: Jason Gustafson, Guozhang Wang
Closes #103 from onurkaraman/leave-group
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/636e14a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/636e14a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/636e14a9
Branch: refs/heads/trunk
Commit: 636e14a99191eeededfb933aacfe2459c7c7bb6f
Parents: e2e5c89
Author: Onur Karaman <ok...@linkedin.com>
Authored: Fri Oct 16 17:46:17 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 16 17:46:17 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/common/protocol/ApiKeys.java | 3 +-
.../apache/kafka/common/protocol/Protocol.java | 13 ++++
.../kafka/common/requests/AbstractRequest.java | 2 +
.../common/requests/LeaveGroupRequest.java | 71 ++++++++++++++++++++
.../common/requests/LeaveGroupResponse.java | 53 +++++++++++++++
.../common/requests/RequestResponseTest.java | 11 +++
core/src/main/scala/kafka/api/RequestKeys.scala | 1 +
.../kafka/coordinator/ConsumerCoordinator.scala | 46 +++++++++++--
.../kafka/coordinator/ConsumerMetadata.scala | 1 +
.../src/main/scala/kafka/server/KafkaApis.scala | 22 +++++-
.../ConsumerCoordinatorResponseTest.scala | 69 +++++++++++++++++++
11 files changed, 286 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 46ddddb..fab8b02 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -32,7 +32,8 @@ public enum ApiKeys {
OFFSET_FETCH(9, "OffsetFetch"),
CONSUMER_METADATA(10, "ConsumerMetadata"),
JOIN_GROUP(11, "JoinGroup"),
- HEARTBEAT(12, "Heartbeat");
+ HEARTBEAT(12, "Heartbeat"),
+ LEAVE_GROUP(13, "LeaveGroup");
private static ApiKeys[] codeToType;
public static final int MAX_API_KEY;
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index b72db4f..9f8e981 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -461,6 +461,17 @@ public class Protocol {
public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+ /* Leave group api */
+ public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."));
+
+ public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+
+ public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0};
+ public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0};
+
/* Leader and ISR api */
public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
new Schema(new Field("topic", STRING, "Topic name."),
@@ -581,6 +592,7 @@ public class Protocol {
REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+ REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
@@ -596,6 +608,7 @@ public class Protocol {
RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
+ RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;
/* set the maximum version of each api */
for (ApiKeys api : ApiKeys.values())
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index a696e80..095cd52 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -55,6 +55,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return JoinGroupRequest.parse(buffer, versionId);
case HEARTBEAT:
return HeartbeatRequest.parse(buffer, versionId);
+ case LEAVE_GROUP:
+ return LeaveGroupRequest.parse(buffer, versionId);
case STOP_REPLICA:
return StopReplicaRequest.parse(buffer, versionId);
case CONTROLLED_SHUTDOWN_KEY:
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
new file mode 100644
index 0000000..fcc056a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class LeaveGroupRequest extends AbstractRequest { // request carrying the group id and consumer id of a consumer leaving its group
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEAVE_GROUP.id); // used by the (groupId, consumerId) constructor and parse(ByteBuffer)
+ private static final String GROUP_ID_KEY_NAME = "group_id"; // field names used to read/write the struct
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+
+ private final String groupId;
+ private final String consumerId;
+
+ public LeaveGroupRequest(String groupId, String consumerId) { // builds a new struct on CURRENT_SCHEMA from the two ids
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(GROUP_ID_KEY_NAME, groupId);
+ struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+ this.groupId = groupId;
+ this.consumerId = consumerId;
+ }
+
+ public LeaveGroupRequest(Struct struct) { // wraps an already-populated struct and caches its two fields
+ super(struct);
+ groupId = struct.getString(GROUP_ID_KEY_NAME);
+ consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { // builds the response to send when handling this request failed with e
+ switch (versionId) {
+ case 0:
+ return new LeaveGroupResponse(Errors.forException(e).code()); // the response only carries the error code derived from e
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LEAVE_GROUP.id)));
+ }
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public String consumerId() {
+ return consumerId;
+ }
+
+ public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) { // parses via ProtoUtils.parseRequest for the given version id
+ return new LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, buffer));
+ }
+
+ public static LeaveGroupRequest parse(ByteBuffer buffer) { // parses using CURRENT_SCHEMA
+ return new LeaveGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
new file mode 100644
index 0000000..d2af1a1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class LeaveGroupResponse extends AbstractRequestResponse { // response to a LeaveGroupRequest; its only field is an error code
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEAVE_GROUP.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ /**
+ * Possible error codes (in addition to NONE, which signals success):
+ *
+ * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_CONSUMER (16)
+ * UNKNOWN_CONSUMER_ID (25)
+ */
+
+ private final short errorCode;
+ public LeaveGroupResponse(short errorCode) { // builds a new struct on CURRENT_SCHEMA holding the given error code
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ this.errorCode = errorCode;
+ }
+
+ public LeaveGroupResponse(Struct struct) { // wraps an already-populated struct and caches its error code
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public static LeaveGroupResponse parse(ByteBuffer buffer) { // parses using CURRENT_SCHEMA
+ return new LeaveGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b668013..cabf591 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -58,6 +58,9 @@ public class RequestResponseTest {
createJoinGroupRequest(),
createJoinGroupRequest().getErrorResponse(0, new UnknownServerException()),
createJoinGroupResponse(),
+ createLeaveGroupRequest(),
+ createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()),
+ createLeaveGroupResponse(),
createListOffsetRequest(),
createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
createListOffsetResponse(),
@@ -184,6 +187,14 @@ public class RequestResponseTest {
return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
}
+ private AbstractRequest createLeaveGroupRequest() {
+ return new LeaveGroupRequest("group1", "consumer1");
+ }
+
+ private AbstractRequestResponse createLeaveGroupResponse() {
+ return new LeaveGroupResponse(Errors.NONE.code());
+ }
+
private AbstractRequest createListOffsetRequest() {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 155cb65..8a22c1a 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -36,6 +36,7 @@ object RequestKeys {
val ConsumerMetadataKey: Short = 10
val JoinGroupKey: Short = 11
val HeartbeatKey: Short = 12
+ val LeaveGroupKey: Short = 13
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 2cdab85..68ff4fc 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -188,6 +188,38 @@ class ConsumerCoordinator(val brokerId: Int,
}
}
+ def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) { // removes consumerId from groupId; the outcome is reported as an error code through responseCallback
+ if (!isActive.get) {
+ responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
+ } else {
+ val group = coordinatorMetadata.getGroup(groupId)
+ if (group == null) {
+ // no metadata exists for this group on this coordinator, so the consumer id cannot
+ // be known here either: answer UNKNOWN_CONSUMER_ID.
+ // NOTE(review): presumably the consumer is then expected to rejoin without a
+ // specified consumer id - confirm against the client.
+ responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
+ } else {
+ group synchronized {
+ if (group.is(Dead)) { // marked dead: some other thread has just removed the group from the coordinator metadata (likely migrated to another coordinator, or transiently unstable)
+ responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
+ } else if (!group.has(consumerId)) {
+ responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
+ } else {
+ val consumer = group.get(consumerId)
+ removeHeartbeatForLeavingConsumer(group, consumer) // flags the consumer as leaving and re-checks its delayed heartbeat
+ onConsumerFailure(group, consumer) // same path as an expired heartbeat: removes the consumer and may prepare a rebalance
+ responseCallback(Errors.NONE.code)
+ if (group.is(PreparingRebalance)) // re-check the delayed rebalance now that this consumer has been removed
+ rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+ }
+ }
+ }
+ }
+ }
+
def handleHeartbeat(groupId: String,
consumerId: String,
generationId: Int,
@@ -311,6 +343,12 @@ class ConsumerCoordinator(val brokerId: Int,
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey))
}
+ private def removeHeartbeatForLeavingConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { // NOTE(review): `group` is not used in this body
+ consumer.isLeaving = true // tryCompleteHeartbeat force-completes when this flag is set
+ val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
+ heartbeatPurgatory.checkAndComplete(consumerKey) // re-evaluates the delayed heartbeat(s) watched under this consumer's key
+ }
+
private def addConsumer(consumerId: String,
topics: Set[String],
sessionTimeoutMs: Int,
@@ -370,7 +408,7 @@ class ConsumerCoordinator(val brokerId: Int,
info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
}
- private def onConsumerHeartbeatExpired(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
+ private def onConsumerFailure(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
trace("Consumer %s in group %s has failed".format(consumer.consumerId, group.groupId))
removeConsumer(group, consumer)
maybePrepareRebalance(group)
@@ -389,7 +427,7 @@ class ConsumerCoordinator(val brokerId: Int,
def tryCompleteRebalance(group: ConsumerGroupMetadata, forceComplete: () => Boolean) = {
group synchronized {
- if (group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
+ if (group.notYetRejoinedConsumers.isEmpty)
forceComplete()
else false
}
@@ -431,7 +469,7 @@ class ConsumerCoordinator(val brokerId: Int,
def tryCompleteHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
group synchronized {
- if (shouldKeepConsumerAlive(consumer, heartbeatDeadline))
+ if (shouldKeepConsumerAlive(consumer, heartbeatDeadline) || consumer.isLeaving)
forceComplete()
else false
}
@@ -440,7 +478,7 @@ class ConsumerCoordinator(val brokerId: Int,
def onExpirationHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long) {
group synchronized {
if (!shouldKeepConsumerAlive(consumer, heartbeatDeadline))
- onConsumerHeartbeatExpired(group, consumer)
+ onConsumerFailure(group, consumer)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
index d5486cf..64ed4a5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
@@ -46,4 +46,5 @@ private[coordinator] class ConsumerMetadata(val consumerId: String,
var awaitingRebalanceCallback: (Set[TopicAndPartition], String, Int, Short) => Unit = null
var assignedTopicPartitions = Set.empty[TopicAndPartition]
var latestHeartbeat: Long = -1
+ var isLeaving: Boolean = false
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 72f3044..5715626 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,7 +30,7 @@ import kafka.coordinator.ConsumerCoordinator
import kafka.log._
import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
-import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend}
+import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend}
import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
import scala.collection._
import org.I0Itec.zkclient.ZkClient
@@ -76,6 +76,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
+ case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -775,6 +776,25 @@ class KafkaApis(val requestChannel: RequestChannel,
quotaManagers
}
+ def handleLeaveGroupRequest(request: RequestChannel.Request) { // hands a LeaveGroup request to the coordinator and sends back its error code
+ val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
+ val respHeader = new ResponseHeader(request.header.correlationId)
+
+ // callback invoked by the coordinator with the error code; builds and sends the leave-group response
+ def sendResponseCallback(errorCode: Short) {
+ val response = new LeaveGroupResponse(errorCode)
+ trace("Sending leave group response %s for correlation id %d to client %s."
+ .format(response, request.header.correlationId, request.header.clientId))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+ }
+
+ // let the coordinator handle the leave-group request
+ coordinator.handleLeaveGroup(
+ leaveGroupRequest.groupId(),
+ leaveGroupRequest.consumerId(),
+ sendResponseCallback)
+ }
+
def close() {
quotaManagers.foreach { case (apiKey, quotaManager) =>
quotaManager.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 3e763c3..c108955 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -43,6 +43,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
type HeartbeatCallback = Short => Unit
type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
+ type LeaveGroupCallbackParams = Short
+ type LeaveGroupCallback = Short => Unit
val ConsumerMinSessionTimeout = 10
val ConsumerMaxSessionTimeout = 200
@@ -301,6 +303,56 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
assertEquals(Errors.NONE.code, otherJoinGroupErrorCode)
}
+ @Test
+ def testLeaveGroupWrongCoordinator() { // a leave-group sent to a broker that is not the group's coordinator is rejected
+ val groupId = "groupId"
+ val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+
+ val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = false) // the helper mocks leaderIsLocal to return false
+ assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, leaveGroupResult)
+ }
+
+ @Test
+ def testLeaveGroupUnknownGroup() { // leaving a group that no consumer has joined in this test yields UNKNOWN_CONSUMER_ID
+ val groupId = "groupId"
+ val consumerId = "consumerId"
+
+ val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = true)
+ assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
+ }
+
+ @Test
+ def testLeaveGroupUnknownConsumerExistingGroup() { // the group exists, but the leaving consumer id was never a member of it
+ val groupId = "groupId"
+ val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+ val otherConsumerId = "consumerId"
+ val partitionAssignmentStrategy = "range"
+
+ val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+ val joinGroupErrorCode = joinGroupResult._4
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(offsetManager) // presumably clears the expectations recorded by joinGroup; leaveGroup records and replays its own
+ val leaveGroupResult = leaveGroup(groupId, otherConsumerId, isCoordinatorForGroup = true)
+ assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
+ }
+
+ @Test
+ def testValidLeaveGroup() { // a consumer that joined successfully can leave with error code NONE
+ val groupId = "groupId"
+ val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+ val partitionAssignmentStrategy = "range"
+
+ val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+ val assignedConsumerId = joinGroupResult._2 // leave with the id taken from the join result, not the placeholder sent in the join
+ val joinGroupErrorCode = joinGroupResult._4
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(offsetManager) // presumably clears the expectations recorded by joinGroup; leaveGroup records and replays its own
+ val leaveGroupResult = leaveGroup(groupId, assignedConsumerId, isCoordinatorForGroup = true)
+ assertEquals(Errors.NONE.code, leaveGroupResult)
+ }
+
private def setupJoinGroupCallback: (Future[JoinGroupCallbackParams], JoinGroupCallback) = {
val responsePromise = Promise[JoinGroupCallbackParams]
val responseFuture = responsePromise.future
@@ -323,6 +375,13 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
(responseFuture, responseCallback)
}
+ private def setupLeaveGroupCallback: (Future[LeaveGroupCallbackParams], LeaveGroupCallback) = { // returns a future together with the callback that completes it
+ val responsePromise = Promise[LeaveGroupCallbackParams]
+ val responseFuture = responsePromise.future
+ val responseCallback: LeaveGroupCallback = errorCode => responsePromise.success(errorCode) // fulfils the promise with the error code passed to the callback
+ (responseFuture, responseCallback)
+ }
+
private def sendJoinGroup(groupId: String,
consumerId: String,
partitionAssignmentStrategy: String,
@@ -374,5 +433,15 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
EasyMock.replay(offsetManager)
consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+ Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+ }
+
+ private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = { // sends a leave-group to the coordinator and returns the error code it answered with
+ val (responseFuture, responseCallback) = setupLeaveGroupCallback // use the leave-group helper so the types match this method's declared result
+ EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+ EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) // controls whether this broker is the group's coordinator
+ EasyMock.replay(offsetManager)
+ consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+ Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) // fails the test if no response arrives within 40 ms
}
}