You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/17 02:41:35 UTC

kafka git commit: KAFKA-2397: add leave group request to force coordinator trigger rebalance

Repository: kafka
Updated Branches:
  refs/heads/trunk e2e5c8914 -> 636e14a99


KAFKA-2397: add leave group request to force coordinator trigger rebalance

Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst case time to stabilize the group is 2s (s to detect the consumer failure + s for the rebalance window). If a consumer instead can declare they are leaving the group, the worst case time to stabilize the group would just be the s associated with the rebalance window.

This is a low priority optimization!

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #103 from onurkaraman/leave-group


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/636e14a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/636e14a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/636e14a9

Branch: refs/heads/trunk
Commit: 636e14a99191eeededfb933aacfe2459c7c7bb6f
Parents: e2e5c89
Author: Onur Karaman <ok...@linkedin.com>
Authored: Fri Oct 16 17:46:17 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 16 17:46:17 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/ApiKeys.java   |  3 +-
 .../apache/kafka/common/protocol/Protocol.java  | 13 ++++
 .../kafka/common/requests/AbstractRequest.java  |  2 +
 .../common/requests/LeaveGroupRequest.java      | 71 ++++++++++++++++++++
 .../common/requests/LeaveGroupResponse.java     | 53 +++++++++++++++
 .../common/requests/RequestResponseTest.java    | 11 +++
 core/src/main/scala/kafka/api/RequestKeys.scala |  1 +
 .../kafka/coordinator/ConsumerCoordinator.scala | 46 +++++++++++--
 .../kafka/coordinator/ConsumerMetadata.scala    |  1 +
 .../src/main/scala/kafka/server/KafkaApis.scala | 22 +++++-
 .../ConsumerCoordinatorResponseTest.scala       | 69 +++++++++++++++++++
 11 files changed, 286 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 46ddddb..fab8b02 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -32,7 +32,8 @@ public enum ApiKeys {
     OFFSET_FETCH(9, "OffsetFetch"),
     CONSUMER_METADATA(10, "ConsumerMetadata"),
     JOIN_GROUP(11, "JoinGroup"),
-    HEARTBEAT(12, "Heartbeat");
+    HEARTBEAT(12, "Heartbeat"),
+    LEAVE_GROUP(13, "LeaveGroup");
 
     private static ApiKeys[] codeToType;
     public static final int MAX_API_KEY;

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index b72db4f..9f8e981 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -461,6 +461,17 @@ public class Protocol {
     public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
     public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
+    /* Leave group api */
+    public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
+                                                                   new Field("consumer_id",
+                                                                             STRING,
+                                                                             "The consumer id assigned by the group coordinator."));
+
+    public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
+
+    public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0};
+    public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0};
+
     /* Leader and ISR api */
     public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
             new Schema(new Field("topic", STRING, "Topic name."),
@@ -581,6 +592,7 @@ public class Protocol {
         REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+        REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
 
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
@@ -596,6 +608,7 @@ public class Protocol {
         RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
+        RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index a696e80..095cd52 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -55,6 +55,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return JoinGroupRequest.parse(buffer, versionId);
             case HEARTBEAT:
                 return HeartbeatRequest.parse(buffer, versionId);
+            case LEAVE_GROUP:
+                return LeaveGroupRequest.parse(buffer, versionId);
             case STOP_REPLICA:
                 return StopReplicaRequest.parse(buffer, versionId);
             case CONTROLLED_SHUTDOWN_KEY:

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
new file mode 100644
index 0000000..fcc056a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class LeaveGroupRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEAVE_GROUP.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+
+    private final String groupId;
+    private final String consumerId;
+
+    public LeaveGroupRequest(String groupId, String consumerId) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        this.groupId = groupId;
+        this.consumerId = consumerId;
+    }
+
+    public LeaveGroupRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new LeaveGroupResponse(Errors.forException(e).code());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LEAVE_GROUP.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public String consumerId() {
+        return consumerId;
+    }
+
+    public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) {
+        return new LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, buffer));
+    }
+
+    public static LeaveGroupRequest parse(ByteBuffer buffer) {
+        return new LeaveGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
new file mode 100644
index 0000000..d2af1a1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class LeaveGroupResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEAVE_GROUP.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error code:
+     *
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * UNKNOWN_CONSUMER_ID (25)
+     */
+
+    private final short errorCode;
+    public LeaveGroupResponse(short errorCode) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        this.errorCode = errorCode;
+    }
+
+    public LeaveGroupResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static LeaveGroupResponse parse(ByteBuffer buffer) {
+        return new LeaveGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b668013..cabf591 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -58,6 +58,9 @@ public class RequestResponseTest {
                 createJoinGroupRequest(),
                 createJoinGroupRequest().getErrorResponse(0, new UnknownServerException()),
                 createJoinGroupResponse(),
+                createLeaveGroupRequest(),
+                createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()),
+                createLeaveGroupResponse(),
                 createListOffsetRequest(),
                 createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
                 createListOffsetResponse(),
@@ -184,6 +187,14 @@ public class RequestResponseTest {
         return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
     }
 
+    private AbstractRequest createLeaveGroupRequest() {
+        return new LeaveGroupRequest("group1", "consumer1");
+    }
+
+    private AbstractRequestResponse createLeaveGroupResponse() {
+        return new LeaveGroupResponse(Errors.NONE.code());
+    }
+
     private AbstractRequest createListOffsetRequest() {
         Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
         offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 155cb65..8a22c1a 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -36,6 +36,7 @@ object RequestKeys {
   val ConsumerMetadataKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
+  val LeaveGroupKey: Short = 13
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 2cdab85..68ff4fc 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -188,6 +188,38 @@ class ConsumerCoordinator(val brokerId: Int,
     }
   }
 
+  def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
+    if (!isActive.get) {
+      responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
+    } else if (!isCoordinatorForGroup(groupId)) {
+      responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
+    } else {
+      val group = coordinatorMetadata.getGroup(groupId)
+      if (group == null) {
+        // if the group is marked as dead, it means some other thread has just removed the group
+        // from the coordinator metadata; this is likely that the group has migrated to some other
+        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
+        // joining without specified consumer id,
+        responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
+      } else {
+        group synchronized {
+          if (group.is(Dead)) {
+            responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
+          } else if (!group.has(consumerId)) {
+            responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
+          } else {
+            val consumer = group.get(consumerId)
+            removeHeartbeatForLeavingConsumer(group, consumer)
+            onConsumerFailure(group, consumer)
+            responseCallback(Errors.NONE.code)
+            if (group.is(PreparingRebalance))
+              rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+          }
+        }
+      }
+    }
+  }
+
   def handleHeartbeat(groupId: String,
                       consumerId: String,
                       generationId: Int,
@@ -311,6 +343,12 @@ class ConsumerCoordinator(val brokerId: Int,
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey))
   }
 
+  private def removeHeartbeatForLeavingConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
+    consumer.isLeaving = true
+    val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
+    heartbeatPurgatory.checkAndComplete(consumerKey)
+  }
+
   private def addConsumer(consumerId: String,
                           topics: Set[String],
                           sessionTimeoutMs: Int,
@@ -370,7 +408,7 @@ class ConsumerCoordinator(val brokerId: Int,
     info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
   }
 
-  private def onConsumerHeartbeatExpired(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
+  private def onConsumerFailure(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
     trace("Consumer %s in group %s has failed".format(consumer.consumerId, group.groupId))
     removeConsumer(group, consumer)
     maybePrepareRebalance(group)
@@ -389,7 +427,7 @@ class ConsumerCoordinator(val brokerId: Int,
 
   def tryCompleteRebalance(group: ConsumerGroupMetadata, forceComplete: () => Boolean) = {
     group synchronized {
-      if (group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
+      if (group.notYetRejoinedConsumers.isEmpty)
         forceComplete()
       else false
     }
@@ -431,7 +469,7 @@ class ConsumerCoordinator(val brokerId: Int,
 
   def tryCompleteHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
     group synchronized {
-      if (shouldKeepConsumerAlive(consumer, heartbeatDeadline))
+      if (shouldKeepConsumerAlive(consumer, heartbeatDeadline) || consumer.isLeaving)
         forceComplete()
       else false
     }
@@ -440,7 +478,7 @@ class ConsumerCoordinator(val brokerId: Int,
   def onExpirationHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long) {
     group synchronized {
       if (!shouldKeepConsumerAlive(consumer, heartbeatDeadline))
-        onConsumerHeartbeatExpired(group, consumer)
+        onConsumerFailure(group, consumer)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
index d5486cf..64ed4a5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
@@ -46,4 +46,5 @@ private[coordinator] class ConsumerMetadata(val consumerId: String,
   var awaitingRebalanceCallback: (Set[TopicAndPartition], String, Int, Short) => Unit = null
   var assignedTopicPartitions = Set.empty[TopicAndPartition]
   var latestHeartbeat: Long = -1
+  var isLeaving: Boolean = false
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 72f3044..5715626 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,7 +30,7 @@ import kafka.coordinator.ConsumerCoordinator
 import kafka.log._
 import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
-import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend}
+import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend}
 import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
@@ -76,6 +76,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
         case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
         case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
+        case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -775,6 +776,25 @@ class KafkaApis(val requestChannel: RequestChannel,
     quotaManagers
   }
 
+  def handleLeaveGroupRequest(request: RequestChannel.Request) {
+    val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
+    val respHeader = new ResponseHeader(request.header.correlationId)
+
+    // the callback for sending a leave-group response
+    def sendResponseCallback(errorCode: Short) {
+      val response = new LeaveGroupResponse(errorCode)
+      trace("Sending leave group response %s for correlation id %d to client %s."
+                    .format(response, request.header.correlationId, request.header.clientId))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+    }
+
+    // let the coordinator to handle leave-group
+    coordinator.handleLeaveGroup(
+      leaveGroupRequest.groupId(),
+      leaveGroupRequest.consumerId(),
+      sendResponseCallback)
+  }
+
   def close() {
     quotaManagers.foreach { case (apiKey, quotaManager) =>
       quotaManager.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/636e14a9/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 3e763c3..c108955 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -43,6 +43,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   type HeartbeatCallback = Short => Unit
   type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
   type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
+  type LeaveGroupCallbackParams = Short
+  type LeaveGroupCallback = Short => Unit
 
   val ConsumerMinSessionTimeout = 10
   val ConsumerMaxSessionTimeout = 200
@@ -301,6 +303,56 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, otherJoinGroupErrorCode)
   }
 
+  @Test
+  def testLeaveGroupWrongCoordinator() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+
+    val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = false)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, leaveGroupResult)
+  }
+
+  @Test
+  def testLeaveGroupUnknownGroup() {
+    val groupId = "groupId"
+    val consumerId = "consumerId"
+
+    val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
+  }
+
+  @Test
+  def testLeaveGroupUnknownConsumerExistingGroup() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val otherConsumerId = "consumerId"
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val leaveGroupResult = leaveGroup(groupId, otherConsumerId, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
+  }
+
+  @Test
+  def testValidLeaveGroup() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult._2
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val leaveGroupResult = leaveGroup(groupId, assignedConsumerId, isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, leaveGroupResult)
+  }
+
   private def setupJoinGroupCallback: (Future[JoinGroupCallbackParams], JoinGroupCallback) = {
     val responsePromise = Promise[JoinGroupCallbackParams]
     val responseFuture = responsePromise.future
@@ -323,6 +375,13 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     (responseFuture, responseCallback)
   }
 
+  private def setupLeaveGroupCallback: (Future[LeaveGroupCallbackParams], LeaveGroupCallback) = {
+    val responsePromise = Promise[LeaveGroupCallbackParams]
+    val responseFuture = responsePromise.future
+    val responseCallback: LeaveGroupCallback = errorCode => responsePromise.success(errorCode)
+    (responseFuture, responseCallback)
+  }
+
   private def sendJoinGroup(groupId: String,
                             consumerId: String,
                             partitionAssignmentStrategy: String,
@@ -374,5 +433,15 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     EasyMock.replay(offsetManager)
     consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+
+  private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = {
+    val (responseFuture, responseCallback) = setupHeartbeatCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 }