You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/02/10 00:25:19 UTC
kafka git commit: KAFKA-1333 follow-up;
Add missing files for the coordinator folder
Repository: kafka
Updated Branches:
refs/heads/trunk 39cd48de3 -> 71602de0b
KAFKA-1333 follow-up; Add missing files for the coordinator folder
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71602de0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71602de0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71602de0
Branch: refs/heads/trunk
Commit: 71602de0bbf7727f498a812033027f6cbfe34eb8
Parents: 39cd48d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 9 15:24:44 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 9 15:24:44 2015 -0800
----------------------------------------------------------------------
.../kafka/coordinator/ConsumerCoordinator.scala | 333 +++++++++++++++++++
.../kafka/coordinator/ConsumerRegistry.scala | 52 +++
.../kafka/coordinator/DelayedHeartbeat.scala | 44 +++
.../kafka/coordinator/DelayedJoinGroup.scala | 44 +++
.../kafka/coordinator/DelayedRebalance.scala | 62 ++++
.../scala/kafka/coordinator/GroupRegistry.scala | 74 +++++
.../kafka/coordinator/HeartbeatBucket.scala | 36 ++
7 files changed, 645 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
new file mode 100644
index 0000000..01cf1d9
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import org.apache.kafka.common.protocol.Errors
+
+import kafka.common.TopicAndPartition
+import kafka.server._
+import kafka.utils._
+
+import scala.collection.mutable.HashMap
+
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+
+
+/**
+ * Kafka coordinator handles consumer group and consumer offset management.
+ *
+ * Each Kafka server instantiates a coordinator, which is responsible for a set of
+ * consumer groups; the consumer groups are assigned to coordinators based on their
+ * group names.
+ */
+class ConsumerCoordinator(val config: KafkaConfig,
+ val zkClient: ZkClient) extends Logging {
+
+ this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: "
+
+ /* zookeeper listener for topic-partition changes */
+ private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener]
+
+ /* the consumer group registry cache */
+ // TODO: access to this map needs to be synchronized
+ private val consumerGroupRegistries = new HashMap[String, GroupRegistry]
+
+ /* the list of subscribed groups per topic */
+ // TODO: access to this map needs to be synchronized
+ private val consumerGroupsPerTopic = new HashMap[String, List[String]]
+
+ /* the delayed operation purgatory for heartbeat-based failure detection */
+ private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
+
+ /* the delayed operation purgatory for handling join-group requests */
+ private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null
+
+ /* the delayed operation purgatory for preparing rebalance process */
+ private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
+
+ /* latest consumer heartbeat bucket's end timestamp in milliseconds */
+ private var latestHeartbeatBucketEndMs: Long = SystemTime.milliseconds
+
+ /**
+ * Start-up logic executed at the same time when the server starts up.
+ */
+ def startup() {
+
+ // Initialize consumer group registries and heartbeat bucket metadata
+ latestHeartbeatBucketEndMs = SystemTime.milliseconds
+
+ // Initialize purgatories for delayed heartbeat, join-group and rebalance operations
+ heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId)
+ joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId)
+ rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId)
+
+ }
+
+ /**
+ * Shut-down logic executed at the same time when server shuts down,
+ * ordering of actions should be reversed from the start-up process
+ *
+ */
+ def shutdown() {
+
+ // De-register all Zookeeper listeners for topic-partition changes
+ for (topic <- topicPartitionChangeListeners.keys) {
+ deregisterTopicChangeListener(topic)
+ }
+ topicPartitionChangeListeners.clear()
+
+ // Shutdown purgatories for delayed heartbeat, join-group and rebalance operations
+ heartbeatPurgatory.shutdown()
+ joinGroupPurgatory.shutdown()
+ rebalancePurgatory.shutdown()
+
+ // Clean up consumer group registries metadata
+ consumerGroupRegistries.clear()
+ consumerGroupsPerTopic.clear()
+ }
+
+ /**
+ * Process a join-group request from a consumer to join as a new group member
+ */
+ def consumerJoinGroup(groupId: String,
+ consumerId: String,
+ topics: List[String],
+ sessionTimeoutMs: Int,
+ partitionAssignmentStrategy: String,
+ responseCallback:(List[TopicAndPartition], Int, Short) => Unit ) {
+
+ // if the group does not exist yet, create one
+ if (!consumerGroupRegistries.contains(groupId))
+ createNewGroup(groupId, partitionAssignmentStrategy)
+
+ // if the consumer id is unknown or it does exists in
+ // the group yet, register this consumer to the group
+ // TODO
+
+ // add a delayed join-group operation to the purgatory
+ // TODO
+
+ // if the current group is under rebalance process,
+ // check if the delayed rebalance operation can be finished
+ // TODO
+
+ // TODO --------------------------------------------------------------
+ // TODO: this is just a stub for new consumer testing,
+ // TODO: needs to be replaced with the logic above
+ // TODO --------------------------------------------------------------
+ // just return all the partitions of the subscribed topics
+ val partitionIdsPerTopic = ZkUtils.getPartitionsForTopics(zkClient, topics)
+ val partitions = partitionIdsPerTopic.flatMap{ case (topic, partitionIds) =>
+ partitionIds.map(partition => {
+ TopicAndPartition(topic, partition)
+ })
+ }.toList
+
+ responseCallback(partitions, 1 /* generation id */, Errors.NONE.code)
+
+ info("Handled join-group from consumer " + consumerId + " to group " + groupId)
+ }
+
+ /**
+ * Process a heartbeat request from a consumer
+ */
+ def consumerHeartbeat(groupId: String,
+ consumerId: String,
+ generationId: Int,
+ responseCallback: Short => Unit) {
+
+ // check that the group already exists
+ // TODO
+
+ // check that the consumer has already registered for the group
+ // TODO
+
+ // check if the consumer generation id is correct
+ // TODO
+
+ // remove the consumer from its current heartbeat bucket, and add it back to the corresponding bucket
+ // TODO
+
+ // create the heartbeat response, if partition rebalance is triggered set the corresponding error code
+ // TODO
+
+ info("Handled heartbeat of consumer " + consumerId + " from group " + groupId)
+
+ // TODO --------------------------------------------------------------
+ // TODO: this is just a stub for new consumer testing,
+ // TODO: needs to be replaced with the logic above
+ // TODO --------------------------------------------------------------
+ // always return OK for heartbeat immediately
+ responseCallback(Errors.NONE.code)
+ }
+
+ /**
+ * Create a new consumer
+ */
+ private def createNewConsumer(consumerId: String,
+ topics: List[String],
+ sessionTimeoutMs: Int,
+ groupRegistry: GroupRegistry) {
+ debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId)
+
+ // create the new consumer registry entry
+ // TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase
+
+ // check if the partition assignment strategy is consistent with the group
+ // TODO
+
+ // add the group to the subscribed topics
+ // TODO
+
+ // schedule heartbeat tasks for the consumer
+ // TODO
+
+ // add the member registry entry to the group
+ // TODO
+
+ // start preparing group partition rebalance
+ // TODO
+
+ info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId)
+ }
+
+ /**
+ * Create a new consumer group in the registry
+ */
+ private def createNewGroup(groupId: String, partitionAssignmentStrategy: String) {
+ debug("Creating new group " + groupId)
+
+ val groupRegistry = new GroupRegistry(groupId, partitionAssignmentStrategy)
+
+ consumerGroupRegistries.put(groupId, groupRegistry)
+
+ info("Created new group registry " + groupId)
+ }
+
+ /**
+ * Callback invoked when a consumer's heartbeat has expired
+ */
+ private def onConsumerHeartbeatExpired(groupId: String, consumerId: String) {
+
+ // if the consumer does not exist in group registry anymore, do nothing
+ // TODO
+
+ // record heartbeat failure
+ // TODO
+
+ // if the maximum failures has been reached, mark consumer as failed
+ // TODO
+ }
+
+ /**
+ * Callback invoked when a consumer is marked as failed
+ */
+ private def onConsumerFailure(groupId: String, consumerId: String) {
+
+ // remove the consumer from its group registry metadata
+ // TODO
+
+ // cut the socket connection to the consumer
+ // TODO: howto ??
+
+ // if the group has no consumer members any more, remove the group
+ // otherwise start preparing group partition rebalance
+ // TODO
+
+ }
+
+ /**
+ * Prepare partition rebalance for the group
+ */
+ private def prepareRebalance(groupId: String) {
+
+ // try to change the group state to PrepareRebalance
+
+ // add a task to the delayed rebalance purgatory
+
+ // TODO
+ }
+
+ /**
+ * Start partition rebalance for the group
+ */
+ private def startRebalance(groupId: String) {
+
+ // try to change the group state to UnderRebalance
+
+ // compute new assignment based on the strategy
+
+ // send back the join-group response
+
+ // TODO
+ }
+
+ /**
+ * Fail current partition rebalance for the group
+ */
+
+ /**
+ * Register ZK listeners for topic-partition changes
+ */
+ private def registerTopicChangeListener(topic: String) = {
+ if (!topicPartitionChangeListeners.contains(topic)) {
+ val listener = new TopicPartitionChangeListener(config)
+ topicPartitionChangeListeners.put(topic, listener)
+ ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getTopicPath(topic))
+ zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), listener)
+ }
+ }
+
+ /**
+ * De-register ZK listeners for topic-partition changes
+ */
+ private def deregisterTopicChangeListener(topic: String) = {
+ val listener = topicPartitionChangeListeners.get(topic).get
+ zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener)
+ topicPartitionChangeListeners.remove(topic)
+ }
+
+ /**
+ * Zookeeper listener that catch topic-partition changes
+ */
+ class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging {
+
+ this.logIdent = "[TopicChangeListener on coordinator " + config.brokerId + "]: "
+
+ /**
+ * Try to trigger a rebalance for each group subscribed in the changed topic
+ *
+ * @throws Exception
+ * On any error.
+ */
+ def handleChildChange(parentPath: String , curChilds: java.util.List[String]) {
+ debug("Fired for path %s with children %s".format(parentPath, curChilds))
+
+ // get the topic
+ val topic = parentPath.split("/").last
+
+ // get groups that subscribed to this topic
+ val groups = consumerGroupsPerTopic.get(topic).get
+
+ for (groupId <- groups) {
+ prepareRebalance(groupId)
+ }
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
new file mode 100644
index 0000000..b65c04d
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.HashMap
+
+/**
+ * Consumer registry metadata contains the following metadata:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout.
+ * 2. recorded number of timed-out heartbeats.
+ * 3. associated heartbeat bucket in the purgatory.
+ *
+ * Subscription metadata:
+ * 1. subscribed topic list
+ * 2. assigned partitions for the subscribed topics.
+ */
+class ConsumerRegistry(val consumerId: String,
+ val subscribedTopics: List[String],
+ val sessionTimeoutMs: Int,
+ val groupRegistry: GroupRegistry) {
+
+ /* number of expired heartbeat recorded */
+ val numExpiredHeartbeat = new AtomicInteger(0)
+
+ /* flag indicating if join group request is received */
+ val joinGroupReceived = new AtomicBoolean(false)
+
+ /* assigned partitions per subscribed topic */
+ val assignedPartitions = new HashMap[String, List[Int]]
+
+ /* associated heartbeat bucket */
+ var currentHeartbeatBucket = null
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
new file mode 100644
index 0000000..894d6ed
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed heartbeat operations that are added to the purgatory for session-timeout checking
+ *
+ * These operations will always be expired. Once it has expired, all its
+ * currently contained consumers are marked as heartbeat timed out.
+ */
+class DelayedHeartbeat(sessionTimeout: Long,
+ bucket: HeartbeatBucket,
+ expireCallback: (String, String) => Unit)
+ extends DelayedOperation(sessionTimeout) {
+
+ /* this function should never be called */
+ override def tryComplete(): Boolean = {
+
+ throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket")
+ }
+
+ /* mark all consumers within the heartbeat as heartbeat timed out */
+ override def onComplete() {
+ for (registry <- bucket.consumerRegistryList)
+ expireCallback(registry.groupRegistry.groupId, registry.consumerId)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
new file mode 100644
index 0000000..445bfa1
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed join-group operations that are kept in the purgatory before the partition assignment completed
+ *
+ * These operation should never expire; when the rebalance has completed, all consumer's
+ * join-group operations will be completed by sending back the response with the
+ * calculated partition assignment.
+ */
+class DelayedJoinGroup(sessionTimeout: Long,
+ consumerRegistry: ConsumerRegistry,
+ responseCallback: () => Unit) extends DelayedOperation(sessionTimeout) {
+
+ /* always successfully complete the operation once called */
+ override def tryComplete(): Boolean = {
+ forceComplete()
+ }
+
+ /* always assume the partition is already assigned as this delayed operation should never time-out */
+ override def onComplete() {
+
+ // TODO
+ responseCallback
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
new file mode 100644
index 0000000..b3b3749
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+import java.util.concurrent.atomic.AtomicBoolean
+
+
+/**
+ * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
+ *
+ * Whenever a join-group request is received, check if all known consumers have requested
+ * to re-join the group; if yes, complete this operation to proceed rebalance.
+ *
+ * When the operation has expired, any known consumers that have not requested to re-join
+ * the group are marked as failed, and complete this operation to proceed rebalance with
+ * the rest of the group.
+ */
+class DelayedRebalance(sessionTimeout: Long,
+ groupRegistry: GroupRegistry,
+ rebalanceCallback: String => Unit,
+ failureCallback: (String, String) => Unit)
+ extends DelayedOperation(sessionTimeout) {
+
+ val allConsumersJoinedGroup = new AtomicBoolean(false)
+
+ /* check if all known consumers have requested to re-join group */
+ override def tryComplete(): Boolean = {
+ allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft
+ (true) ((agg, cur) => agg && cur.joinGroupReceived.get()))
+
+ if (allConsumersJoinedGroup.get())
+ forceComplete()
+ else
+ false
+ }
+
+ /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */
+ override def onComplete() {
+ groupRegistry.memberRegistries.values.foreach(consumerRegistry =>
+ if (!consumerRegistry.joinGroupReceived.get())
+ failureCallback(groupRegistry.groupId, consumerRegistry.consumerId)
+ )
+
+ rebalanceCallback(groupRegistry.groupId)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
new file mode 100644
index 0000000..7d17e10
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import scala.collection.mutable
+
+sealed trait GroupStates { def state: Byte }
+
+/**
+ * Consumer group is preparing start rebalance
+ *
+ * action: respond consumer heartbeat with error code,
+ * transition: all known consumers has re-joined group => UnderRebalance
+ */
+case object PrepareRebalance extends GroupStates { val state: Byte = 1 }
+
+/**
+ * Consumer group is under rebalance
+ *
+ * action: send the join-group response with new assignment
+ * transition: all consumers has heartbeat with the new generation id => Fetching
+ * new consumer join-group received => PrepareRebalance
+ */
+case object UnderRebalance extends GroupStates { val state: Byte = 2 }
+
+/**
+ * Consumer group is fetching data
+ *
+ * action: respond consumer heartbeat normally
+ * transition: consumer failure detected via heartbeat => PrepareRebalance
+ * consumer join-group received => PrepareRebalance
+ * zookeeper watcher fired => PrepareRebalance
+ */
+case object Fetching extends GroupStates { val state: Byte = 3 }
+
+case class GroupState() {
+ @volatile var currentState: Byte = PrepareRebalance.state
+}
+
+/* Group registry contains the following metadata of a registered group in the coordinator:
+ *
+ * Membership metadata:
+ * 1. List of consumers registered in this group
+ * 2. Partition assignment strategy for this group
+ *
+ * State metadata:
+ * 1. Current group state
+ * 2. Current group generation id
+ */
+class GroupRegistry(val groupId: String,
+ val partitionAssignmentStrategy: String) {
+
+ val memberRegistries = new mutable.HashMap[String, ConsumerRegistry]()
+
+ val state: GroupState = new GroupState()
+
+ var generationId: Int = 1
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/71602de0/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
new file mode 100644
index 0000000..821e26e
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import scala.collection.mutable
+
+/**
+ * A bucket of consumers that are scheduled for heartbeat expiration.
+ *
+ * The motivation behind this is to avoid expensive fine-grained per-consumer
+ * heartbeat expiration but use coarsen-grained methods that group consumers
+ * with similar deadline together. This will result in some consumers not
+ * being expired for heartbeats in time but is tolerable.
+ */
+class HeartbeatBucket(val startMs: Long, endMs: Long) {
+
+ /* The list of consumers that are contained in this bucket */
+ val consumerRegistryList = new mutable.HashSet[ConsumerRegistry]
+
+ // TODO
+}