You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/24 04:20:54 UTC

[kafka] branch 1.0 updated: KAFKA-6287; Consumer group command should list simple consumer groups (#4407)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 4f5cb2d  KAFKA-6287; Consumer group command should list simple consumer groups (#4407)
4f5cb2d is described below

commit 4f5cb2d9088863ea9782313fd995e48d55c57b01
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jan 23 18:58:49 2018 -0800

    KAFKA-6287; Consumer group command should list simple consumer groups (#4407)
    
    With this patch, simple consumer groups which only use Kafka for offset storage will be viewable using the `--list` option in consumer-groups.sh. In addition, this patch fixes a bug in the offset loading logic which caused us to lose the protocol type of empty groups on coordinator failover. I also did some cleanup of the various consumer group command test cases.
    
    For testing, I have added new integration tests which cover listing and describing simple consumer groups. I also added unit tests to cover loading empty groups with assertions on the protocol type.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 core/src/main/scala/kafka/admin/AdminClient.scala  |  18 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  30 ++--
 .../kafka/coordinator/group/GroupCoordinator.scala |  33 ++--
 .../kafka/coordinator/group/GroupMetadata.scala    |  66 ++++---
 .../coordinator/group/GroupMetadataManager.scala   |  44 +++--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   2 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   2 +-
 .../kafka/admin/ConsumerGroupCommandTest.scala     | 197 +++++++++++++++++++++
 .../kafka/admin/DescribeConsumerGroupTest.scala    | 173 +++++-------------
 .../unit/kafka/admin/ListConsumerGroupTest.scala   |  81 ++++-----
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 193 ++++++++------------
 .../coordinator/group/GroupCoordinatorTest.scala   |  32 +++-
 .../group/GroupMetadataManagerTest.scala           | 183 ++++++++++++++-----
 .../coordinator/group/GroupMetadataTest.scala      |  10 +-
 14 files changed, 630 insertions(+), 434 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 24149d7..d794eb2 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -167,7 +167,7 @@ class AdminClient(val time: Time,
   }
 
   def listAllGroups(): Map[Node, List[GroupOverview]] = {
-    findAllBrokers.map { broker =>
+    findAllBrokers().map { broker =>
       broker -> {
         try {
           listGroups(broker)
@@ -182,16 +182,22 @@ class AdminClient(val time: Time,
 
   def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = {
     listAllGroups().mapValues { groups =>
-      groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+      groups.filter(isConsumerGroup)
     }
   }
 
   def listAllGroupsFlattened(): List[GroupOverview] = {
-    listAllGroups.values.flatten.toList
+    listAllGroups().values.flatten.toList
   }
 
   def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
-    listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+    listAllGroupsFlattened().filter(isConsumerGroup)
+  }
+
+  private def isConsumerGroup(group: GroupOverview): Boolean = {
+    // Consumer groups which are using group management use the "consumer" protocol type.
+    // Consumer groups which are only using offset storage will have an empty protocol type.
+    group.protocolType.isEmpty || group.protocolType == ConsumerProtocol.PROTOCOL_TYPE
   }
 
   def listGroupOffsets(groupId: String): Map[TopicPartition, Long] = {
@@ -200,12 +206,12 @@ class AdminClient(val time: Time,
     val response = responseBody.asInstanceOf[OffsetFetchResponse]
     if (response.hasError)
       throw response.error.exception
-    response.maybeThrowFirstPartitionError
+    response.maybeThrowFirstPartitionError()
     response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap
   }
 
   def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
-    findAllBrokers.map { broker =>
+    findAllBrokers().map { broker =>
       broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
     }.toMap
 
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2120657..f1c397e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -146,6 +146,19 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
+  def convertTimestamp(timeString: String): java.lang.Long = {
+    val datetime: String = timeString match {
+      case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString
+      case ts => s"${ts}Z"
+    }
+    val date = try {
+      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
+    } catch {
+      case _: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
+    }
+    date.getTime
+  }
+
   def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
     print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
     println()
@@ -606,8 +619,8 @@ object ConsumerGroupCommand extends Logging {
           (topicPartition, new OffsetAndMetadata(newOffset))
         }.toMap
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+        val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
         partitionsToReset.map { topicPartition =>
-          val timestamp = getDateTime
           val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
           logTimestampOffset match {
             case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
@@ -668,21 +681,6 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    private[admin] def getDateTime: java.lang.Long = {
-      val datetime: String = opts.options.valueOf(opts.resetToDatetimeOpt) match {
-        case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString
-        case ts => s"${ts}Z"
-      }
-      val date = {
-        try {
-          new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
-        } catch {
-          case e: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
-        }
-      }
-      date.getTime
-    }
-
     override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = {
       val rows = assignmentsToReset.map { case (k,v) => s"${k.topic()},${k.partition()},${v.offset()}" }(collection.breakOut): List[String]
       rows.foldRight("")(_ + "\n" + _)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 94622f6..bea4e83 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.utils.Time
 import scala.collection.{Map, Seq, immutable}
 import scala.math.max
 
-
 /**
  * GroupCoordinator handles general group membership and offset management.
  *
@@ -129,7 +128,7 @@ class GroupCoordinator(val brokerId: Int,
           if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
             responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
           } else {
-            val group = groupManager.addGroup(new GroupMetadata(groupId))
+            val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
             doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
           }
 
@@ -185,15 +184,15 @@ class GroupCoordinator(val brokerId: Int,
                 // receive the initial JoinGroup response), so just return current group information
                 // for the current generation.
                 responseCallback(JoinGroupResult(
-                  members = if (memberId == group.leaderId) {
+                  members = if (group.isLeader(memberId)) {
                     group.currentMemberMetadata
                   } else {
                     Map.empty
                   },
                   memberId = memberId,
                   generationId = group.generationId,
-                  subProtocol = group.protocol,
-                  leaderId = group.leaderId,
+                  subProtocol = group.protocolOrNull,
+                  leaderId = group.leaderOrNull,
                   error = Errors.NONE))
               } else {
                 // member has changed metadata, so force a rebalance
@@ -207,7 +206,7 @@ class GroupCoordinator(val brokerId: Int,
               addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
-              if (memberId == group.leaderId || !member.matches(protocols)) {
+              if (group.isLeader(memberId) || !member.matches(protocols)) {
                 // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
                 // The latter allows the leader to trigger rebalances for changes affecting assignment
                 // which do not affect the member metadata (such as topic metadata changes for the consumer)
@@ -219,8 +218,8 @@ class GroupCoordinator(val brokerId: Int,
                   members = Map.empty,
                   memberId = memberId,
                   generationId = group.generationId,
-                  subProtocol = group.protocol,
-                  leaderId = group.leaderId,
+                  subProtocol = group.protocolOrNull,
+                  leaderId = group.leaderOrNull,
                   error = Errors.NONE))
               }
             }
@@ -271,7 +270,7 @@ class GroupCoordinator(val brokerId: Int,
             group.get(memberId).awaitingSyncCallback = responseCallback
 
             // if this is the leader, then we can attempt to persist state and transition to stable
-            if (memberId == group.leaderId) {
+            if (group.isLeader(memberId)) {
               info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
 
               // fill any missing members with an empty assignment
@@ -408,7 +407,9 @@ class GroupCoordinator(val brokerId: Int,
     validateGroup(groupId) match {
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
-        val group = groupManager.getGroup(groupId).getOrElse(groupManager.addGroup(new GroupMetadata(groupId)))
+        val group = groupManager.getGroup(groupId).getOrElse {
+          groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+        }
         doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
     }
   }
@@ -425,7 +426,7 @@ class GroupCoordinator(val brokerId: Int,
           case None =>
             if (generationId < 0) {
               // the group is not relying on Kafka for group management, so allow the commit
-              val group = groupManager.addGroup(new GroupMetadata(groupId))
+              val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
               doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
                 offsetMetadata, responseCallback)
             } else {
@@ -459,7 +460,7 @@ class GroupCoordinator(val brokerId: Int,
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
-        // the group is only using Kafka to store offsets
+        // The group is only using Kafka to store offsets.
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
         groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
       } else if (group.is(AwaitingSync)) {
@@ -481,7 +482,7 @@ class GroupCoordinator(val brokerId: Int,
     if (!isActive.get)
       (Errors.COORDINATOR_NOT_AVAILABLE, Map())
     else if (!isCoordinatorForGroup(groupId)) {
-      debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
+      debug(s"Could not fetch offsets for group $groupId (not group coordinator)")
       (Errors.NOT_COORDINATOR, Map())
     } else if (isCoordinatorLoadInProgress(groupId))
       (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
@@ -753,15 +754,15 @@ class GroupCoordinator(val brokerId: Int,
           for (member <- group.allMemberMetadata) {
             assert(member.awaitingJoinCallback != null)
             val joinResult = JoinGroupResult(
-              members = if (member.memberId == group.leaderId) {
+              members = if (group.isLeader(member.memberId)) {
                 group.currentMemberMetadata
               } else {
                 Map.empty
               },
               memberId = member.memberId,
               generationId = group.generationId,
-              subProtocol = group.protocol,
-              leaderId = group.leaderId,
+              subProtocol = group.protocolOrNull,
+              leaderId = group.leaderOrNull,
               error = Errors.NONE)
 
             member.awaitingJoinCallback(joinResult)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 537d944..3257f51 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -111,6 +111,22 @@ private object GroupMetadata {
       Stable -> Set(AwaitingSync),
       PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
       Empty -> Set(PreparingRebalance))
+
+  def loadGroup(groupId: String,
+                initialState: GroupState,
+                generationId: Int,
+                protocolType: String,
+                protocol: String,
+                leaderId: String,
+                members: Iterable[MemberMetadata]): GroupMetadata = {
+    val group = new GroupMetadata(groupId, initialState)
+    group.generationId = generationId
+    group.protocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
+    group.protocol = Option(protocol)
+    group.leaderId = Option(leaderId)
+    members.foreach(group.add)
+    group
+  }
 }
 
 /**
@@ -151,28 +167,22 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
  *  3. leader id
  */
 @nonthreadsafe
-private[group] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) extends Logging {
+private[group] class GroupMetadata(val groupId: String, initialState: GroupState) extends Logging {
+  private[group] val lock = new ReentrantLock
 
   private var state: GroupState = initialState
-
-  private[group] val lock = new ReentrantLock
+  var protocolType: Option[String] = None
+  var generationId = 0
+  private var leaderId: Option[String] = None
+  private var protocol: Option[String] = None
 
   private val members = new mutable.HashMap[String, MemberMetadata]
-
   private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
-
   private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
-
   private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
-
   private var receivedTransactionalOffsetCommits = false
-
   private var receivedConsumerOffsetCommits = false
 
-  var protocolType: Option[String] = None
-  var generationId = 0
-  var leaderId: String = null
-  var protocol: String = null
   var newMemberAdded: Boolean = false
 
   def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
@@ -182,6 +192,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def has(memberId: String) = members.contains(memberId)
   def get(memberId: String) = members(memberId)
 
+  def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
+  def leaderOrNull: String = leaderId.orNull
+  def protocolOrNull: String = protocol.orNull
+
   def add(member: MemberMetadata) {
     if (members.isEmpty)
       this.protocolType = Some(member.protocolType)
@@ -190,18 +204,18 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     assert(this.protocolType.orNull == member.protocolType)
     assert(supportsProtocols(member.protocols))
 
-    if (leaderId == null)
-      leaderId = member.memberId
+    if (leaderId.isEmpty)
+      leaderId = Some(member.memberId)
     members.put(member.memberId, member)
   }
 
   def remove(memberId: String) {
     members.remove(memberId)
-    if (memberId == leaderId) {
+    if (isLeader(memberId)) {
       leaderId = if (members.isEmpty) {
-        null
+        None
       } else {
-        members.keys.head
+        Some(members.keys.head)
       }
     }
   }
@@ -260,11 +274,11 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     assert(notYetRejoinedMembers == List.empty[MemberMetadata])
     if (members.nonEmpty) {
       generationId += 1
-      protocol = selectProtocol
+      protocol = Some(selectProtocol)
       transitionTo(AwaitingSync)
     } else {
       generationId += 1
-      protocol = null
+      protocol = None
       transitionTo(Empty)
     }
     receivedConsumerOffsetCommits = false
@@ -274,16 +288,20 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def currentMemberMetadata: Map[String, Array[Byte]] = {
     if (is(Dead) || is(PreparingRebalance))
       throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
-    members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
+    members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol.get))}.toMap
   }
 
   def summary: GroupSummary = {
     if (is(Stable)) {
-      val members = this.members.values.map { member => member.summary(protocol) }.toList
-      GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members)
+      val protocol = protocolOrNull
+      if (protocol == null)
+        throw new IllegalStateException("Invalid null group protocol for stable group")
+
+      val members = this.members.values.map { member => member.summary(protocol) }
+      GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members.toList)
     } else {
-      val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
-      GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members)
+      val members = this.members.values.map{ member => member.summaryNoMetadata() }
+      GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members.toList)
     }
   }
 
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 0298509..dbae86f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
-import kafka.common.{MessageFormatter, _}
+import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
@@ -182,8 +182,7 @@ class GroupMetadataManager(brokerId: Int,
             throw new IllegalStateException("Append status %s should only have one partition %s"
               .format(responseStatus, groupMetadataPartition))
 
-          // construct the error status in the propagated assignment response
-          // in the cache
+          // construct the error status in the propagated assignment response in the cache
           val status = responseStatus(groupMetadataPartition)
 
           val responseError = if (status.error == Errors.NONE) {
@@ -263,7 +262,7 @@ class GroupMetadataManager(brokerId: Int,
 
     group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
-        warn(s"group: ${group.groupId} with leader: ${group.leaderId} has received offset commits from consumers as well " +
+        warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
           s"should be avoided.")
     }
@@ -585,8 +584,8 @@ class GroupMetadataManager(brokerId: Int,
 
         // load groups which store offsets in kafka, but which have no active members and thus no group
         // metadata stored in the log
-        (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
-          val group = new GroupMetadata(groupId)
+        (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
+          val group = new GroupMetadata(groupId, initialState = Empty)
           val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
           val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
           debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
@@ -1036,8 +1035,8 @@ object GroupMetadataManager {
 
     value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
     value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocol)
-    value.set(LEADER_KEY, groupMetadata.leaderId)
+    value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull)
+    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
 
     val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
       val memberStruct = value.instance(MEMBERS_KEY)
@@ -1049,7 +1048,12 @@ object GroupMetadataManager {
       if (version > 0)
         memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
 
-      val metadata = memberMetadata.metadata(groupMetadata.protocol)
+      // The group is non-empty, so the current protocol must be defined
+      val protocol = groupMetadata.protocolOrNull
+      if (protocol == null)
+        throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol")
+
+      val metadata = memberMetadata.metadata(protocol)
       memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
 
       val memberAssignment = assignment(memberMetadata.memberId)
@@ -1145,36 +1149,28 @@ object GroupMetadataManager {
       val value = valueSchema.read(buffer)
 
       if (version == 0 || version == 1) {
+        val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
-
+        val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
+        val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
 
-        val group = new GroupMetadata(groupId, initialState)
-
-        group.generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
-        group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
-        group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
-
-        memberMetadataArray.foreach { memberMetadataObj =>
+        val members = memberMetadataArray.map { memberMetadataObj =>
           val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
           val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
           val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
           val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
           val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
           val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
-
           val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
 
           val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
-            protocolType, List((group.protocol, subscription)))
-
+            protocolType, List((protocol, subscription)))
           member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
-
-          group.add(member)
+          member
         }
-
-        group
+        GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, members)
       } else {
         throw new IllegalStateException("Unknown group metadata message version")
       }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fe82dc2..855ca75 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -337,7 +337,7 @@ object DumpLogSegments {
       val keyString = Json.encode(Map("metadata" -> groupId))
       val valueString = Json.encode(Map(
           "protocolType" -> protocolType,
-          "protocol" -> group.protocol,
+          "protocol" -> group.protocolOrNull,
           "generationId" -> group.generationId,
           "assignment" -> assignment))
 
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index b309b80..65e72bd 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -151,7 +151,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
       createClientCredential()
       verifyWithRetry(describeTopic())
     } finally {
-      adminClient.close
+      adminClient.close()
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
new file mode 100644
index 0000000..b42c3e2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.{Collections, Properties}
+
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
+import kafka.consumer.{OldConsumer, Whitelist}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.{After, Before}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+class ConsumerGroupCommandTest extends KafkaServerTestHarness {
+  import ConsumerGroupCommandTest._
+
+  val topic = "foo"
+  val group = "test.group"
+
+  @deprecated("This field will be removed in a future release", "0.11.0.0")
+  private val oldConsumers = new ArrayBuffer[OldConsumer]
+  private var consumerGroupService: List[ConsumerGroupService] = List()
+  private var consumerGroupExecutors: List[AbstractConsumerGroupExecutor] = List()
+
+  // configure the servers and clients
+  override def generateConfigs = {
+    TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
+      KafkaConfig.fromProps(props)
+    }
+  }
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    consumerGroupService.foreach(_.close())
+    consumerGroupExecutors.foreach(_.shutdown())
+    oldConsumers.foreach(_.stop())
+    super.tearDown()
+  }
+
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0")
+  def createOldConsumer(): Unit = {
+    val consumerProps = new Properties
+    consumerProps.setProperty("group.id", group)
+    consumerProps.setProperty("zookeeper.connect", zkConnect)
+    oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
+  }
+
+  def stopRandomOldConsumer(): Unit = {
+    oldConsumers.head.stop()
+  }
+
+  def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
+    val opts = new ConsumerGroupCommandOptions(args)
+    val service = if (opts.useOldConsumer) new ZkConsumerGroupService(opts) else new KafkaConsumerGroupService(opts)
+    consumerGroupService = service :: consumerGroupService
+    service
+  }
+
+  def addConsumerGroupExecutor(numConsumers: Int,
+                               topic: String = topic,
+                               group: String = group,
+                               strategy: String = classOf[RangeAssignor].getName): ConsumerGroupExecutor = {
+    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy)
+    addExecutor(executor)
+    executor
+  }
+
+  def addSimpleGroupExecutor(partitions: Iterable[TopicPartition] = Seq(new TopicPartition(topic, 0)),
+                             group: String = group): SimpleConsumerGroupExecutor = {
+    val executor = new SimpleConsumerGroupExecutor(brokerList, group, partitions)
+    addExecutor(executor)
+    executor
+  }
+
+  private def addExecutor(executor: AbstractConsumerGroupExecutor): AbstractConsumerGroupExecutor = {
+    consumerGroupExecutors = executor :: consumerGroupExecutors
+    executor
+  }
+
+}
+
+object ConsumerGroupCommandTest {
+
+  abstract class AbstractConsumerRunnable(broker: String, groupId: String) extends Runnable {
+    val props = new Properties
+    configure(props)
+    val consumer = new KafkaConsumer(props)
+
+    def configure(props: Properties): Unit = {
+      props.put("bootstrap.servers", broker)
+      props.put("group.id", groupId)
+      props.put("key.deserializer", classOf[StringDeserializer].getName)
+      props.put("value.deserializer", classOf[StringDeserializer].getName)
+    }
+
+    def subscribe(): Unit
+
+    def run() {
+      try {
+        subscribe()
+        while (true)
+          consumer.poll(Long.MaxValue)
+      } catch {
+        case _: WakeupException => // OK
+      } finally {
+        consumer.close()
+      }
+    }
+
+    def shutdown() {
+      consumer.wakeup()
+    }
+  }
+
+  class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String)
+    extends AbstractConsumerRunnable(broker, groupId) {
+
+    override def configure(props: Properties): Unit = {
+      super.configure(props)
+      props.put("partition.assignment.strategy", strategy)
+    }
+
+    override def subscribe(): Unit = {
+      consumer.subscribe(Collections.singleton(topic))
+    }
+  }
+
+  class SimpleConsumerRunnable(broker: String, groupId: String, partitions: Iterable[TopicPartition])
+    extends AbstractConsumerRunnable(broker, groupId) {
+
+    override def subscribe(): Unit = {
+      consumer.assign(partitions.toList.asJava)
+    }
+  }
+
+  class AbstractConsumerGroupExecutor(numThreads: Int) {
+    private val executor: ExecutorService = Executors.newFixedThreadPool(numThreads)
+    private val consumers = new ArrayBuffer[AbstractConsumerRunnable]()
+
+    def submit(consumerThread: AbstractConsumerRunnable) {
+      consumers += consumerThread
+      executor.submit(consumerThread)
+    }
+
+    def shutdown() {
+      consumers.foreach(_.shutdown())
+      executor.shutdown()
+      executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
+    }
+  }
+
+  class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String)
+    extends AbstractConsumerGroupExecutor(numConsumers) {
+
+    for (_ <- 1 to numConsumers) {
+      submit(new ConsumerRunnable(broker, groupId, topic, strategy))
+    }
+
+  }
+
+  class SimpleConsumerGroupExecutor(broker: String, groupId: String, partitions: Iterable[TopicPartition])
+    extends AbstractConsumerGroupExecutor(1) {
+
+    submit(new SimpleConsumerRunnable(broker, groupId, partitions))
+  }
+
+}
+
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 7000308..c782c3f 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -16,76 +16,49 @@
  */
 package kafka.admin
 
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
-import java.util.Collections
-import java.util.Properties
-
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
-import kafka.consumer.OldConsumer
-import kafka.consumer.Whitelist
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.WakeupException
-import org.apache.kafka.common.serialization.StringDeserializer
-
-import scala.collection.mutable.ArrayBuffer
-
-class DescribeConsumerGroupTest extends KafkaServerTestHarness {
-  private val topic = "foo"
-  private val group = "test.group"
-
-  @deprecated("This field will be removed in a future release", "0.11.0.0")
-  private val oldConsumers = new ArrayBuffer[OldConsumer]
-  private var consumerGroupService: ConsumerGroupService = _
-  private var consumerGroupExecutor: ConsumerGroupExecutor = _
-
-  // configure the servers and clients
-  override def generateConfigs = {
-    TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
-      KafkaConfig.fromProps(props)
-    }
-  }
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
-  }
+import org.junit.Assert._
+import org.junit.Test
 
-  @After
-  override def tearDown(): Unit = {
-    if (consumerGroupService != null)
-      consumerGroupService.close()
-    if (consumerGroupExecutor != null)
-      consumerGroupExecutor.shutdown()
-    oldConsumers.foreach(_.stop())
-    super.tearDown()
-  }
+class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeNonExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group"))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val cgcArgs = Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group")
+    val consumerGroupService = getConsumerGroupService(cgcArgs)
     TestUtils.waitUntilTrue(() => consumerGroupService.describeGroup()._2.isEmpty, "Expected no rows in describe group results.")
   }
 
   @Test
+  def testDescribeSimpleConsumerGroup() {
+    // Ensure that the offsets of consumers which don't use group management are still displayed
+
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+    addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.describeGroup()
+      println(assignments.get.map(x => (x.topic, x.partition)))
+      state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2
+    }, "Expected two partition assignment results in describe group state result.")
+  }
+
+  @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
     TestUtils.waitUntilTrue(() => {
       val (_, assignments) = consumerGroupService.describeGroup()
       assignments.isDefined &&
@@ -99,8 +72,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   def testDescribeExistingGroupWithNoMembers() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
 
     TestUtils.waitUntilTrue(() => {
       val (_, assignments) = consumerGroupService.describeGroup()
@@ -108,7 +80,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
       assignments.get.count(_.group == group) == 1 &&
       assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
     }, "Expected rows and a consumer id column in describe group results.")
-    oldConsumers.head.stop()
+    stopRandomOldConsumer()
 
     TestUtils.waitUntilTrue(() => {
       val (_, assignments) = consumerGroupService.describeGroup()
@@ -124,8 +96,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
     TestUtils.waitUntilTrue(() => {
       val (_, assignments) = consumerGroupService.describeGroup()
       assignments.isDefined &&
@@ -139,12 +110,11 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   def testDescribeNonExistingGroupWithNewConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    addConsumerGroupExecutor(numConsumers = 1)
 
     // note the group to be queried is a different (non-existing) group
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(cgcArgs)
 
     val (state, assignments) = consumerGroupService.describeGroup()
     assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
@@ -154,11 +124,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   def testDescribeExistingGroupWithNewConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    addConsumerGroupExecutor(numConsumers = 1)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
         val (state, assignments) = consumerGroupService.describeGroup()
@@ -175,11 +144,9 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   def testDescribeExistingGroupWithNoMembersWithNewConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
-
+    val consumerGroupExecutor = addConsumerGroupExecutor(numConsumers = 1)
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
       val (state, _) = consumerGroupService.describeGroup()
@@ -214,11 +181,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     // run two consumers in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
+    addConsumerGroupExecutor(numConsumers = 2)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = consumerGroupService.describeGroup()
@@ -237,11 +203,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     AdminUtils.createTopic(zkUtils, topic2, 2, 1)
 
     // run two consumers in the group consuming from a two-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
+    addConsumerGroupExecutor(numConsumers = 2, topic = topic2)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = consumerGroupService.describeGroup()
@@ -259,12 +224,11 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     // complete before the timeout expires
 
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    addConsumerGroupExecutor(numConsumers = 1)
 
     // set the group initialization timeout too low for the group to stabilize
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "1")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val consumerGroupService = getConsumerGroupService(cgcArgs)
 
     try {
       consumerGroupService.describeGroup()
@@ -274,59 +238,4 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     }
   }
 
-  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0")
-  private def createOldConsumer(): Unit = {
-    val consumerProps = new Properties
-    consumerProps.setProperty("group.id", group)
-    consumerProps.setProperty("zookeeper.connect", zkConnect)
-    oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
-  }
-}
-
-
-class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable {
-  val props = new Properties
-  props.put("bootstrap.servers", broker)
-  props.put("group.id", groupId)
-  props.put("key.deserializer", classOf[StringDeserializer].getName)
-  props.put("value.deserializer", classOf[StringDeserializer].getName)
-  val consumer = new KafkaConsumer(props)
-
-  def run() {
-    try {
-      consumer.subscribe(Collections.singleton(topic))
-      while (true)
-        consumer.poll(Long.MaxValue)
-    } catch {
-      case _: WakeupException => // OK
-    } finally {
-      consumer.close()
-    }
-  }
-
-  def shutdown() {
-    consumer.wakeup()
-  }
-}
-
-
-class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
-  val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
-  private val consumers = new ArrayBuffer[ConsumerThread]()
-  for (i <- 1 to numConsumers) {
-    val consumer = new ConsumerThread(broker, i, groupId, topic)
-    consumers += consumer
-    executor.submit(consumer)
-  }
-
-  def shutdown() {
-    consumers.foreach(_.shutdown)
-    executor.shutdown()
-    try {
-      executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
-    } catch {
-      case e: InterruptedException =>
-        e.printStackTrace()
-    }
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 6727fad..13dccbe 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -18,53 +18,21 @@ package kafka.admin
 
 import java.util.Properties
 
-import org.easymock.EasyMock
-import org.junit.Before
 import org.junit.Test
-
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
 import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
-import kafka.consumer.OldConsumer
-import kafka.consumer.Whitelist
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.consumer.{OldConsumer, Whitelist}
 import kafka.utils.TestUtils
+import org.easymock.EasyMock
 
+class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-class ListConsumerGroupTest extends KafkaServerTestHarness {
-
-  val overridingProps = new Properties()
-  val topic = "foo"
-  val topicFilter = Whitelist(topic)
-  val group = "test.group"
-  val props = new Properties
-
-  // configure the servers and clients
-  override def generateConfigs =
-    TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
-
-  @Before
-  override def setUp() {
-    super.setUp()
-
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+  @Test
+  def testListOldConsumerGroups() {
+    val topicFilter = Whitelist(topic)
+    val props = new Properties
     props.setProperty("group.id", group)
     props.setProperty("zookeeper.connect", zkConnect)
-  }
-
-  @Test
-  def testListGroupWithNoExistingGroup() {
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect))
-    val consumerGroupCommand = new ZkConsumerGroupService(opts)
-    try {
-      assert(consumerGroupCommand.listGroups().isEmpty)
-    } finally {
-      consumerGroupCommand.close()
-    }
-  }
-
-  @Test
-  def testListGroupWithSomeGroups() {
     // mocks
     val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
     props.setProperty("group.id", "some.other.group")
@@ -80,13 +48,42 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
 
     // action/test
     TestUtils.waitUntilTrue(() => {
-        val groups = consumerGroupCommand.listGroups()
-        groups.size == 2 && groups.contains(group)
-      }, "Expected a different list group results.")
+      val groups = consumerGroupCommand.listGroups()
+      groups.size == 2 && groups.contains(group)
+    }, "Expected a different list group results.")
 
     // cleanup
     consumerGroupCommand.close()
     consumer1Mock.stop()
     consumer2Mock.stop()
   }
+
+  @Test
+  def testListGroupWithNoExistingGroup() {
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect))
+    val consumerGroupCommand = new ZkConsumerGroupService(opts)
+    try {
+      assert(consumerGroupCommand.listGroups().isEmpty)
+    } finally {
+      consumerGroupCommand.close()
+    }
+  }
+
+  @Test
+  def testListConsumerGroups() {
+    val simpleGroup = "simple-group"
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--list")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val expectedGroups = Set(group, simpleGroup)
+    var foundGroups = Set.empty[String]
+    TestUtils.waitUntilTrue(() => {
+      foundGroups = service.listGroups().toSet
+      expectedGroups == foundGroups
+    }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 6853b16..50356ff 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,13 +16,46 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Calendar, Date, Properties}
 
-import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
-import kafka.integration.KafkaServerTestHarness
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.junit.Test
 
-import scala.collection.mutable.ArrayBuffer
+class TimeConversionTests {
+
+  @Test
+  def testDateTimeFormats() {
+    //check valid formats
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"))
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"))
+    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
+
+    //check some invalid formats
+    try {
+      invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"))
+      fail("Call to getDateTime should fail")
+    } catch {
+      case _: ParseException =>
+    }
+
+    try {
+      invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"))
+      fail("Call to getDateTime should fail")
+    } catch {
+      case _: ParseException =>
+    }
+  }
+
+  private def invokeGetDateTimeMethod(format: SimpleDateFormat) {
+    val checkpoint = new Date()
+    val timestampString = format.format(checkpoint)
+    ConsumerGroupCommand.convertTimestamp(timestampString)
+  }
+
+}
 
 /**
   * Test cases by:
@@ -34,46 +67,27 @@ import scala.collection.mutable.ArrayBuffer
   * - scope=topics+partitions, scenario=to-earliest
   * - export/import
   */
-class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
-
+class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
+  
   val overridingProps = new Properties()
   val topic1 = "foo1"
   val topic2 = "foo2"
-  val group = "test.group"
-  val props = new Properties
-  val consumerGroupServices = new ArrayBuffer[KafkaConsumerGroupService]
-  val executors = new ArrayBuffer[ConsumerGroupExecutor]
 
   /**
     * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
     * test and should not reuse previous configurations unless they select their ports randomly when servers are started.
     */
-  override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
-
-  @Before
-  override def setUp() {
-    super.setUp()
-
-    props.setProperty("group.id", group)
-  }
-
-  @After
-  override def tearDown() {
-    try {
-      executors.foreach(_.shutdown())
-      consumerGroupServices.foreach(_.close())
-    } finally {
-      super.tearDown()
-    }
-  }
+  override def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false)
+      .map(KafkaConfig.fromProps(_, overridingProps))  
+  } 
 
   @Test
   def testResetOffsetsNotExistingGroup() {
-    createConsumerGroupExecutor(brokerList, 1, group, topic1)
+    addConsumerGroupExecutor(numConsumers = 1, topic1)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics", "--to-current")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
       val assignmentsToReset = consumerGroupCommand.resetOffsets()
@@ -113,10 +127,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
-
-    val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+    val executor = addConsumerGroupExecutor(numConsumers = 1, topic1)
 
     TestUtils.waitUntilTrue(() => {
       val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -134,8 +146,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     executor.shutdown()
 
     val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute")
-    val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
-    val consumerGroupCommand1 = createConsumerGroupService(opts1)
+    val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1)
 
     TestUtils.waitUntilTrue(() => {
       val assignmentsToReset = consumerGroupCommand1.resetOffsets()
@@ -158,10 +169,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
-
-    val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+    val executor = addConsumerGroupExecutor(numConsumers = 1, topic1)
 
     TestUtils.waitUntilTrue(() => {
       val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -178,8 +187,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     executor.shutdown()
 
     val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
-    val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
-    val consumerGroupCommand1 = createConsumerGroupService(opts1)
+    val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1)
 
     TestUtils.waitUntilTrue(() => {
       val assignmentsToReset = consumerGroupCommand1.resetOffsets()
@@ -192,43 +200,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   }
 
   @Test
-  def testDateTimeFormats() {
-    //check valid formats
-    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"))
-    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
-    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"))
-    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"))
-    invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
-
-    //check some invalid formats
-    try {
-      invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"))
-      fail("Call to getDateTime should fail")
-    } catch {
-      case _: ParseException =>
-    }
-
-    try {
-      invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"))
-      fail("Call to getDateTime should fail")
-    } catch {
-      case _: ParseException =>
-    }
-  }
-
-  private def invokeGetDateTimeMethod(format: SimpleDateFormat) {
-    val checkpoint = new Date()
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
-    val opts  = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
-    consumerGroupCommand.getDateTime
-  }
-
-  @Test
   def testResetOffsetsByDuration() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M")
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -247,8 +221,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsByDurationToEarliest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -266,8 +239,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsToEarliest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -285,8 +257,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsToLatest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -307,8 +278,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsToCurrentOffset() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -325,9 +295,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     AdminUtils.deleteTopic(zkUtils, topic1)
   }
 
-  private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
+  private def produceConsumeAndShutdown(consumerGroupCommand: ConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
     TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
-    val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic)
+    val executor =  addConsumerGroupExecutor(numConsumers, topic)
 
     TestUtils.waitUntilTrue(() => {
       val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -348,8 +318,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsToSpecificOffset() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -368,8 +337,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsShiftPlus() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -389,8 +357,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsShiftMinus() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -411,8 +378,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsShiftByLowerThanEarliest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -432,8 +398,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsShiftByHigherThanLatest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -453,8 +418,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsToEarliestOnOneTopic() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -472,8 +436,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsToEarliestOnOneTopicAndPartition() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 2, 1)
 
@@ -495,8 +458,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       "--topic", topic1,
       "--topic", topic2,
       "--to-earliest", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
     AdminUtils.createTopic(zkUtils, topic2, 1, 1)
@@ -522,8 +484,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       "--topic", String.format("%s:1", topic1),
       "--topic", String.format("%s:1", topic2),
       "--to-earliest", "--execute")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 2, 1)
     AdminUtils.createTopic(zkUtils, topic2, 2, 1)
@@ -545,8 +506,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   @Test
   def testResetOffsetsExportImportPlan() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset","2", "--export")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = createConsumerGroupService(opts)
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     AdminUtils.createTopic(zkUtils, topic1, 2, 1)
 
@@ -562,10 +522,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       assignmentsToReset.exists { assignment => assignment._2.offset() == 2 } && file.exists()
     }, "Expected the consume all messages and save reset offsets plan to file")
 
-
     val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath)
-    val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
-    val consumerGroupCommandExec = createConsumerGroupService(optsExec)
+    val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
 
     TestUtils.waitUntilTrue(() => {
         val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
@@ -588,15 +546,4 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     ConsumerGroupCommand.main(cgcArgs)
   }
 
-  private def createConsumerGroupExecutor(brokerList: String, numConsumers: Int, groupId: String, topic: String): ConsumerGroupExecutor = {
-    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, groupId, topic)
-    executors += executor
-    executor
-  }
-
-  private def createConsumerGroupService(opts: ConsumerGroupCommandOptions): KafkaConsumerGroupService = {
-    val service = new KafkaConsumerGroupService(opts)
-    consumerGroupServices += service
-    service
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 7075c63..aa44d14 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -777,6 +777,36 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testCommitOffsetsAfterGroupIsEmpty(): Unit = {
+    // Tests the scenario where the reset offset tool modifies the offsets
+    // of a group after it becomes empty
+
+    // A group member joins
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    // and leaves.
+    EasyMock.reset(replicaManager)
+    val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
+    assertEquals(Errors.NONE, leaveGroupResult)
+
+    // The simple offset commit should now fail
+    EasyMock.reset(replicaManager)
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, error)
+    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+  }
+
+  @Test
   def testFetchOffsets() {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
@@ -866,7 +896,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
 
     val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
-    assertEquals(Errors.NONE, secondReqError)
+    assertEquals(Errors.NONE, thirdReqError)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), thirdReqPartitionData.get(tp).map(_.offset))
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 78a5eaa..3bdffbd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
 
@@ -111,6 +111,41 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testLoadEmptyGroupWithOffsets() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val generation = 15
+    val protocolType = "consumer"
+    val startOffset = 15L
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertNull(group.leaderOrNull)
+    assertNull(group.protocolOrNull)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
   def testLoadTransactionalOffsetsWithoutGroup() {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
@@ -524,6 +559,9 @@ class GroupMetadataManagerTest {
   @Test
   def testLoadOffsetsAndGroup() {
     val groupMetadataTopicPartition = groupTopicPartition
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
     val startOffset = 15L
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -533,7 +571,7 @@ class GroupMetadataManagerTest {
 
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
-    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
       offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
 
@@ -546,7 +584,10 @@ class GroupMetadataManagerTest {
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
     assertEquals(Stable, group.currentState)
-    assertEquals(memberId, group.leaderId)
+    assertEquals(memberId, group.leaderOrNull)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertEquals(protocol, group.protocolOrNull)
     assertEquals(Set(memberId), group.allMembers)
     assertEquals(committedOffsets.size, group.allOffsets.size)
     committedOffsets.foreach { case (topicPartition, offset) =>
@@ -558,9 +599,9 @@ class GroupMetadataManagerTest {
   def testLoadGroupWithTombstone() {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
-
     val memberId = "98098230493"
-    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+      protocolType = "consumer", protocol = "range", memberId)
     val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
       Seq(groupMetadataRecord, groupMetadataTombstone): _*)
@@ -581,6 +622,9 @@ class GroupMetadataManagerTest {
     // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
 
     val groupMetadataTopicPartition = groupTopicPartition
+    val generation = 293
+    val protocolType = "consumer"
+    val protocol = "range"
     val startOffset = 15L
 
     val committedOffsets = Map(
@@ -590,7 +634,7 @@ class GroupMetadataManagerTest {
     )
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
-    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
     val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
       Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
@@ -612,6 +656,9 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = {
+    val generation = 293
+    val protocolType = "consumer"
+    val protocol = "range"
     val startOffset = 15L
     val tp0 = new TopicPartition("foo", 0)
     val tp1 = new TopicPartition("foo", 1)
@@ -624,13 +671,15 @@ class GroupMetadataManagerTest {
     val segment1MemberId = "a"
     val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
     val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(segment1MemberId)): _*)
+      createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(
+        generation, protocolType, protocol, segment1MemberId)): _*)
     val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records)
 
     val segment2MemberId = "b"
     val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
     val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
-      createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(segment2MemberId)): _*)
+      createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(
+        generation, protocolType, protocol, segment2MemberId)): _*)
     val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records)
 
     EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End))
@@ -643,7 +692,7 @@ class GroupMetadataManagerTest {
     assertEquals(groupId, group.groupId)
     assertEquals(Stable, group.currentState)
 
-    assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderId)
+    assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderOrNull)
     assertEquals("segment2 group record member should be only member", Set(segment2MemberId), group.allMembers)
 
     // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
@@ -654,20 +703,22 @@ class GroupMetadataManagerTest {
     }
   }
 
-
   @Test
   def testAddGroup() {
-    val group = new GroupMetadata("foo")
+    val group = new GroupMetadata("foo", initialState = Empty)
     assertEquals(group, groupMetadataManager.addGroup(group))
-    assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo")))
+    assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", initialState = Empty)))
   }
 
   @Test
   def testStoreEmptyGroup() {
-    val group = new GroupMetadata(groupId)
+    val generation = 27
+    val protocolType = "consumer"
+
+    val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
     groupMetadataManager.addGroup(group)
 
-    expectAppendMessage(Errors.NONE)
+    val capturedRecords = expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
     var maybeError: Option[Errors] = None
@@ -677,6 +728,45 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.storeGroup(group, Map.empty, callback)
     assertEquals(Some(Errors.NONE), maybeError)
+    assertTrue(capturedRecords.hasCaptured)
+    val records = capturedRecords.getValue()(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))
+        .records.asScala.toList
+    assertEquals(1, records.size)
+
+    val record = records.head
+    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+    assertTrue(groupMetadata.is(Empty))
+    assertEquals(generation, groupMetadata.generationId)
+    assertEquals(Some(protocolType), groupMetadata.protocolType)
+  }
+
+  @Test
+  def testStoreEmptySimpleGroup() {
+    val group = new GroupMetadata(groupId, initialState = Empty)
+    groupMetadataManager.addGroup(group)
+
+    val capturedRecords = expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var maybeError: Option[Errors] = None
+    def callback(error: Errors) {
+      maybeError = Some(error)
+    }
+
+    groupMetadataManager.storeGroup(group, Map.empty, callback)
+    assertEquals(Some(Errors.NONE), maybeError)
+    assertTrue(capturedRecords.hasCaptured)
+
+    assertTrue(capturedRecords.hasCaptured)
+    val records = capturedRecords.getValue()(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))
+      .records.asScala.toList
+    assertEquals(1, records.size)
+
+    val record = records.head
+    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+    assertTrue(groupMetadata.is(Empty))
+    assertEquals(0, groupMetadata.generationId)
+    assertEquals(None, groupMetadata.protocolType)
   }
 
   @Test
@@ -695,7 +785,7 @@ class GroupMetadataManagerTest {
   private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
     EasyMock.reset(replicaManager)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     expectAppendMessage(appendError)
@@ -718,7 +808,7 @@ class GroupMetadataManagerTest {
     val clientId = "clientId"
     val clientHost = "localhost"
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
@@ -749,7 +839,7 @@ class GroupMetadataManagerTest {
     val clientId = "clientId"
     val clientHost = "localhost"
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
@@ -779,7 +869,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -821,7 +911,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -861,7 +951,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -900,7 +990,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -938,7 +1028,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -980,7 +1070,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1018,7 +1108,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     // expire the offset after 1 millisecond
@@ -1071,7 +1161,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
     group.generationId = 5
 
@@ -1119,7 +1209,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
     group.generationId = 5
 
@@ -1173,7 +1263,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     // expire the offset after 1 millisecond
@@ -1244,7 +1334,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId)
+    val group = new GroupMetadata(groupId, initialState = Empty)
     groupMetadataManager.addGroup(group)
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
@@ -1314,40 +1404,47 @@ class GroupMetadataManagerTest {
     capturedArgument
   }
 
-  private def expectAppendMessage(error: Errors) {
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+  private def expectAppendMessage(error: Errors): Capture[Map[TopicPartition, MemoryRecords]] = {
+    val capturedCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedRecords: Capture[Map[TopicPartition, MemoryRecords]] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       internalTopicsAllowed = EasyMock.eq(true),
       isFromClient = EasyMock.eq(false),
-      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument),
+      EasyMock.capture(capturedRecords),
+      EasyMock.capture(capturedCallback),
       EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
-      override def answer = capturedArgument.getValue.apply(
+      override def answer = capturedCallback.getValue.apply(
         Map(groupTopicPartition ->
           new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )})
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    capturedRecords
   }
 
-  private def buildStableGroupRecordWithMember(memberId: String): SimpleRecord = {
-    val group = new GroupMetadata(groupId)
-    group.transitionTo(PreparingRebalance)
-    val memberProtocols = List(("roundrobin", Array.emptyByteArray))
-    val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, "consumer", memberProtocols)
-    group.add(member)
-    member.awaitingJoinCallback = _ => {}
-    group.initNextGeneration()
-    group.transitionTo(Stable)
-
+  private def buildStableGroupRecordWithMember(generation: Int,
+                                               protocolType: String,
+                                               protocol: String,
+                                               memberId: String): SimpleRecord = {
+    val memberProtocols = List((protocol, Array.emptyByteArray))
+    val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
+    val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol,
+      leaderId = memberId, Seq(member))
     val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
     val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
     new SimpleRecord(groupMetadataKey, groupMetadataValue)
   }
 
+  private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = {
+    val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
+    val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
+    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty)
+    new SimpleRecord(groupMetadataKey, groupMetadataValue)
+  }
+
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 2db6603..871a2d3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -40,7 +40,7 @@ class GroupMetadataTest extends JUnitSuite {
 
   @Before
   def setUp() {
-    group = new GroupMetadata("groupId")
+    group = new GroupMetadata("groupId", initialState = Empty)
   }
 
   @Test
@@ -271,25 +271,25 @@ class GroupMetadataTest extends JUnitSuite {
     group.add(member)
 
     assertEquals(0, group.generationId)
-    assertNull(group.protocol)
+    assertNull(group.protocolOrNull)
 
     group.initNextGeneration()
 
     assertEquals(1, group.generationId)
-    assertEquals("roundrobin", group.protocol)
+    assertEquals("roundrobin", group.protocolOrNull)
   }
 
   @Test
   def testInitNextGenerationEmptyGroup() {
     assertEquals(Empty, group.currentState)
     assertEquals(0, group.generationId)
-    assertNull(group.protocol)
+    assertNull(group.protocolOrNull)
 
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
     assertEquals(1, group.generationId)
-    assertNull(group.protocol)
+    assertNull(group.protocolOrNull)
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.