You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 21:11:13 UTC

[10/10] kafka git commit: KAFKA-5059: Implement Transactional Coordinator

KAFKA-5059: Implement Transactional Coordinator

Author: Damian Guy <da...@gmail.com>
Author: Guozhang Wang <wa...@gmail.com>
Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Guozhang Wang, Jason Gustafson, Apurva Mehta, Jun Rao

Closes #2849 from dguy/exactly-once-tc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f69d9415
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f69d9415
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f69d9415

Branch: refs/heads/trunk
Commit: f69d94158ce72f4ef2df5b98f2395c2b2e61251e
Parents: 8d8ab2e
Author: Damian Guy <da...@gmail.com>
Authored: Wed Apr 26 14:10:38 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 26 14:10:38 2017 -0700

----------------------------------------------------------------------
 checkstyle/import-control-core.xml              |    4 +
 .../clients/consumer/internals/Fetcher.java     |    8 +-
 .../errors/ConcurrentTransactionsException.java |   25 +
 .../errors/InvalidTxnTimeoutException.java      |    1 +
 .../apache/kafka/common/protocol/Errors.java    |   10 +-
 .../kafka/common/requests/InitPidRequest.java   |    2 -
 .../common/requests/WriteTxnMarkersRequest.java |   41 +
 .../common/requests/RequestResponseTest.java    |    3 +-
 config/server.properties                        |    6 +-
 .../main/scala/kafka/admin/AdminClient.scala    |    2 +-
 .../main/scala/kafka/cluster/Partition.scala    |    4 +-
 .../kafka/common/InterBrokerSendThread.scala    |   69 ++
 core/src/main/scala/kafka/common/Topic.scala    |    3 +-
 .../kafka/coordinator/DelayedHeartbeat.scala    |   40 -
 .../scala/kafka/coordinator/DelayedJoin.scala   |   44 -
 .../kafka/coordinator/GroupCoordinator.scala    |  817 -------------
 .../scala/kafka/coordinator/GroupMetadata.scala |  327 -----
 .../coordinator/GroupMetadataManager.scala      | 1122 -----------------
 .../kafka/coordinator/MemberMetadata.scala      |  123 --
 .../scala/kafka/coordinator/OffsetConfig.scala  |   61 -
 .../kafka/coordinator/ProducerIdManager.scala   |  153 ---
 .../coordinator/TransactionCoordinator.scala    |   92 --
 .../coordinator/group/DelayedHeartbeat.scala    |   40 +
 .../kafka/coordinator/group/DelayedJoin.scala   |   43 +
 .../coordinator/group/GroupCoordinator.scala    |  817 +++++++++++++
 .../kafka/coordinator/group/GroupMetadata.scala |  326 +++++
 .../group/GroupMetadataManager.scala            | 1123 +++++++++++++++++
 .../coordinator/group/MemberMetadata.scala      |  120 ++
 .../kafka/coordinator/group/OffsetConfig.scala  |   61 +
 .../transaction/DelayedTxnMarker.scala          |   48 +
 .../transaction/ProducerIdManager.scala         |  153 +++
 .../transaction/TransactionCoordinator.scala    |  434 +++++++
 .../transaction/TransactionLog.scala            |  275 +++++
 .../transaction/TransactionMarkerChannel.scala  |  168 +++
 .../TransactionMarkerChannelManager.scala       |  159 +++
 ...nsactionMarkerRequestCompletionHandler.scala |   95 ++
 .../transaction/TransactionMetadata.scala       |  175 +++
 .../transaction/TransactionStateManager.scala   |  429 +++++++
 .../main/scala/kafka/server/AdminManager.scala  |    2 +-
 .../scala/kafka/server/DelayedOperation.scala   |   32 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  194 ++-
 .../main/scala/kafka/server/KafkaConfig.scala   |   54 +-
 .../main/scala/kafka/server/KafkaServer.scala   |    7 +-
 .../main/scala/kafka/server/MetadataCache.scala |   14 +-
 .../scala/kafka/server/ReplicaManager.scala     |    6 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |    2 +-
 .../main/scala/kafka/utils/KafkaScheduler.scala |    2 +-
 .../scala/kafka/utils/ShutdownableThread.scala  |    4 +-
 ...eListenersWithSameSecurityProtocolTest.scala |    2 +-
 .../common/InterBrokerSendThreadTest.scala      |  131 ++
 .../GroupCoordinatorResponseTest.scala          | 1154 ------------------
 .../coordinator/GroupCoordinatorTest.scala      |   36 -
 .../coordinator/GroupMetadataManagerTest.scala  |  814 ------------
 .../kafka/coordinator/GroupMetadataTest.scala   |  360 ------
 .../kafka/coordinator/MemberMetadataTest.scala  |   88 --
 .../coordinator/ProducerIdManagerTest.scala     |  105 --
 .../TransactionCoordinatorTest.scala            |   93 --
 .../group/GroupCoordinatorResponseTest.scala    | 1154 ++++++++++++++++++
 .../group/GroupCoordinatorTest.scala            |   36 +
 .../group/GroupMetadataManagerTest.scala        |  815 +++++++++++++
 .../coordinator/group/GroupMetadataTest.scala   |  362 ++++++
 .../coordinator/group/MemberMetadataTest.scala  |   88 ++
 .../transaction/ProducerIdManagerTest.scala     |  104 ++
 .../TransactionCoordinatorIntegrationTest.scala |   98 ++
 .../TransactionCoordinatorTest.scala            |  787 ++++++++++++
 .../transaction/TransactionLogTest.scala        |  109 ++
 .../TransactionMarkerChannelManagerTest.scala   |  283 +++++
 .../TransactionMarkerChannelTest.scala          |  179 +++
 ...tionMarkerRequestCompletionHandlerTest.scala |  164 +++
 .../TransactionStateManagerTest.scala           |  354 ++++++
 .../kafka/server/DelayedOperationTest.scala     |   34 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |    8 +-
 72 files changed, 9554 insertions(+), 5544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/checkstyle/import-control-core.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 5714bfd..856df58 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -61,6 +61,10 @@
     <allow pkg="org.apache.kafka.clients.consumer" />
   </subpackage>
 
+  <subpackage name="coordinator">
+    <allow class="kafka.server.MetadataCache" />
+  </subpackage>
+
   <subpackage name="examples">
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="kafka.api" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e337f4e..f421dfb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -868,7 +868,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         return partitionRecords;
     }
 
-    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
+
+    /**
+     * Parse the record entry, deserializing the key / value fields if necessary
+     */
+    private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
+                                             RecordBatch batch,
+                                             Record record) {
         try {
             long offset = record.offset();
             long timestamp = record.timestamp();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
new file mode 100644
index 0000000..6ad6b8a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class ConcurrentTransactionsException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public ConcurrentTransactionsException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
index 12d873e..e5df248 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
@@ -31,3 +31,4 @@ public class InvalidTxnTimeoutException extends ApiException {
         super(message);
     }
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 375cf16..80e9191 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.protocol;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ConcurrentTransactionsException;
 import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
@@ -38,11 +39,12 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
+import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
-import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
@@ -52,7 +54,6 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -186,7 +187,10 @@ public enum Errors {
         new InvalidPidMappingException("The PID mapping is invalid")),
     INVALID_TRANSACTION_TIMEOUT(50,
         new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
-            "(as configured by max.transaction.timeout.ms)."));
+            "(as configured by max.transaction.timeout.ms).")),
+    CONCURRENT_TRANSACTIONS(51,
+        new ConcurrentTransactionsException("The producer attempted to update a transaction " +
+             "while another concurrent operation on the same transaction was ongoing"));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
index dedbc0f..eff05e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -28,7 +28,6 @@ public class InitPidRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
 
-
     private final String transactionalId;
     private final int transactionTimeoutMs;
 
@@ -63,7 +62,6 @@ public class InitPidRequest extends AbstractRequest {
             return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
                     transactionTimeoutMs + ")";
         }
-
     }
 
     public InitPidRequest(Struct struct, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index fe64603..998d504 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class WriteTxnMarkersRequest extends AbstractRequest {
     private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
@@ -67,6 +68,33 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         public List<TopicPartition> partitions() {
             return partitions;
         }
+
+
+        @Override
+        public String toString() {
+            return "TxnMarkerEntry{" +
+                    "pid=" + producerId +
+                    ", epoch=" + producerEpoch +
+                    ", result=" + result +
+                    ", partitions=" + partitions +
+                    '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final TxnMarkerEntry that = (TxnMarkerEntry) o;
+            return producerId == that.producerId &&
+                    producerEpoch == that.producerEpoch &&
+                    result == that.result &&
+                    Objects.equals(partitions, that.partitions);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(producerId, producerEpoch, result, partitions);
+        }
     }
 
     public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
@@ -183,4 +211,17 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
     }
 
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
+        return coordinatorEpoch == that.coordinatorEpoch &&
+                Objects.equals(markers, that.markers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(coordinatorEpoch, markers);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e41c38e..9e283c0 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -568,7 +568,7 @@ public class RequestResponseTest {
     }
 
     private ListGroupsResponse createListGroupsResponse() {
-        List<ListGroupsResponse.Group> groups = asList(new ListGroupsResponse.Group("test-group", "consumer"));
+        List<ListGroupsResponse.Group> groups = Collections.singletonList(new ListGroupsResponse.Group("test-group", "consumer"));
         return new ListGroupsResponse(Errors.NONE, groups);
     }
 
@@ -844,6 +844,7 @@ public class RequestResponseTest {
         return new InitPidResponse(Errors.NONE, 3332, (short) 3);
     }
 
+
     private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
         Map<TopicPartition, Integer> epochs = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 37b5bb3..da72075 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -72,9 +72,11 @@ num.partitions=1
 num.recovery.threads.per.data.dir=1
 
 ############################# Internal Topic Settings  #############################
-# The replication factor for the group metadata internal topic "__consumer_offsets".
-# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
 offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
 
 ############################# Log Flush Policy #############################
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 4d218c1..7342bbb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
 import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.KafkaException
-import kafka.coordinator.GroupOverview
+import kafka.coordinator.group.GroupOverview
 import kafka.utils.Logging
 
 import org.apache.kafka.clients._

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 72e505d..1eea8dc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -241,9 +241,9 @@ class Partition(val topic: String,
     getReplica(replicaId) match {
       case Some(replica) =>
         // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
-        val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+        val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
         replica.updateLogReadResult(logReadResult)
-        val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+        val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
         // check if the LW of the partition has incremented
         // since the replica's logStartOffset may have incremented
         val leaderLWIncremented = newLeaderLW > oldLeaderLW

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
new file mode 100644
index 0000000..b626954
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+import kafka.utils.ShutdownableThread
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.Time
+
+
+/**
+ *  Class for inter-broker send thread that utilize a non-blocking network client.
+ */
+class InterBrokerSendThread(name: String,
+                            networkClient: NetworkClient,
+                            requestGenerator: () => Iterable[RequestAndCompletionHandler],
+                            time: Time)
+  extends ShutdownableThread(name, isInterruptible = false) {
+
+  override def doWork() {
+    val now = time.milliseconds()
+    var pollTimeout = Long.MaxValue
+
+    val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator()
+
+    for (request: RequestAndCompletionHandler <- requestsToSend) {
+      val destination = Integer.toString(request.destination.id())
+      val completionHandler = request.handler
+      // TODO: Need to check inter broker protocol and error if new request is not supported
+      val clientRequest = networkClient.newClientRequest(destination,
+        request.request,
+        now,
+        true,
+        completionHandler)
+
+      if (networkClient.ready(request.destination, now)) {
+        networkClient.send(clientRequest, now)
+      } else {
+        val disConnectedResponse: ClientResponse = new ClientResponse(clientRequest.makeHeader(request.request.desiredOrLatestVersion()),
+          completionHandler, destination,
+          now /* createdTimeMs */, now /* receivedTimeMs */, true /* disconnected */, null /* versionMismatch */, null /* responseBody */)
+
+        // poll timeout would be the minimum of connection delay if there are any dest yet to be reached;
+        // otherwise it is infinity
+        pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))
+
+        completionHandler.onComplete(disConnectedResponse)
+      }
+    }
+    networkClient.poll(pollTimeout, now)
+  }
+}
+
+case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], handler: RequestCompletionHandler)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 4a65afb..6ca7175 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -24,7 +24,8 @@ import scala.collection.immutable
 object Topic {
 
   val GroupMetadataTopicName = "__consumer_offsets"
-  val InternalTopics = immutable.Set(GroupMetadataTopicName)
+  val TransactionStateTopicName = "__transaction_state"
+  val InternalTopics = immutable.Set(GroupMetadataTopicName, TransactionStateTopicName)
 
   val legalChars = "[a-zA-Z0-9\\._\\-]"
   private val maxNameLength = 249

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
deleted file mode 100644
index b05186c..0000000
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.server.DelayedOperation
-
-/**
- * Delayed heartbeat operations that are added to the purgatory for session timeout checking.
- * Heartbeats are paused during rebalance.
- */
-private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
-                                            group: GroupMetadata,
-                                            member: MemberMetadata,
-                                            heartbeatDeadline: Long,
-                                            sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout) {
-
-  // overridden since tryComplete already synchronizes on the group. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
-  override def onComplete() = coordinator.onCompleteHeartbeat()
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
deleted file mode 100644
index 8744f16..0000000
--- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.server.DelayedOperation
-
-/**
- * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
- *
- * Whenever a join-group request is received, check if all known group members have requested
- * to re-join the group; if yes, complete this operation to proceed rebalance.
- *
- * When the operation has expired, any known members that have not requested to re-join
- * the group are marked as failed, and complete this operation to proceed rebalance with
- * the rest of the group.
- */
-private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
-                                       group: GroupMetadata,
-                                       rebalanceTimeout: Long)
-  extends DelayedOperation(rebalanceTimeout) {
-
-  // overridden since tryComplete already synchronizes on the group. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
deleted file mode 100644
index d78d1df..0000000
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ /dev/null
@@ -1,817 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator
-
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
-import kafka.log.LogConfig
-import kafka.message.ProducerCompressionCodec
-import kafka.server._
-import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest}
-
-import scala.collection.{Map, Seq, immutable}
-
-
-/**
- * GroupCoordinator handles general group membership and offset management.
- *
- * Each Kafka server instantiates a coordinator which is responsible for a set of
- * groups. Groups are assigned to coordinators based on their group names.
- */
-class GroupCoordinator(val brokerId: Int,
-                       val groupConfig: GroupConfig,
-                       val offsetConfig: OffsetConfig,
-                       val groupManager: GroupMetadataManager,
-                       val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
-                       val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
-                       time: Time) extends Logging {
-  type JoinCallback = JoinGroupResult => Unit
-  type SyncCallback = (Array[Byte], Errors) => Unit
-
-  this.logIdent = "[GroupCoordinator " + brokerId + "]: "
-
-  private val isActive = new AtomicBoolean(false)
-
-  def offsetsTopicConfigs: Properties = {
-    val props = new Properties
-    props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
-    props
-  }
-
-  /**
-   * NOTE: If a group lock and metadataLock are simultaneously needed,
-   * be sure to acquire the group lock before metadataLock to prevent deadlock
-   */
-
-  /**
-   * Startup logic executed at the same time when the server starts up.
-   */
-  def startup(enableMetadataExpiration: Boolean = true) {
-    info("Starting up.")
-    if (enableMetadataExpiration)
-      groupManager.enableMetadataExpiration()
-    isActive.set(true)
-    info("Startup complete.")
-  }
-
-  /**
-   * Shutdown logic executed at the same time when server shuts down.
-   * Ordering of actions should be reversed from the startup process.
-   */
-  def shutdown() {
-    info("Shutting down.")
-    isActive.set(false)
-    groupManager.shutdown()
-    heartbeatPurgatory.shutdown()
-    joinPurgatory.shutdown()
-    info("Shutdown complete.")
-  }
-
-  def handleJoinGroup(groupId: String,
-                      memberId: String,
-                      clientId: String,
-                      clientHost: String,
-                      rebalanceTimeoutMs: Int,
-                      sessionTimeoutMs: Int,
-                      protocolType: String,
-                      protocols: List[(String, Array[Byte])],
-                      responseCallback: JoinCallback) {
-    if (!isActive.get) {
-      responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
-    } else if (!validGroupId(groupId)) {
-      responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR))
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      responseCallback(joinError(memberId, Errors.COORDINATOR_LOAD_IN_PROGRESS))
-    } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
-               sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
-      responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
-    } else {
-      // only try to create the group if the group is not unknown AND
-      // the member id is UNKNOWN, if member is specified but group does not
-      // exist we should reject the request
-      groupManager.getGroup(groupId) match {
-        case None =>
-          if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
-          } else {
-            val group = groupManager.addGroup(new GroupMetadata(groupId))
-            doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
-          }
-
-        case Some(group) =>
-          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
-      }
-    }
-  }
-
-  private def doJoinGroup(group: GroupMetadata,
-                          memberId: String,
-                          clientId: String,
-                          clientHost: String,
-                          rebalanceTimeoutMs: Int,
-                          sessionTimeoutMs: Int,
-                          protocolType: String,
-                          protocols: List[(String, Array[Byte])],
-                          responseCallback: JoinCallback) {
-    group synchronized {
-      if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
-        // if the new member does not support the group protocol, reject it
-        responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
-        // if the member trying to register with a un-recognized id, send the response to let
-        // it reset its member id and retry
-        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
-      } else {
-        group.currentState match {
-          case Dead =>
-            // if the group is marked as dead, it means some other thread has just removed the group
-            // from the coordinator metadata; this is likely that the group has migrated to some other
-            // coordinator OR the group is in a transient unstable phase. Let the member retry
-            // joining without the specified member id,
-            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
-
-          case PreparingRebalance =>
-            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
-            } else {
-              val member = group.get(memberId)
-              updateMemberAndRebalance(group, member, protocols, responseCallback)
-            }
-
-          case AwaitingSync =>
-            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
-            } else {
-              val member = group.get(memberId)
-              if (member.matches(protocols)) {
-                // member is joining with the same metadata (which could be because it failed to
-                // receive the initial JoinGroup response), so just return current group information
-                // for the current generation.
-                responseCallback(JoinGroupResult(
-                  members = if (memberId == group.leaderId) {
-                    group.currentMemberMetadata
-                  } else {
-                    Map.empty
-                  },
-                  memberId = memberId,
-                  generationId = group.generationId,
-                  subProtocol = group.protocol,
-                  leaderId = group.leaderId,
-                  error = Errors.NONE))
-              } else {
-                // member has changed metadata, so force a rebalance
-                updateMemberAndRebalance(group, member, protocols, responseCallback)
-              }
-            }
-
-          case Empty | Stable =>
-            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
-            } else {
-              val member = group.get(memberId)
-              if (memberId == group.leaderId || !member.matches(protocols)) {
-                // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
-                // The latter allows the leader to trigger rebalances for changes affecting assignment
-                // which do not affect the member metadata (such as topic metadata changes for the consumer)
-                updateMemberAndRebalance(group, member, protocols, responseCallback)
-              } else {
-                // for followers with no actual change to their metadata, just return group information
-                // for the current generation which will allow them to issue SyncGroup
-                responseCallback(JoinGroupResult(
-                  members = Map.empty,
-                  memberId = memberId,
-                  generationId = group.generationId,
-                  subProtocol = group.protocol,
-                  leaderId = group.leaderId,
-                  error = Errors.NONE))
-              }
-            }
-        }
-
-        if (group.is(PreparingRebalance))
-          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
-      }
-    }
-  }
-
-  def handleSyncGroup(groupId: String,
-                      generation: Int,
-                      memberId: String,
-                      groupAssignment: Map[String, Array[Byte]],
-                      responseCallback: SyncCallback) {
-    if (!isActive.get) {
-      responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Array.empty, Errors.NOT_COORDINATOR)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
-        case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
-      }
-    }
-  }
-
-  private def doSyncGroup(group: GroupMetadata,
-                          generationId: Int,
-                          memberId: String,
-                          groupAssignment: Map[String, Array[Byte]],
-                          responseCallback: SyncCallback) {
-    var delayedGroupStore: Option[DelayedStore] = None
-
-    group synchronized {
-      if (!group.has(memberId)) {
-        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
-      } else if (generationId != group.generationId) {
-        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
-      } else {
-        group.currentState match {
-          case Empty | Dead =>
-            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
-
-          case PreparingRebalance =>
-            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
-
-          case AwaitingSync =>
-            group.get(memberId).awaitingSyncCallback = responseCallback
-
-            // if this is the leader, then we can attempt to persist state and transition to stable
-            if (memberId == group.leaderId) {
-              info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
-
-              // fill any missing members with an empty assignment
-              val missing = group.allMembers -- groupAssignment.keySet
-              val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
-
-              delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
-                group synchronized {
-                  // another member may have joined the group while we were awaiting this callback,
-                  // so we must ensure we are still in the AwaitingSync state and the same generation
-                  // when it gets invoked. if we have transitioned to another state, then do nothing
-                  if (group.is(AwaitingSync) && generationId == group.generationId) {
-                    if (error != Errors.NONE) {
-                      resetAndPropagateAssignmentError(group, error)
-                      maybePrepareRebalance(group)
-                    } else {
-                      setAndPropagateAssignment(group, assignment)
-                      group.transitionTo(Stable)
-                    }
-                  }
-                }
-              })
-            }
-
-          case Stable =>
-            // if the group is stable, we just return the current assignment
-            val memberMetadata = group.get(memberId)
-            responseCallback(memberMetadata.assignment, Errors.NONE)
-            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
-        }
-      }
-    }
-
-    // store the group metadata without holding the group lock to avoid the potential
-    // for deadlock if the callback is invoked holding other locks (e.g. the replica
-    // state change lock)
-    delayedGroupStore.foreach(groupManager.store)
-  }
-
-  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
-    if (!isActive.get) {
-      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR)
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      responseCallback(Errors.COORDINATOR_LOAD_IN_PROGRESS)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None =>
-          // if the group is marked as dead, it means some other thread has just removed the group
-          // from the coordinator metadata; this is likely that the group has migrated to some other
-          // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
-          // joining without specified consumer id,
-          responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-        case Some(group) =>
-          group synchronized {
-            if (group.is(Dead) || !group.has(memberId)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID)
-            } else {
-              val member = group.get(memberId)
-              removeHeartbeatForLeavingMember(group, member)
-              onMemberFailure(group, member)
-              responseCallback(Errors.NONE)
-            }
-          }
-      }
-    }
-  }
-
-  def handleHeartbeat(groupId: String,
-                      memberId: String,
-                      generationId: Int,
-                      responseCallback: Errors => Unit) {
-    if (!isActive.get) {
-      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR)
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      // the group is still loading, so respond just blindly
-      responseCallback(Errors.NONE)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None =>
-          responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-        case Some(group) =>
-          group synchronized {
-            group.currentState match {
-              case Dead =>
-                // if the group is marked as dead, it means some other thread has just removed the group
-                // from the coordinator metadata; this is likely that the group has migrated to some other
-                // coordinator OR the group is in a transient unstable phase. Let the member retry
-                // joining without the specified member id,
-                responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-              case Empty =>
-                responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
-              case AwaitingSync =>
-                if (!group.has(memberId))
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
-                else
-                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
-
-              case PreparingRebalance =>
-                if (!group.has(memberId)) {
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
-                } else if (generationId != group.generationId) {
-                  responseCallback(Errors.ILLEGAL_GENERATION)
-                } else {
-                  val member = group.get(memberId)
-                  completeAndScheduleNextHeartbeatExpiration(group, member)
-                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
-                }
-
-              case Stable =>
-                if (!group.has(memberId)) {
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
-                } else if (generationId != group.generationId) {
-                  responseCallback(Errors.ILLEGAL_GENERATION)
-                } else {
-                  val member = group.get(memberId)
-                  completeAndScheduleNextHeartbeatExpiration(group, member)
-                  responseCallback(Errors.NONE)
-                }
-            }
-          }
-      }
-    }
-  }
-
-  def handleCommitOffsets(groupId: String,
-                          memberId: String,
-                          generationId: Int,
-                          offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-    if (!isActive.get) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
-    } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR))
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_LOAD_IN_PROGRESS))
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None =>
-          if (generationId < 0) {
-            // the group is not relying on Kafka for group management, so allow the commit
-            val group = groupManager.addGroup(new GroupMetadata(groupId))
-            doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
-          } else {
-            // or this is a request coming from an older generation. either way, reject the commit
-            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
-          }
-
-        case Some(group) =>
-          doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
-      }
-    }
-  }
-
-  private def doCommitOffsets(group: GroupMetadata,
-                              memberId: String,
-                              generationId: Int,
-                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-    var delayedOffsetStore: Option[DelayedStore] = None
-
-    group synchronized {
-      if (group.is(Dead)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
-      } else if (generationId < 0 && group.is(Empty)) {
-        // the group is only using Kafka to store offsets
-        delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
-          offsetMetadata, responseCallback)
-      } else if (group.is(AwaitingSync)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
-      } else if (!group.has(memberId)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
-      } else if (generationId != group.generationId) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
-      } else {
-        val member = group.get(memberId)
-        completeAndScheduleNextHeartbeatExpiration(group, member)
-        delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
-          offsetMetadata, responseCallback)
-      }
-    }
-
-    // store the offsets without holding the group lock
-    delayedOffsetStore.foreach(groupManager.store)
-  }
-
-  def handleFetchOffsets(groupId: String,
-                         partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
-    if (!isActive.get)
-      (Errors.COORDINATOR_NOT_AVAILABLE, Map())
-    else if (!isCoordinatorForGroup(groupId)) {
-      debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
-      (Errors.NOT_COORDINATOR, Map())
-    } else if (isCoordinatorLoadInProgress(groupId))
-      (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
-    else {
-      // return offsets blindly regardless the current group state since the group may be using
-      // Kafka commit storage without automatic group management
-      (Errors.NONE, groupManager.getOffsets(groupId, partitions))
-    }
-  }
-
-  def handleListGroups(): (Errors, List[GroupOverview]) = {
-    if (!isActive.get) {
-      (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
-    } else {
-      val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-      (errorCode, groupManager.currentGroups.map(_.overview).toList)
-    }
-  }
-
-  def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
-    if (!isActive.get) {
-      (Errors.COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
-    } else if (!isCoordinatorForGroup(groupId)) {
-      (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
-    } else if (isCoordinatorLoadInProgress(groupId)) {
-      (Errors.COORDINATOR_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
-    } else {
-      groupManager.getGroup(groupId) match {
-        case None => (Errors.NONE, GroupCoordinator.DeadGroup)
-        case Some(group) =>
-          group synchronized {
-            (Errors.NONE, group.summary)
-          }
-      }
-    }
-  }
-
-  def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
-    groupManager.cleanupGroupMetadata(Some(topicPartitions))
-  }
-
-  private def onGroupUnloaded(group: GroupMetadata) {
-    group synchronized {
-      info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
-      val previousState = group.currentState
-      group.transitionTo(Dead)
-
-      previousState match {
-        case Empty | Dead =>
-        case PreparingRebalance =>
-          for (member <- group.allMemberMetadata) {
-            if (member.awaitingJoinCallback != null) {
-              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR))
-              member.awaitingJoinCallback = null
-            }
-          }
-          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
-
-        case Stable | AwaitingSync =>
-          for (member <- group.allMemberMetadata) {
-            if (member.awaitingSyncCallback != null) {
-              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
-              member.awaitingSyncCallback = null
-            }
-            heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
-          }
-      }
-    }
-  }
-
-  private def onGroupLoaded(group: GroupMetadata) {
-    group synchronized {
-      info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
-      assert(group.is(Stable) || group.is(Empty))
-      group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
-    }
-  }
-
-  def handleGroupImmigration(offsetTopicPartitionId: Int) {
-    groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
-  }
-
-  def handleGroupEmigration(offsetTopicPartitionId: Int) {
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
-  }
-
-  private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
-    assert(group.is(AwaitingSync))
-    group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
-    propagateAssignment(group, Errors.NONE)
-  }
-
-  private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
-    assert(group.is(AwaitingSync))
-    group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
-    propagateAssignment(group, error)
-  }
-
-  private def propagateAssignment(group: GroupMetadata, error: Errors) {
-    for (member <- group.allMemberMetadata) {
-      if (member.awaitingSyncCallback != null) {
-        member.awaitingSyncCallback(member.assignment, error)
-        member.awaitingSyncCallback = null
-
-        // reset the session timeout for members after propagating the member's assignment.
-        // This is because if any member's session expired while we were still awaiting either
-        // the leader sync group or the storage callback, its expiration will be ignored and no
-        // future heartbeat expectations will not be scheduled.
-        completeAndScheduleNextHeartbeatExpiration(group, member)
-      }
-    }
-  }
-
-  private def validGroupId(groupId: String): Boolean = {
-    groupId != null && !groupId.isEmpty
-  }
-
-  private def joinError(memberId: String, error: Errors): JoinGroupResult = {
-    JoinGroupResult(
-      members = Map.empty,
-      memberId = memberId,
-      generationId = 0,
-      subProtocol = GroupCoordinator.NoProtocol,
-      leaderId = GroupCoordinator.NoLeader,
-      error = error)
-  }
-
-  /**
-   * Complete existing DelayedHeartbeats for the given member and schedule the next one
-   */
-  private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
-    // complete current heartbeat expectation
-    member.latestHeartbeat = time.milliseconds()
-    val memberKey = MemberKey(member.groupId, member.memberId)
-    heartbeatPurgatory.checkAndComplete(memberKey)
-
-    // reschedule the next heartbeat expiration deadline
-    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
-    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
-  }
-
-  private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
-    member.isLeaving = true
-    val memberKey = MemberKey(member.groupId, member.memberId)
-    heartbeatPurgatory.checkAndComplete(memberKey)
-  }
-
-  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
-                                    sessionTimeoutMs: Int,
-                                    clientId: String,
-                                    clientHost: String,
-                                    protocolType: String,
-                                    protocols: List[(String, Array[Byte])],
-                                    group: GroupMetadata,
-                                    callback: JoinCallback) = {
-    // use the client-id with a random id suffix as the member-id
-    val memberId = clientId + "-" + group.generateMemberIdSuffix
-    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
-      sessionTimeoutMs, protocolType, protocols)
-    member.awaitingJoinCallback = callback
-    group.add(member)
-    maybePrepareRebalance(group)
-    member
-  }
-
-  private def updateMemberAndRebalance(group: GroupMetadata,
-                                       member: MemberMetadata,
-                                       protocols: List[(String, Array[Byte])],
-                                       callback: JoinCallback) {
-    member.supportedProtocols = protocols
-    member.awaitingJoinCallback = callback
-    maybePrepareRebalance(group)
-  }
-
-  private def maybePrepareRebalance(group: GroupMetadata) {
-    group synchronized {
-      if (group.canRebalance)
-        prepareRebalance(group)
-    }
-  }
-
-  private def prepareRebalance(group: GroupMetadata) {
-    // if any members are awaiting sync, cancel their request and have them rejoin
-    if (group.is(AwaitingSync))
-      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
-
-    group.transitionTo(PreparingRebalance)
-    info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
-
-    val rebalanceTimeout = group.rebalanceTimeoutMs
-    val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
-    val groupKey = GroupKey(group.groupId)
-    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
-  }
-
-  private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
-    trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
-    group.remove(member.memberId)
-    group.currentState match {
-      case Dead | Empty =>
-      case Stable | AwaitingSync => maybePrepareRebalance(group)
-      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
-    }
-  }
-
-  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
-    group synchronized {
-      if (group.notYetRejoinedMembers.isEmpty)
-        forceComplete()
-      else false
-    }
-  }
-
-  def onExpireJoin() {
-    // TODO: add metrics for restabilize timeouts
-  }
-
-  def onCompleteJoin(group: GroupMetadata) {
-    var delayedStore: Option[DelayedStore] = None
-    group synchronized {
-      // remove any members who haven't joined the group yet
-      group.notYetRejoinedMembers.foreach { failedMember =>
-        group.remove(failedMember.memberId)
-        // TODO: cut the socket connection to the client
-      }
-
-      if (!group.is(Dead)) {
-        group.initNextGeneration()
-        if (group.is(Empty)) {
-          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
-
-          delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {
-            if (error != Errors.NONE) {
-              // we failed to write the empty group metadata. If the broker fails before another rebalance,
-              // the previous generation written to the log will become active again (and most likely timeout).
-              // This should be safe since there are no active members in an empty generation, so we just warn.
-              warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
-            }
-          })
-        } else {
-          info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
-
-          // trigger the awaiting join group response callback for all the members after rebalancing
-          for (member <- group.allMemberMetadata) {
-            assert(member.awaitingJoinCallback != null)
-            val joinResult = JoinGroupResult(
-              members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
-              memberId = member.memberId,
-              generationId = group.generationId,
-              subProtocol = group.protocol,
-              leaderId = group.leaderId,
-              error = Errors.NONE)
-
-            member.awaitingJoinCallback(joinResult)
-            member.awaitingJoinCallback = null
-            completeAndScheduleNextHeartbeatExpiration(group, member)
-          }
-        }
-      }
-    }
-
-    // call without holding the group lock
-    delayedStore.foreach(groupManager.store)
-  }
-
-  def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
-    group synchronized {
-      if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
-        forceComplete()
-      else false
-    }
-  }
-
-  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
-    group synchronized {
-      if (!shouldKeepMemberAlive(member, heartbeatDeadline))
-        onMemberFailure(group, member)
-    }
-  }
-
-  def onCompleteHeartbeat() {
-    // TODO: add metrics for complete heartbeats
-  }
-
-  def partitionFor(group: String): Int = groupManager.partitionFor(group)
-
-  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
-    member.awaitingJoinCallback != null ||
-      member.awaitingSyncCallback != null ||
-      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
-
-  private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
-
-  private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
-}
-
-object GroupCoordinator {
-
-  val NoState = ""
-  val NoProtocolType = ""
-  val NoProtocol = ""
-  val NoLeader = ""
-  val NoMembers = List[MemberSummary]()
-  val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
-  val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
-
-  def apply(config: KafkaConfig,
-            zkUtils: ZkUtils,
-            replicaManager: ReplicaManager,
-            time: Time): GroupCoordinator = {
-    val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
-    val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
-    apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
-  }
-
-  private[coordinator] def offsetConfig(config: KafkaConfig) = OffsetConfig(
-    maxMetadataSize = config.offsetMetadataMaxSize,
-    loadBufferSize = config.offsetsLoadBufferSize,
-    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
-    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
-    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
-    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
-  )
-
-  def apply(config: KafkaConfig,
-            zkUtils: ZkUtils,
-            replicaManager: ReplicaManager,
-            heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
-            joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
-            time: Time): GroupCoordinator = {
-    val offsetConfig = this.offsetConfig(config)
-    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
-      groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
-
-    val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
-      offsetConfig, replicaManager, zkUtils, time)
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
-  }
-
-}
-
-case class GroupConfig(groupMinSessionTimeoutMs: Int,
-                       groupMaxSessionTimeoutMs: Int)
-
-case class JoinGroupResult(members: Map[String, Array[Byte]],
-                           memberId: String,
-                           generationId: Int,
-                           subProtocol: String,
-                           leaderId: String,
-                           error: Errors)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
deleted file mode 100644
index 4ea5bdd..0000000
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import collection.{Seq, mutable, immutable}
-
-import java.util.UUID
-
-import kafka.common.OffsetAndMetadata
-import kafka.utils.nonthreadsafe
-import org.apache.kafka.common.TopicPartition
-
-private[coordinator] sealed trait GroupState { def state: Byte }
-
-/**
- * Group is preparing to rebalance
- *
- * action: respond to heartbeats with REBALANCE_IN_PROGRESS
- *         respond to sync group with REBALANCE_IN_PROGRESS
- *         remove member on leave group request
- *         park join group requests from new or existing members until all expected members have joined
- *         allow offset commits from previous generation
- *         allow offset fetch requests
- * transition: some members have joined by the timeout => AwaitingSync
- *             all members have left the group => Empty
- *             group is removed by partition emigration => Dead
- */
-private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
-
-/**
- * Group is awaiting state assignment from the leader
- *
- * action: respond to heartbeats with REBALANCE_IN_PROGRESS
- *         respond to offset commits with REBALANCE_IN_PROGRESS
- *         park sync group requests from followers until transition to Stable
- *         allow offset fetch requests
- * transition: sync group with state assignment received from leader => Stable
- *             join group from new member or existing member with updated metadata => PreparingRebalance
- *             leave group from existing member => PreparingRebalance
- *             member failure detected => PreparingRebalance
- *             group is removed by partition emigration => Dead
- */
-private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5}
-
-/**
- * Group is stable
- *
- * action: respond to member heartbeats normally
- *         respond to sync group from any member with current assignment
- *         respond to join group from followers with matching metadata with current group metadata
- *         allow offset commits from member of current generation
- *         allow offset fetch requests
- * transition: member failure detected via heartbeat => PreparingRebalance
- *             leave group from existing member => PreparingRebalance
- *             leader join-group received => PreparingRebalance
- *             follower join-group with new metadata => PreparingRebalance
- *             group is removed by partition emigration => Dead
- */
-private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
-
-/**
- * Group has no more members and its metadata is being removed
- *
- * action: respond to join group with UNKNOWN_MEMBER_ID
- *         respond to sync group with UNKNOWN_MEMBER_ID
- *         respond to heartbeat with UNKNOWN_MEMBER_ID
- *         respond to leave group with UNKNOWN_MEMBER_ID
- *         respond to offset commit with UNKNOWN_MEMBER_ID
- *         allow offset fetch requests
- * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions
- */
-private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
-
-/**
-  * Group has no more members, but lingers until all offsets have expired. This state
-  * also represents groups which use Kafka only for offset commits and have no members.
-  *
-  * action: respond normally to join group from new members
-  *         respond to sync group with UNKNOWN_MEMBER_ID
-  *         respond to heartbeat with UNKNOWN_MEMBER_ID
-  *         respond to leave group with UNKNOWN_MEMBER_ID
-  *         respond to offset commit with UNKNOWN_MEMBER_ID
-  *         allow offset fetch requests
-  * transition: last offsets removed in periodic expiration task => Dead
-  *             join group from a new member => PreparingRebalance
-  *             group is removed by partition emigration => Dead
-  *             group is removed by expiration => Dead
-  */
-private[coordinator] case object Empty extends GroupState { val state: Byte = 5 }
-
-
-private object GroupMetadata {
-  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
-      AwaitingSync -> Set(PreparingRebalance),
-      Stable -> Set(AwaitingSync),
-      PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
-      Empty -> Set(PreparingRebalance))
-}
-
-/**
- * Case class used to represent group metadata for the ListGroups API
- */
-case class GroupOverview(groupId: String,
-                         protocolType: String)
-
-/**
- * Case class used to represent group metadata for the DescribeGroup API
- */
-case class GroupSummary(state: String,
-                        protocolType: String,
-                        protocol: String,
-                        members: List[MemberSummary])
-
-/**
- * Group contains the following metadata:
- *
- *  Membership metadata:
- *  1. Members registered in this group
- *  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
- *  3. Protocol metadata associated with group members
- *
- *  State metadata:
- *  1. group state
- *  2. generation id
- *  3. leader id
- */
-@nonthreadsafe
-private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
-
-  private var state: GroupState = initialState
-  private val members = new mutable.HashMap[String, MemberMetadata]
-  private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
-  private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
-
-  var protocolType: Option[String] = None
-  var generationId = 0
-  var leaderId: String = null
-  var protocol: String = null
-
-  def is(groupState: GroupState) = state == groupState
-  def not(groupState: GroupState) = state != groupState
-  def has(memberId: String) = members.contains(memberId)
-  def get(memberId: String) = members(memberId)
-
-  def add(member: MemberMetadata) {
-    if (members.isEmpty)
-      this.protocolType = Some(member.protocolType)
-
-    assert(groupId == member.groupId)
-    assert(this.protocolType.orNull == member.protocolType)
-    assert(supportsProtocols(member.protocols))
-
-    if (leaderId == null)
-      leaderId = member.memberId
-    members.put(member.memberId, member)
-  }
-
-  def remove(memberId: String) {
-    members.remove(memberId)
-    if (memberId == leaderId) {
-      leaderId = if (members.isEmpty) {
-        null
-      } else {
-        members.keys.head
-      }
-    }
-  }
-
-  def currentState = state
-
-  def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
-
-  def allMembers = members.keySet
-
-  def allMemberMetadata = members.values.toList
-
-  def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
-    timeout.max(member.rebalanceTimeoutMs)
-  }
-
-  // TODO: decide if ids should be predictable or random
-  def generateMemberIdSuffix = UUID.randomUUID().toString
-
-  def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
-
-  def transitionTo(groupState: GroupState) {
-    assertValidTransition(groupState)
-    state = groupState
-  }
-
-  def selectProtocol: String = {
-    if (members.isEmpty)
-      throw new IllegalStateException("Cannot select protocol for empty group")
-
-    // select the protocol for this group which is supported by all members
-    val candidates = candidateProtocols
-
-    // let each member vote for one of the protocols and choose the one with the most votes
-    val votes: List[(String, Int)] = allMemberMetadata
-      .map(_.vote(candidates))
-      .groupBy(identity)
-      .mapValues(_.size)
-      .toList
-
-    votes.maxBy(_._2)._1
-  }
-
-  private def candidateProtocols = {
-    // get the set of protocols that are commonly supported by all members
-    allMemberMetadata
-      .map(_.protocols)
-      .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
-  }
-
-  def supportsProtocols(memberProtocols: Set[String]) = {
-    members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
-  }
-
-  def initNextGeneration() = {
-    assert(notYetRejoinedMembers == List.empty[MemberMetadata])
-    if (members.nonEmpty) {
-      generationId += 1
-      protocol = selectProtocol
-      transitionTo(AwaitingSync)
-    } else {
-      generationId += 1
-      protocol = null
-      transitionTo(Empty)
-    }
-  }
-
-  def currentMemberMetadata: Map[String, Array[Byte]] = {
-    if (is(Dead) || is(PreparingRebalance))
-      throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
-    members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
-  }
-
-  def summary: GroupSummary = {
-    if (is(Stable)) {
-      val members = this.members.values.map { member => member.summary(protocol) }.toList
-      GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members)
-    } else {
-      val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
-      GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members)
-    }
-  }
-
-  def overview: GroupOverview = {
-    GroupOverview(groupId, protocolType.getOrElse(""))
-  }
-
-  def initializeOffsets(offsets: collection.Map[TopicPartition, OffsetAndMetadata]) {
-     this.offsets ++= offsets
-  }
-
-  def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) {
-    if (pendingOffsetCommits.contains(topicPartition))
-      offsets.put(topicPartition, offset)
-
-    pendingOffsetCommits.get(topicPartition) match {
-      case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition)
-      case _ =>
-    }
-  }
-
-  def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
-    pendingOffsetCommits.get(topicPartition) match {
-      case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
-      case _ =>
-    }
-  }
-
-  def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
-    pendingOffsetCommits ++= offsets
-  }
-
-  def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
-    topicPartitions.flatMap { topicPartition =>
-      pendingOffsetCommits.remove(topicPartition)
-      val removedOffset = offsets.remove(topicPartition)
-      removedOffset.map(topicPartition -> _)
-    }.toMap
-  }
-
-  def removeExpiredOffsets(startMs: Long) = {
-    val expiredOffsets = offsets.filter {
-      case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
-    }
-    offsets --= expiredOffsets.keySet
-    expiredOffsets.toMap
-  }
-
-  def allOffsets = offsets.toMap
-
-  def offset(topicPartition: TopicPartition) = offsets.get(topicPartition)
-
-  def numOffsets = offsets.size
-
-  def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty
-
-  private def assertValidTransition(targetState: GroupState) {
-    if (!GroupMetadata.validPreviousStates(targetState).contains(state))
-      throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
-        .format(groupId, GroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
-  }
-
-  override def toString = {
-    "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
-  }
-}
-