You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/02/10 00:25:19 UTC

kafka git commit: KAFKA-1333 follow-up; Add missing files for the coordinator folder

Repository: kafka
Updated Branches:
  refs/heads/trunk 39cd48de3 -> 71602de0b


KAFKA-1333 follow-up; Add missing files for the coordinator folder


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71602de0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71602de0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71602de0

Branch: refs/heads/trunk
Commit: 71602de0bbf7727f498a812033027f6cbfe34eb8
Parents: 39cd48d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 9 15:24:44 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 9 15:24:44 2015 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/ConsumerCoordinator.scala | 333 +++++++++++++++++++
 .../kafka/coordinator/ConsumerRegistry.scala    |  52 +++
 .../kafka/coordinator/DelayedHeartbeat.scala    |  44 +++
 .../kafka/coordinator/DelayedJoinGroup.scala    |  44 +++
 .../kafka/coordinator/DelayedRebalance.scala    |  62 ++++
 .../scala/kafka/coordinator/GroupRegistry.scala |  74 +++++
 .../kafka/coordinator/HeartbeatBucket.scala     |  36 ++
 7 files changed, 645 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
new file mode 100644
index 0000000..01cf1d9
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import org.apache.kafka.common.protocol.Errors
+
+import kafka.common.TopicAndPartition
+import kafka.server._
+import kafka.utils._
+
+import scala.collection.mutable.HashMap
+
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+
+
+/**
+ * Kafka coordinator handles consumer group and consumer offset management.
+ *
+ * Each Kafka server instantiates a coordinator, which is responsible for a set of
+ * consumer groups; the consumer groups are assigned to coordinators based on their
+ * group names.
+ *
+ * NOTE: most of the group-membership logic is still a TODO; consumerJoinGroup and
+ * consumerHeartbeat currently contain placeholder behavior for new-consumer testing.
+ */
+class ConsumerCoordinator(val config: KafkaConfig,
+                          val zkClient: ZkClient) extends Logging {
+
+  this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: "
+
+  /* zookeeper listeners for topic-partition changes, keyed by topic */
+  private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener]
+
+  /* the consumer group registry cache, keyed by group id */
+  // TODO: access to this map needs to be synchronized
+  private val consumerGroupRegistries = new HashMap[String, GroupRegistry]
+
+  /* the list of subscribed groups per topic */
+  // TODO: access to this map needs to be synchronized
+  private val consumerGroupsPerTopic = new HashMap[String, List[String]]
+
+  /* the delayed operation purgatory for heartbeat-based failure detection (created in startup()) */
+  private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
+
+  /* the delayed operation purgatory for handling join-group requests (created in startup()) */
+  private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null
+
+  /* the delayed operation purgatory for preparing rebalance process (created in startup()) */
+  private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
+
+  /* latest consumer heartbeat bucket's end timestamp in milliseconds */
+  private var latestHeartbeatBucketEndMs: Long = SystemTime.milliseconds
+
+  /**
+   * Start-up logic executed at the same time when the server starts up.
+   *
+   * Resets the heartbeat bucket timestamp and creates the heartbeat, join-group
+   * and rebalance purgatories.
+   */
+  def startup() {
+    // Initialize consumer group registries and heartbeat bucket metadata
+    latestHeartbeatBucketEndMs = SystemTime.milliseconds
+
+    // Initialize purgatories for delayed heartbeat, join-group and rebalance operations
+    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId)
+    joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId)
+    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId)
+  }
+
+  /**
+   * Shut-down logic executed at the same time when server shuts down;
+   * the ordering of actions is the reverse of the start-up process.
+   *
+   * Safe to call even if startup() was never invoked: purgatories that were
+   * never created are simply skipped.
+   */
+  def shutdown() {
+    // De-register all Zookeeper listeners for topic-partition changes. Iterate over a
+    // snapshot of the topics because deregisterTopicChangeListener removes entries from
+    // the very map being traversed.
+    topicPartitionChangeListeners.keys.toList.foreach(deregisterTopicChangeListener)
+    topicPartitionChangeListeners.clear()
+
+    // Shutdown purgatories for delayed heartbeat, join-group and rebalance operations
+    // (they are still null if startup() has not run)
+    Option(heartbeatPurgatory).foreach(_.shutdown())
+    Option(joinGroupPurgatory).foreach(_.shutdown())
+    Option(rebalancePurgatory).foreach(_.shutdown())
+
+    // Clean up consumer group registries metadata
+    consumerGroupRegistries.clear()
+    consumerGroupsPerTopic.clear()
+  }
+
+  /**
+   * Process a join-group request from a consumer to join as a new group member.
+   *
+   * Currently a stub: creates the group registry if it is missing, then answers with
+   * every partition of the subscribed topics (as read from ZooKeeper), generation id 1
+   * and no error.
+   *
+   * @param responseCallback invoked with (assigned partitions, generation id, error code)
+   */
+  def consumerJoinGroup(groupId: String,
+                        consumerId: String,
+                        topics: List[String],
+                        sessionTimeoutMs: Int,
+                        partitionAssignmentStrategy: String,
+                        responseCallback: (List[TopicAndPartition], Int, Short) => Unit) {
+
+    // if the group does not exist yet, create one
+    if (!consumerGroupRegistries.contains(groupId))
+      createNewGroup(groupId, partitionAssignmentStrategy)
+
+    // if the consumer id is unknown or does not exist in
+    // the group yet, register this consumer to the group
+    // TODO
+
+    // add a delayed join-group operation to the purgatory
+    // TODO
+
+    // if the current group is under rebalance process,
+    // check if the delayed rebalance operation can be finished
+    // TODO
+
+    // TODO --------------------------------------------------------------
+    // TODO: this is just a stub for new consumer testing,
+    // TODO: needs to be replaced with the logic above
+    // TODO --------------------------------------------------------------
+    // just return all the partitions of the subscribed topics
+    val partitionIdsPerTopic = ZkUtils.getPartitionsForTopics(zkClient, topics)
+    val partitions = partitionIdsPerTopic.flatMap { case (topic, partitionIds) =>
+      partitionIds.map(partition => TopicAndPartition(topic, partition))
+    }.toList
+
+    responseCallback(partitions, 1 /* generation id */, Errors.NONE.code)
+
+    info("Handled join-group from consumer " + consumerId + " to group " + groupId)
+  }
+
+  /**
+   * Process a heartbeat request from a consumer.
+   *
+   * Currently a stub: always answers immediately with no error.
+   *
+   * @param responseCallback invoked with the error code of the heartbeat response
+   */
+  def consumerHeartbeat(groupId: String,
+                        consumerId: String,
+                        generationId: Int,
+                        responseCallback: Short => Unit) {
+
+    // check that the group already exists
+    // TODO
+
+    // check that the consumer has already registered for the group
+    // TODO
+
+    // check if the consumer generation id is correct
+    // TODO
+
+    // remove the consumer from its current heartbeat bucket, and add it back to the corresponding bucket
+    // TODO
+
+    // create the heartbeat response, if partition rebalance is triggered set the corresponding error code
+    // TODO
+
+    info("Handled heartbeat of consumer " + consumerId + " from group " + groupId)
+
+    // TODO --------------------------------------------------------------
+    // TODO: this is just a stub for new consumer testing,
+    // TODO: needs to be replaced with the logic above
+    // TODO --------------------------------------------------------------
+    // always return OK for heartbeat immediately
+    responseCallback(Errors.NONE.code)
+  }
+
+  /**
+   * Create a new consumer (not implemented yet; only logs)
+   */
+  private def createNewConsumer(consumerId: String,
+                                topics: List[String],
+                                sessionTimeoutMs: Int,
+                                groupRegistry: GroupRegistry) {
+    debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId)
+
+    // create the new consumer registry entry
+    // TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase
+
+    // check if the partition assignment strategy is consistent with the group
+    // TODO
+
+    // add the group to the subscribed topics
+    // TODO
+
+    // schedule heartbeat tasks for the consumer
+    // TODO
+
+    // add the member registry entry to the group
+    // TODO
+
+    // start preparing group partition rebalance
+    // TODO
+
+    info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId)
+  }
+
+  /**
+   * Create a new consumer group in the registry, replacing any existing entry for groupId
+   */
+  private def createNewGroup(groupId: String, partitionAssignmentStrategy: String) {
+    debug("Creating new group " + groupId)
+
+    val groupRegistry = new GroupRegistry(groupId, partitionAssignmentStrategy)
+
+    consumerGroupRegistries.put(groupId, groupRegistry)
+
+    info("Created new group registry " + groupId)
+  }
+
+  /**
+   * Callback invoked when a consumer's heartbeat has expired
+   */
+  private def onConsumerHeartbeatExpired(groupId: String, consumerId: String) {
+
+    // if the consumer does not exist in group registry anymore, do nothing
+    // TODO
+
+    // record heartbeat failure
+    // TODO
+
+    // if the maximum failures has been reached, mark consumer as failed
+    // TODO
+  }
+
+  /**
+   * Callback invoked when a consumer is marked as failed
+   */
+  private def onConsumerFailure(groupId: String, consumerId: String) {
+
+    // remove the consumer from its group registry metadata
+    // TODO
+
+    // cut the socket connection to the consumer
+    // TODO: howto ??
+
+    // if the group has no consumer members any more, remove the group
+    // otherwise start preparing group partition rebalance
+    // TODO
+
+  }
+
+  /**
+   * Prepare partition rebalance for the group
+   */
+  private def prepareRebalance(groupId: String) {
+
+    // try to change the group state to PrepareRebalance
+
+    // add a task to the delayed rebalance purgatory
+
+    // TODO
+  }
+
+  /**
+   * Start partition rebalance for the group
+   */
+  private def startRebalance(groupId: String) {
+
+    // try to change the group state to UnderRebalance
+
+    // compute new assignment based on the strategy
+
+    // send back the join-group response
+
+    // TODO
+  }
+
+  // TODO: add a method to fail the current partition rebalance for the group
+
+  /**
+   * Register a ZK listener for partition changes of the given topic (no-op if one is
+   * already registered). Ensures the topic path exists before subscribing.
+   */
+  private def registerTopicChangeListener(topic: String): Unit = {
+    if (!topicPartitionChangeListeners.contains(topic)) {
+      val topicPath = ZkUtils.getTopicPath(topic)
+      val listener = new TopicPartitionChangeListener(config)
+      topicPartitionChangeListeners.put(topic, listener)
+      ZkUtils.makeSurePersistentPathExists(zkClient, topicPath)
+      zkClient.subscribeChildChanges(topicPath, listener)
+    }
+  }
+
+  /**
+   * De-register the ZK listener for partition changes of the given topic; a topic
+   * without a registered listener is ignored instead of throwing.
+   */
+  private def deregisterTopicChangeListener(topic: String): Unit = {
+    topicPartitionChangeListeners.get(topic).foreach { listener =>
+      zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener)
+      topicPartitionChangeListeners.remove(topic)
+    }
+  }
+
+  /**
+   * Zookeeper listener that catches topic-partition changes
+   */
+  class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging {
+
+    this.logIdent = "[TopicChangeListener on coordinator " + config.brokerId + "]: "
+
+    /**
+     * Try to trigger a rebalance for each group subscribed in the changed topic.
+     * A topic that no group is subscribed to is ignored.
+     *
+     * @throws Exception
+     *            On any error.
+     */
+    def handleChildChange(parentPath: String, curChilds: java.util.List[String]) {
+      debug("Fired for path %s with children %s".format(parentPath, curChilds))
+
+      // the topic name is the last component of the watched path
+      val topic = parentPath.split("/").last
+
+      // get groups that subscribed to this topic (none is a valid, non-error case)
+      val groups = consumerGroupsPerTopic.getOrElse(topic, List.empty[String])
+
+      for (groupId <- groups)
+        prepareRebalance(groupId)
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
new file mode 100644
index 0000000..b65c04d
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.HashMap
+
+/**
+ * Consumer registry metadata contains the following metadata:
+ *
+ *  Heartbeat metadata:
+ *  1. negotiated heartbeat session timeout.
+ *  2. recorded number of timed-out heartbeats.
+ *  3. associated heartbeat bucket in the purgatory.
+ *
+ *  Subscription metadata:
+ *  1. subscribed topic list
+ *  2. assigned partitions for the subscribed topics.
+ */
+class ConsumerRegistry(val consumerId: String,
+                       val subscribedTopics: List[String],
+                       val sessionTimeoutMs: Int,
+                       val groupRegistry: GroupRegistry) {
+
+  /* number of expired heartbeat recorded */
+  val numExpiredHeartbeat = new AtomicInteger(0)
+
+  /* flag indicating if join group request is received */
+  val joinGroupReceived = new AtomicBoolean(false)
+
+  /* assigned partitions per subscribed topic (a java.util.HashMap from topic to partition ids) */
+  val assignedPartitions = new HashMap[String, List[Int]]
+
+  /* associated heartbeat bucket; null until the consumer is placed into one.
+   * The type must be explicit: a bare "= null" infers type Null, which would make
+   * it impossible to ever assign an actual bucket to this field. */
+  var currentHeartbeatBucket: HeartbeatBucket = null
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
new file mode 100644
index 0000000..894d6ed
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed heartbeat operations that are added to the purgatory for session-timeout checking
+ *
+ * These operations are never completed early; they only finish by expiring. Once one has
+ * expired, all consumers currently contained in its bucket are marked as heartbeat timed out.
+ *
+ * @param sessionTimeout  delay handed to DelayedOperation (presumably milliseconds - confirm)
+ * @param bucket          the heartbeat bucket whose consumers are expired together
+ * @param expireCallback  invoked with (groupId, consumerId) for every consumer in the bucket
+ */
+class DelayedHeartbeat(sessionTimeout: Long,
+                       bucket: HeartbeatBucket,
+                       expireCallback: (String, String) => Unit)
+  extends DelayedOperation(sessionTimeout) {
+
+  /* a heartbeat bucket can never be completed early; report "not yet complete" instead of
+   * throwing, because the purgatory calls tryComplete when watching/checking operations */
+  override def tryComplete(): Boolean = false
+
+  /* mark all consumers within the heartbeat bucket as heartbeat timed out */
+  override def onComplete() {
+    // iterate over a snapshot: the expire callback may move consumers out of this bucket
+    for (registry <- bucket.consumerRegistryList.toList)
+      expireCallback(registry.groupRegistry.groupId, registry.consumerId)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
new file mode 100644
index 0000000..445bfa1
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed join-group operations that are kept in the purgatory before the partition assignment completed
+ *
+ * These operations should never expire; when the rebalance has completed, all consumers'
+ * join-group operations will be completed by sending back the response with the
+ * calculated partition assignment.
+ *
+ * @param sessionTimeout    delay handed to DelayedOperation
+ * @param consumerRegistry  the registry entry of the joining consumer
+ * @param responseCallback  invoked once when the operation completes
+ */
+class DelayedJoinGroup(sessionTimeout: Long,
+                       consumerRegistry: ConsumerRegistry,
+                       responseCallback: () => Unit) extends DelayedOperation(sessionTimeout) {
+
+  /* always successfully complete the operation once called */
+  override def tryComplete(): Boolean = forceComplete()
+
+  /* always assume the partition is already assigned as this delayed operation should never time-out */
+  override def onComplete() {
+    // TODO: build the response from the computed partition assignment
+    // the callback must be applied with (); a bare `responseCallback` is a no-op expression
+    responseCallback()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
new file mode 100644
index 0000000..b3b3749
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+import java.util.concurrent.atomic.AtomicBoolean
+
+
+/**
+ * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
+ *
+ * Whenever a join-group request is received, check if all known consumers have requested
+ * to re-join the group; if yes, complete this operation to proceed rebalance.
+ *
+ * When the operation has expired, any known consumers that have not requested to re-join
+ * the group are marked as failed, and complete this operation to proceed rebalance with
+ * the rest of the group.
+ *
+ * @param sessionTimeout     delay handed to DelayedOperation
+ * @param groupRegistry      the group being rebalanced
+ * @param rebalanceCallback  invoked with the group id once laggards have been failed
+ * @param failureCallback    invoked with (groupId, consumerId) for each member that did not re-join
+ */
+class DelayedRebalance(sessionTimeout: Long,
+                       groupRegistry: GroupRegistry,
+                       rebalanceCallback: String => Unit,
+                       failureCallback: (String, String) => Unit)
+  extends DelayedOperation(sessionTimeout) {
+
+  /* result of the most recent tryComplete() check: had every known member re-joined? */
+  val allConsumersJoinedGroup = new AtomicBoolean(false)
+
+  /* check if all known consumers have requested to re-join group (vacuously true for no members) */
+  override def tryComplete(): Boolean = {
+    allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.forall(_.joinGroupReceived.get()))
+
+    if (allConsumersJoinedGroup.get())
+      forceComplete()
+    else
+      false
+  }
+
+  /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */
+  override def onComplete() {
+    // collect the laggards before calling back: failureCallback may remove members from
+    // memberRegistries, and mutating a HashMap while iterating over it is unsafe
+    val notRejoined = groupRegistry.memberRegistries.values.filterNot(_.joinGroupReceived.get()).toList
+    notRejoined.foreach(consumerRegistry =>
+      failureCallback(groupRegistry.groupId, consumerRegistry.consumerId))
+
+    rebalanceCallback(groupRegistry.groupId)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
new file mode 100644
index 0000000..7d17e10
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import scala.collection.mutable
+
+/* The lifecycle states a consumer group moves through; each carries a byte code. */
+sealed trait GroupStates { def state: Byte }
+
+/**
+ * Consumer group is preparing to start a rebalance
+ *
+ * action: respond to consumer heartbeats with an error code
+ * transition: all known consumers have re-joined the group => UnderRebalance
+ */
+case object PrepareRebalance extends GroupStates {
+  override val state: Byte = 1
+}
+
+/**
+ * Consumer group is under rebalance
+ *
+ * action: send the join-group response with the new assignment
+ * transition: all consumers have sent a heartbeat with the new generation id => Fetching
+ *             a new consumer join-group is received => PrepareRebalance
+ */
+case object UnderRebalance extends GroupStates {
+  override val state: Byte = 2
+}
+
+/**
+ * Consumer group is fetching data
+ *
+ * action: respond to consumer heartbeats normally
+ * transition: a consumer failure is detected via heartbeat => PrepareRebalance
+ *             a consumer join-group is received => PrepareRebalance
+ *             the zookeeper watcher fires => PrepareRebalance
+ */
+case object Fetching extends GroupStates {
+  override val state: Byte = 3
+}
+
+/* Mutable holder of a group's current state code; a new group starts in PrepareRebalance. */
+case class GroupState() {
+  @volatile
+  var currentState: Byte = PrepareRebalance.state
+}
+
+/**
+ * Group registry contains the following metadata of a registered group in the coordinator:
+ *
+ *  Membership metadata:
+ *  1. List of consumers registered in this group
+ *  2. Partition assignment strategy for this group
+ *
+ *  State metadata:
+ *  1. Current group state
+ *  2. Current group generation id
+ */
+class GroupRegistry(val groupId: String,
+                    val partitionAssignmentStrategy: String) {
+
+  /* registered members of this group (presumably keyed by consumer id - confirm with callers) */
+  val memberRegistries: mutable.HashMap[String, ConsumerRegistry] =
+    mutable.HashMap.empty[String, ConsumerRegistry]
+
+  /* current state of the group */
+  val state: GroupState = new GroupState()
+
+  /* current generation id, starting at 1 */
+  var generationId: Int = 1
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
new file mode 100644
index 0000000..821e26e
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import scala.collection.mutable
+
+/**
+ * A bucket of consumers that are scheduled for heartbeat expiration.
+ *
+ * The motivation behind this is to avoid expensive fine-grained per-consumer
+ * heartbeat expiration and instead use a coarse-grained method that groups consumers
+ * with similar deadlines together. As a result some consumers will not be expired
+ * for heartbeats exactly on time, which is tolerable.
+ *
+ * @param startMs start of the bucket's time window, in milliseconds
+ * @param endMs   end of the bucket's time window, in milliseconds; exposed as a val
+ *                (like startMs) so the bucket's deadline can be read back
+ */
+class HeartbeatBucket(val startMs: Long, val endMs: Long) {
+
+  /* The set of consumers that are contained in this bucket */
+  val consumerRegistryList = new mutable.HashSet[ConsumerRegistry]
+
+  // TODO
+}