You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/24 04:20:54 UTC
[kafka] branch 1.0 updated: KAFKA-6287;
Consumer group command should list simple consumer groups (#4407)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 4f5cb2d KAFKA-6287; Consumer group command should list simple consumer groups (#4407)
4f5cb2d is described below
commit 4f5cb2d9088863ea9782313fd995e48d55c57b01
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jan 23 18:58:49 2018 -0800
KAFKA-6287; Consumer group command should list simple consumer groups (#4407)
With this patch, simple consumer groups which only use Kafka for offset storage will be viewable using the `--list` option in consumer-groups.sh. In addition, this patch fixes a bug in the offset loading logic which caused us to lose the protocol type of empty groups on coordinator failover. I also did some cleanup of the various consumer group command test cases.
For testing, I have added new integration tests which cover listing and describing simple consumer groups. I also added unit tests to cover loading empty groups with assertions on the protocol type.
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
core/src/main/scala/kafka/admin/AdminClient.scala | 18 +-
.../scala/kafka/admin/ConsumerGroupCommand.scala | 30 ++--
.../kafka/coordinator/group/GroupCoordinator.scala | 33 ++--
.../kafka/coordinator/group/GroupMetadata.scala | 66 ++++---
.../coordinator/group/GroupMetadataManager.scala | 44 +++--
.../main/scala/kafka/tools/DumpLogSegments.scala | 2 +-
.../SaslClientsWithInvalidCredentialsTest.scala | 2 +-
.../kafka/admin/ConsumerGroupCommandTest.scala | 197 +++++++++++++++++++++
.../kafka/admin/DescribeConsumerGroupTest.scala | 173 +++++-------------
.../unit/kafka/admin/ListConsumerGroupTest.scala | 81 ++++-----
.../kafka/admin/ResetConsumerGroupOffsetTest.scala | 193 ++++++++------------
.../coordinator/group/GroupCoordinatorTest.scala | 32 +++-
.../group/GroupMetadataManagerTest.scala | 183 ++++++++++++++-----
.../coordinator/group/GroupMetadataTest.scala | 10 +-
14 files changed, 630 insertions(+), 434 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 24149d7..d794eb2 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -167,7 +167,7 @@ class AdminClient(val time: Time,
}
def listAllGroups(): Map[Node, List[GroupOverview]] = {
- findAllBrokers.map { broker =>
+ findAllBrokers().map { broker =>
broker -> {
try {
listGroups(broker)
@@ -182,16 +182,22 @@ class AdminClient(val time: Time,
def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = {
listAllGroups().mapValues { groups =>
- groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+ groups.filter(isConsumerGroup)
}
}
def listAllGroupsFlattened(): List[GroupOverview] = {
- listAllGroups.values.flatten.toList
+ listAllGroups().values.flatten.toList
}
def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
- listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+ listAllGroupsFlattened().filter(isConsumerGroup)
+ }
+
+ private def isConsumerGroup(group: GroupOverview): Boolean = {
+ // Consumer groups which are using group management use the "consumer" protocol type.
+ // Consumer groups which are only using offset storage will have an empty protocol type.
+ group.protocolType.isEmpty || group.protocolType == ConsumerProtocol.PROTOCOL_TYPE
}
def listGroupOffsets(groupId: String): Map[TopicPartition, Long] = {
@@ -200,12 +206,12 @@ class AdminClient(val time: Time,
val response = responseBody.asInstanceOf[OffsetFetchResponse]
if (response.hasError)
throw response.error.exception
- response.maybeThrowFirstPartitionError
+ response.maybeThrowFirstPartitionError()
response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap
}
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
- findAllBrokers.map { broker =>
+ findAllBrokers().map { broker =>
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
}.toMap
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2120657..f1c397e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -146,6 +146,19 @@ object ConsumerGroupCommand extends Logging {
}
}
+ def convertTimestamp(timeString: String): java.lang.Long = {
+ val datetime: String = timeString match {
+ case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString
+ case ts => s"${ts}Z"
+ }
+ val date = try {
+ new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
+ } catch {
+ case _: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
+ }
+ date.getTime
+ }
+
def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
println()
@@ -606,8 +619,8 @@ object ConsumerGroupCommand extends Logging {
(topicPartition, new OffsetAndMetadata(newOffset))
}.toMap
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
+ val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
partitionsToReset.map { topicPartition =>
- val timestamp = getDateTime
val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
logTimestampOffset match {
case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
@@ -668,21 +681,6 @@ object ConsumerGroupCommand extends Logging {
}
}
- private[admin] def getDateTime: java.lang.Long = {
- val datetime: String = opts.options.valueOf(opts.resetToDatetimeOpt) match {
- case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString
- case ts => s"${ts}Z"
- }
- val date = {
- try {
- new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
- } catch {
- case e: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
- }
- }
- date.getTime
- }
-
override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = {
val rows = assignmentsToReset.map { case (k,v) => s"${k.topic()},${k.partition()},${v.offset()}" }(collection.breakOut): List[String]
rows.foldRight("")(_ + "\n" + _)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 94622f6..bea4e83 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.utils.Time
import scala.collection.{Map, Seq, immutable}
import scala.math.max
-
/**
* GroupCoordinator handles general group membership and offset management.
*
@@ -129,7 +128,7 @@ class GroupCoordinator(val brokerId: Int,
if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
} else {
- val group = groupManager.addGroup(new GroupMetadata(groupId))
+ val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
@@ -185,15 +184,15 @@ class GroupCoordinator(val brokerId: Int,
// receive the initial JoinGroup response), so just return current group information
// for the current generation.
responseCallback(JoinGroupResult(
- members = if (memberId == group.leaderId) {
+ members = if (group.isLeader(memberId)) {
group.currentMemberMetadata
} else {
Map.empty
},
memberId = memberId,
generationId = group.generationId,
- subProtocol = group.protocol,
- leaderId = group.leaderId,
+ subProtocol = group.protocolOrNull,
+ leaderId = group.leaderOrNull,
error = Errors.NONE))
} else {
// member has changed metadata, so force a rebalance
@@ -207,7 +206,7 @@ class GroupCoordinator(val brokerId: Int,
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
- if (memberId == group.leaderId || !member.matches(protocols)) {
+ if (group.isLeader(memberId) || !member.matches(protocols)) {
// force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
// The latter allows the leader to trigger rebalances for changes affecting assignment
// which do not affect the member metadata (such as topic metadata changes for the consumer)
@@ -219,8 +218,8 @@ class GroupCoordinator(val brokerId: Int,
members = Map.empty,
memberId = memberId,
generationId = group.generationId,
- subProtocol = group.protocol,
- leaderId = group.leaderId,
+ subProtocol = group.protocolOrNull,
+ leaderId = group.leaderOrNull,
error = Errors.NONE))
}
}
@@ -271,7 +270,7 @@ class GroupCoordinator(val brokerId: Int,
group.get(memberId).awaitingSyncCallback = responseCallback
// if this is the leader, then we can attempt to persist state and transition to stable
- if (memberId == group.leaderId) {
+ if (group.isLeader(memberId)) {
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
// fill any missing members with an empty assignment
@@ -408,7 +407,9 @@ class GroupCoordinator(val brokerId: Int,
validateGroup(groupId) match {
case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
case None =>
- val group = groupManager.getGroup(groupId).getOrElse(groupManager.addGroup(new GroupMetadata(groupId)))
+ val group = groupManager.getGroup(groupId).getOrElse {
+ groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+ }
doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
}
}
@@ -425,7 +426,7 @@ class GroupCoordinator(val brokerId: Int,
case None =>
if (generationId < 0) {
// the group is not relying on Kafka for group management, so allow the commit
- val group = groupManager.addGroup(new GroupMetadata(groupId))
+ val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
offsetMetadata, responseCallback)
} else {
@@ -459,7 +460,7 @@ class GroupCoordinator(val brokerId: Int,
if (group.is(Dead)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
} else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
- // the group is only using Kafka to store offsets
+ // The group is only using Kafka to store offsets.
// Also, for transactional offset commits we don't need to validate group membership and the generation.
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
} else if (group.is(AwaitingSync)) {
@@ -481,7 +482,7 @@ class GroupCoordinator(val brokerId: Int,
if (!isActive.get)
(Errors.COORDINATOR_NOT_AVAILABLE, Map())
else if (!isCoordinatorForGroup(groupId)) {
- debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
+ debug(s"Could not fetch offsets for group $groupId (not group coordinator)")
(Errors.NOT_COORDINATOR, Map())
} else if (isCoordinatorLoadInProgress(groupId))
(Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
@@ -753,15 +754,15 @@ class GroupCoordinator(val brokerId: Int,
for (member <- group.allMemberMetadata) {
assert(member.awaitingJoinCallback != null)
val joinResult = JoinGroupResult(
- members = if (member.memberId == group.leaderId) {
+ members = if (group.isLeader(member.memberId)) {
group.currentMemberMetadata
} else {
Map.empty
},
memberId = member.memberId,
generationId = group.generationId,
- subProtocol = group.protocol,
- leaderId = group.leaderId,
+ subProtocol = group.protocolOrNull,
+ leaderId = group.leaderOrNull,
error = Errors.NONE)
member.awaitingJoinCallback(joinResult)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 537d944..3257f51 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -111,6 +111,22 @@ private object GroupMetadata {
Stable -> Set(AwaitingSync),
PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
Empty -> Set(PreparingRebalance))
+
+ def loadGroup(groupId: String,
+ initialState: GroupState,
+ generationId: Int,
+ protocolType: String,
+ protocol: String,
+ leaderId: String,
+ members: Iterable[MemberMetadata]): GroupMetadata = {
+ val group = new GroupMetadata(groupId, initialState)
+ group.generationId = generationId
+ group.protocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
+ group.protocol = Option(protocol)
+ group.leaderId = Option(leaderId)
+ members.foreach(group.add)
+ group
+ }
}
/**
@@ -151,28 +167,22 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
* 3. leader id
*/
@nonthreadsafe
-private[group] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) extends Logging {
+private[group] class GroupMetadata(val groupId: String, initialState: GroupState) extends Logging {
+ private[group] val lock = new ReentrantLock
private var state: GroupState = initialState
-
- private[group] val lock = new ReentrantLock
+ var protocolType: Option[String] = None
+ var generationId = 0
+ private var leaderId: Option[String] = None
+ private var protocol: Option[String] = None
private val members = new mutable.HashMap[String, MemberMetadata]
-
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
-
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
-
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
-
private var receivedTransactionalOffsetCommits = false
-
private var receivedConsumerOffsetCommits = false
- var protocolType: Option[String] = None
- var generationId = 0
- var leaderId: String = null
- var protocol: String = null
var newMemberAdded: Boolean = false
def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
@@ -182,6 +192,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def has(memberId: String) = members.contains(memberId)
def get(memberId: String) = members(memberId)
+ def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
+ def leaderOrNull: String = leaderId.orNull
+ def protocolOrNull: String = protocol.orNull
+
def add(member: MemberMetadata) {
if (members.isEmpty)
this.protocolType = Some(member.protocolType)
@@ -190,18 +204,18 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
assert(this.protocolType.orNull == member.protocolType)
assert(supportsProtocols(member.protocols))
- if (leaderId == null)
- leaderId = member.memberId
+ if (leaderId.isEmpty)
+ leaderId = Some(member.memberId)
members.put(member.memberId, member)
}
def remove(memberId: String) {
members.remove(memberId)
- if (memberId == leaderId) {
+ if (isLeader(memberId)) {
leaderId = if (members.isEmpty) {
- null
+ None
} else {
- members.keys.head
+ Some(members.keys.head)
}
}
}
@@ -260,11 +274,11 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
assert(notYetRejoinedMembers == List.empty[MemberMetadata])
if (members.nonEmpty) {
generationId += 1
- protocol = selectProtocol
+ protocol = Some(selectProtocol)
transitionTo(AwaitingSync)
} else {
generationId += 1
- protocol = null
+ protocol = None
transitionTo(Empty)
}
receivedConsumerOffsetCommits = false
@@ -274,16 +288,20 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def currentMemberMetadata: Map[String, Array[Byte]] = {
if (is(Dead) || is(PreparingRebalance))
throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
- members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
+ members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol.get))}.toMap
}
def summary: GroupSummary = {
if (is(Stable)) {
- val members = this.members.values.map { member => member.summary(protocol) }.toList
- GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members)
+ val protocol = protocolOrNull
+ if (protocol == null)
+ throw new IllegalStateException("Invalid null group protocol for stable group")
+
+ val members = this.members.values.map { member => member.summary(protocol) }
+ GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members.toList)
} else {
- val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
- GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members)
+ val members = this.members.values.map{ member => member.summaryNoMetadata() }
+ GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members.toList)
}
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 0298509..dbae86f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
-import kafka.common.{MessageFormatter, _}
+import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
@@ -182,8 +182,7 @@ class GroupMetadataManager(brokerId: Int,
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, groupMetadataPartition))
- // construct the error status in the propagated assignment response
- // in the cache
+ // construct the error status in the propagated assignment response in the cache
val status = responseStatus(groupMetadataPartition)
val responseError = if (status.error == Errors.NONE) {
@@ -263,7 +262,7 @@ class GroupMetadataManager(brokerId: Int,
group.inLock {
if (!group.hasReceivedConsistentOffsetCommits)
- warn(s"group: ${group.groupId} with leader: ${group.leaderId} has received offset commits from consumers as well " +
+ warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " +
s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
s"should be avoided.")
}
@@ -585,8 +584,8 @@ class GroupMetadataManager(brokerId: Int,
// load groups which store offsets in kafka, but which have no active members and thus no group
// metadata stored in the log
- (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
- val group = new GroupMetadata(groupId)
+ (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
+ val group = new GroupMetadata(groupId, initialState = Empty)
val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
@@ -1036,8 +1035,8 @@ object GroupMetadataManager {
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
value.set(GENERATION_KEY, groupMetadata.generationId)
- value.set(PROTOCOL_KEY, groupMetadata.protocol)
- value.set(LEADER_KEY, groupMetadata.leaderId)
+ value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull)
+ value.set(LEADER_KEY, groupMetadata.leaderOrNull)
val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
val memberStruct = value.instance(MEMBERS_KEY)
@@ -1049,7 +1048,12 @@ object GroupMetadataManager {
if (version > 0)
memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
- val metadata = memberMetadata.metadata(groupMetadata.protocol)
+ // The group is non-empty, so the current protocol must be defined
+ val protocol = groupMetadata.protocolOrNull
+ if (protocol == null)
+ throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol")
+
+ val metadata = memberMetadata.metadata(protocol)
memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
val memberAssignment = assignment(memberMetadata.memberId)
@@ -1145,36 +1149,28 @@ object GroupMetadataManager {
val value = valueSchema.read(buffer)
if (version == 0 || version == 1) {
+ val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
-
+ val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
+ val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
val memberMetadataArray = value.getArray(MEMBERS_KEY)
val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
- val group = new GroupMetadata(groupId, initialState)
-
- group.generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
- group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
- group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
-
- memberMetadataArray.foreach { memberMetadataObj =>
+ val members = memberMetadataArray.map { memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
-
val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
- protocolType, List((group.protocol, subscription)))
-
+ protocolType, List((protocol, subscription)))
member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
-
- group.add(member)
+ member
}
-
- group
+ GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, members)
} else {
throw new IllegalStateException("Unknown group metadata message version")
}
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fe82dc2..855ca75 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -337,7 +337,7 @@ object DumpLogSegments {
val keyString = Json.encode(Map("metadata" -> groupId))
val valueString = Json.encode(Map(
"protocolType" -> protocolType,
- "protocol" -> group.protocol,
+ "protocol" -> group.protocolOrNull,
"generationId" -> group.generationId,
"assignment" -> assignment))
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index b309b80..65e72bd 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -151,7 +151,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
createClientCredential()
verifyWithRetry(describeTopic())
} finally {
- adminClient.close
+ adminClient.close()
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
new file mode 100644
index 0000000..b42c3e2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.{Collections, Properties}
+
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
+import kafka.consumer.{OldConsumer, Whitelist}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.{After, Before}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+class ConsumerGroupCommandTest extends KafkaServerTestHarness {
+ import ConsumerGroupCommandTest._
+
+ val topic = "foo"
+ val group = "test.group"
+
+ @deprecated("This field will be removed in a future release", "0.11.0.0")
+ private val oldConsumers = new ArrayBuffer[OldConsumer]
+ private var consumerGroupService: List[ConsumerGroupService] = List()
+ private var consumerGroupExecutors: List[AbstractConsumerGroupExecutor] = List()
+
+ // configure the servers and clients
+ override def generateConfigs = {
+ TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
+ KafkaConfig.fromProps(props)
+ }
+ }
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ consumerGroupService.foreach(_.close())
+ consumerGroupExecutors.foreach(_.shutdown())
+ oldConsumers.foreach(_.stop())
+ super.tearDown()
+ }
+
+ @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0")
+ def createOldConsumer(): Unit = {
+ val consumerProps = new Properties
+ consumerProps.setProperty("group.id", group)
+ consumerProps.setProperty("zookeeper.connect", zkConnect)
+ oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
+ }
+
+ def stopRandomOldConsumer(): Unit = {
+ oldConsumers.head.stop()
+ }
+
+ def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
+ val opts = new ConsumerGroupCommandOptions(args)
+ val service = if (opts.useOldConsumer) new ZkConsumerGroupService(opts) else new KafkaConsumerGroupService(opts)
+ consumerGroupService = service :: consumerGroupService
+ service
+ }
+
+ def addConsumerGroupExecutor(numConsumers: Int,
+ topic: String = topic,
+ group: String = group,
+ strategy: String = classOf[RangeAssignor].getName): ConsumerGroupExecutor = {
+ val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy)
+ addExecutor(executor)
+ executor
+ }
+
+ def addSimpleGroupExecutor(partitions: Iterable[TopicPartition] = Seq(new TopicPartition(topic, 0)),
+ group: String = group): SimpleConsumerGroupExecutor = {
+ val executor = new SimpleConsumerGroupExecutor(brokerList, group, partitions)
+ addExecutor(executor)
+ executor
+ }
+
+ private def addExecutor(executor: AbstractConsumerGroupExecutor): AbstractConsumerGroupExecutor = {
+ consumerGroupExecutors = executor :: consumerGroupExecutors
+ executor
+ }
+
+}
+
+object ConsumerGroupCommandTest {
+
+ abstract class AbstractConsumerRunnable(broker: String, groupId: String) extends Runnable {
+ val props = new Properties
+ configure(props)
+ val consumer = new KafkaConsumer(props)
+
+ def configure(props: Properties): Unit = {
+ props.put("bootstrap.servers", broker)
+ props.put("group.id", groupId)
+ props.put("key.deserializer", classOf[StringDeserializer].getName)
+ props.put("value.deserializer", classOf[StringDeserializer].getName)
+ }
+
+ def subscribe(): Unit
+
+ def run() {
+ try {
+ subscribe()
+ while (true)
+ consumer.poll(Long.MaxValue)
+ } catch {
+ case _: WakeupException => // OK
+ } finally {
+ consumer.close()
+ }
+ }
+
+ def shutdown() {
+ consumer.wakeup()
+ }
+ }
+
+ class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String)
+ extends AbstractConsumerRunnable(broker, groupId) {
+
+ override def configure(props: Properties): Unit = {
+ super.configure(props)
+ props.put("partition.assignment.strategy", strategy)
+ }
+
+ override def subscribe(): Unit = {
+ consumer.subscribe(Collections.singleton(topic))
+ }
+ }
+
+ class SimpleConsumerRunnable(broker: String, groupId: String, partitions: Iterable[TopicPartition])
+ extends AbstractConsumerRunnable(broker, groupId) {
+
+ override def subscribe(): Unit = {
+ consumer.assign(partitions.toList.asJava)
+ }
+ }
+
+ class AbstractConsumerGroupExecutor(numThreads: Int) {
+ private val executor: ExecutorService = Executors.newFixedThreadPool(numThreads)
+ private val consumers = new ArrayBuffer[AbstractConsumerRunnable]()
+
+ def submit(consumerThread: AbstractConsumerRunnable) {
+ consumers += consumerThread
+ executor.submit(consumerThread)
+ }
+
+ def shutdown() {
+ consumers.foreach(_.shutdown())
+ executor.shutdown()
+ executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String)
+ extends AbstractConsumerGroupExecutor(numConsumers) {
+
+ for (_ <- 1 to numConsumers) {
+ submit(new ConsumerRunnable(broker, groupId, topic, strategy))
+ }
+
+ }
+
+ class SimpleConsumerGroupExecutor(broker: String, groupId: String, partitions: Iterable[TopicPartition])
+ extends AbstractConsumerGroupExecutor(1) {
+
+ submit(new SimpleConsumerRunnable(broker, groupId, partitions))
+ }
+
+}
+
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 7000308..c782c3f 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -16,76 +16,49 @@
*/
package kafka.admin
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
-import java.util.Collections
-import java.util.Properties
-
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
-import kafka.consumer.OldConsumer
-import kafka.consumer.Whitelist
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.WakeupException
-import org.apache.kafka.common.serialization.StringDeserializer
-
-import scala.collection.mutable.ArrayBuffer
-
-class DescribeConsumerGroupTest extends KafkaServerTestHarness {
- private val topic = "foo"
- private val group = "test.group"
-
- @deprecated("This field will be removed in a future release", "0.11.0.0")
- private val oldConsumers = new ArrayBuffer[OldConsumer]
- private var consumerGroupService: ConsumerGroupService = _
- private var consumerGroupExecutor: ConsumerGroupExecutor = _
-
- // configure the servers and clients
- override def generateConfigs = {
- TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
- KafkaConfig.fromProps(props)
- }
- }
-
- @Before
- override def setUp() {
- super.setUp()
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
- }
+import org.junit.Assert._
+import org.junit.Test
- @After
- override def tearDown(): Unit = {
- if (consumerGroupService != null)
- consumerGroupService.close()
- if (consumerGroupExecutor != null)
- consumerGroupExecutor.shutdown()
- oldConsumers.foreach(_.stop())
- super.tearDown()
- }
+class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
@Test
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
def testDescribeNonExistingGroup() {
TestUtils.createOffsetsTopic(zkUtils, servers)
createOldConsumer()
- val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group"))
- consumerGroupService = new ZkConsumerGroupService(opts)
+ val cgcArgs = Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group")
+ val consumerGroupService = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => consumerGroupService.describeGroup()._2.isEmpty, "Expected no rows in describe group results.")
}
@Test
+ def testDescribeSimpleConsumerGroup() {
+ // Ensure that the offsets of consumers which don't use group management are still displayed
+
+ TestUtils.createOffsetsTopic(zkUtils, servers)
+ val topic2 = "foo2"
+ AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+ addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))
+
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
+ val service = getConsumerGroupService(cgcArgs)
+
+ TestUtils.waitUntilTrue(() => {
+ val (state, assignments) = service.describeGroup()
+ println(assignments.get.map(x => (x.topic, x.partition)))
+ state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2
+ }, "Expected two partition assignment results in describe group state result.")
+ }
+
+ @Test
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
def testDescribeExistingGroup() {
TestUtils.createOffsetsTopic(zkUtils, servers)
createOldConsumer()
- val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
- consumerGroupService = new ZkConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
TestUtils.waitUntilTrue(() => {
val (_, assignments) = consumerGroupService.describeGroup()
assignments.isDefined &&
@@ -99,8 +72,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
def testDescribeExistingGroupWithNoMembers() {
TestUtils.createOffsetsTopic(zkUtils, servers)
createOldConsumer()
- val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
- consumerGroupService = new ZkConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
TestUtils.waitUntilTrue(() => {
val (_, assignments) = consumerGroupService.describeGroup()
@@ -108,7 +80,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
assignments.get.count(_.group == group) == 1 &&
assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
}, "Expected rows and a consumer id column in describe group results.")
- oldConsumers.head.stop()
+ stopRandomOldConsumer()
TestUtils.waitUntilTrue(() => {
val (_, assignments) = consumerGroupService.describeGroup()
@@ -124,8 +96,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
TestUtils.createOffsetsTopic(zkUtils, servers)
createOldConsumer()
createOldConsumer()
- val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
- consumerGroupService = new ZkConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group))
TestUtils.waitUntilTrue(() => {
val (_, assignments) = consumerGroupService.describeGroup()
assignments.isDefined &&
@@ -139,12 +110,11 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
def testDescribeNonExistingGroupWithNewConsumer() {
TestUtils.createOffsetsTopic(zkUtils, servers)
// run one consumer in the group consuming from a single-partition topic
- consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+ addConsumerGroupExecutor(numConsumers = 1)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- consumerGroupService = new KafkaConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(cgcArgs)
val (state, assignments) = consumerGroupService.describeGroup()
assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
@@ -154,11 +124,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
def testDescribeExistingGroupWithNewConsumer() {
TestUtils.createOffsetsTopic(zkUtils, servers)
// run one consumer in the group consuming from a single-partition topic
- consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+ addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- consumerGroupService = new KafkaConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = consumerGroupService.describeGroup()
@@ -175,11 +144,9 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
def testDescribeExistingGroupWithNoMembersWithNewConsumer() {
TestUtils.createOffsetsTopic(zkUtils, servers)
// run one consumer in the group consuming from a single-partition topic
- consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
-
+ val consumerGroupExecutor = addConsumerGroupExecutor(numConsumers = 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- consumerGroupService = new KafkaConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, _) = consumerGroupService.describeGroup()
@@ -214,11 +181,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() {
TestUtils.createOffsetsTopic(zkUtils, servers)
// run two consumers in the group consuming from a single-partition topic
- consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
+ addConsumerGroupExecutor(numConsumers = 2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- consumerGroupService = new KafkaConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = consumerGroupService.describeGroup()
@@ -237,11 +203,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
AdminUtils.createTopic(zkUtils, topic2, 2, 1)
// run two consumers in the group consuming from a two-partition topic
- consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
+ addConsumerGroupExecutor(numConsumers = 2, topic = topic2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- consumerGroupService = new KafkaConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = consumerGroupService.describeGroup()
@@ -259,12 +224,11 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
// complete before the timeout expires
// run one consumer in the group consuming from a single-partition topic
- consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+ addConsumerGroupExecutor(numConsumers = 1)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "1")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- consumerGroupService = new KafkaConsumerGroupService(opts)
+ val consumerGroupService = getConsumerGroupService(cgcArgs)
try {
consumerGroupService.describeGroup()
@@ -274,59 +238,4 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
}
}
- @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0")
- private def createOldConsumer(): Unit = {
- val consumerProps = new Properties
- consumerProps.setProperty("group.id", group)
- consumerProps.setProperty("zookeeper.connect", zkConnect)
- oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
- }
-}
-
-
-class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable {
- val props = new Properties
- props.put("bootstrap.servers", broker)
- props.put("group.id", groupId)
- props.put("key.deserializer", classOf[StringDeserializer].getName)
- props.put("value.deserializer", classOf[StringDeserializer].getName)
- val consumer = new KafkaConsumer(props)
-
- def run() {
- try {
- consumer.subscribe(Collections.singleton(topic))
- while (true)
- consumer.poll(Long.MaxValue)
- } catch {
- case _: WakeupException => // OK
- } finally {
- consumer.close()
- }
- }
-
- def shutdown() {
- consumer.wakeup()
- }
-}
-
-
-class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
- val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
- private val consumers = new ArrayBuffer[ConsumerThread]()
- for (i <- 1 to numConsumers) {
- val consumer = new ConsumerThread(broker, i, groupId, topic)
- consumers += consumer
- executor.submit(consumer)
- }
-
- def shutdown() {
- consumers.foreach(_.shutdown)
- executor.shutdown()
- try {
- executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
- } catch {
- case e: InterruptedException =>
- e.printStackTrace()
- }
- }
}
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 6727fad..13dccbe 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -18,53 +18,21 @@ package kafka.admin
import java.util.Properties
-import org.easymock.EasyMock
-import org.junit.Before
import org.junit.Test
-
import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
-import kafka.consumer.OldConsumer
-import kafka.consumer.Whitelist
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
+import kafka.consumer.{OldConsumer, Whitelist}
import kafka.utils.TestUtils
+import org.easymock.EasyMock
+class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-class ListConsumerGroupTest extends KafkaServerTestHarness {
-
- val overridingProps = new Properties()
- val topic = "foo"
- val topicFilter = Whitelist(topic)
- val group = "test.group"
- val props = new Properties
-
- // configure the servers and clients
- override def generateConfigs =
- TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
-
- @Before
- override def setUp() {
- super.setUp()
-
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ @Test
+ def testListOldConsumerGroups() {
+ val topicFilter = Whitelist(topic)
+ val props = new Properties
props.setProperty("group.id", group)
props.setProperty("zookeeper.connect", zkConnect)
- }
-
- @Test
- def testListGroupWithNoExistingGroup() {
- val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect))
- val consumerGroupCommand = new ZkConsumerGroupService(opts)
- try {
- assert(consumerGroupCommand.listGroups().isEmpty)
- } finally {
- consumerGroupCommand.close()
- }
- }
-
- @Test
- def testListGroupWithSomeGroups() {
// mocks
val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
props.setProperty("group.id", "some.other.group")
@@ -80,13 +48,42 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
// action/test
TestUtils.waitUntilTrue(() => {
- val groups = consumerGroupCommand.listGroups()
- groups.size == 2 && groups.contains(group)
- }, "Expected a different list group results.")
+ val groups = consumerGroupCommand.listGroups()
+ groups.size == 2 && groups.contains(group)
+ }, "Expected a different list group results.")
// cleanup
consumerGroupCommand.close()
consumer1Mock.stop()
consumer2Mock.stop()
}
+
+ @Test
+ def testListGroupWithNoExistingGroup() {
+ val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect))
+ val consumerGroupCommand = new ZkConsumerGroupService(opts)
+ try {
+ assert(consumerGroupCommand.listGroups().isEmpty)
+ } finally {
+ consumerGroupCommand.close()
+ }
+ }
+
+ @Test
+ def testListConsumerGroups() {
+ val simpleGroup = "simple-group"
+ addSimpleGroupExecutor(group = simpleGroup)
+ addConsumerGroupExecutor(numConsumers = 1)
+
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--list")
+ val service = getConsumerGroupService(cgcArgs)
+
+ val expectedGroups = Set(group, simpleGroup)
+ var foundGroups = Set.empty[String]
+ TestUtils.waitUntilTrue(() => {
+ foundGroups = service.listGroups().toSet
+ expectedGroups == foundGroups
+ }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 6853b16..50356ff 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,13 +16,46 @@ import java.io.{BufferedWriter, File, FileWriter}
import java.text.{ParseException, SimpleDateFormat}
import java.util.{Calendar, Date, Properties}
-import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
-import kafka.integration.KafkaServerTestHarness
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.junit.Test
-import scala.collection.mutable.ArrayBuffer
+class TimeConversionTests {
+
+ @Test
+ def testDateTimeFormats() {
+ //check valid formats
+ invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+ invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
+ invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"))
+ invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"))
+ invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
+
+ //check some invalid formats
+ try {
+ invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"))
+ fail("Call to getDateTime should fail")
+ } catch {
+ case _: ParseException =>
+ }
+
+ try {
+ invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"))
+ fail("Call to getDateTime should fail")
+ } catch {
+ case _: ParseException =>
+ }
+ }
+
+ private def invokeGetDateTimeMethod(format: SimpleDateFormat) {
+ val checkpoint = new Date()
+ val timestampString = format.format(checkpoint)
+ ConsumerGroupCommand.convertTimestamp(timestampString)
+ }
+
+}
/**
* Test cases by:
@@ -34,46 +67,27 @@ import scala.collection.mutable.ArrayBuffer
* - scope=topics+partitions, scenario=to-earliest
* - export/import
*/
-class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
-
+class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
+
val overridingProps = new Properties()
val topic1 = "foo1"
val topic2 = "foo2"
- val group = "test.group"
- val props = new Properties
- val consumerGroupServices = new ArrayBuffer[KafkaConsumerGroupService]
- val executors = new ArrayBuffer[ConsumerGroupExecutor]
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*/
- override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
-
- @Before
- override def setUp() {
- super.setUp()
-
- props.setProperty("group.id", group)
- }
-
- @After
- override def tearDown() {
- try {
- executors.foreach(_.shutdown())
- consumerGroupServices.foreach(_.close())
- } finally {
- super.tearDown()
- }
- }
+ override def generateConfigs: Seq[KafkaConfig] = {
+ TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false)
+ .map(KafkaConfig.fromProps(_, overridingProps))
+ }
@Test
def testResetOffsetsNotExistingGroup() {
- createConsumerGroupExecutor(brokerList, 1, group, topic1)
+ addConsumerGroupExecutor(numConsumers = 1, topic1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics", "--to-current")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val assignmentsToReset = consumerGroupCommand.resetOffsets()
@@ -113,10 +127,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
-
- val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+ val executor = addConsumerGroupExecutor(numConsumers = 1, topic1)
TestUtils.waitUntilTrue(() => {
val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -134,8 +146,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
executor.shutdown()
val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute")
- val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
- val consumerGroupCommand1 = createConsumerGroupService(opts1)
+ val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1)
TestUtils.waitUntilTrue(() => {
val assignmentsToReset = consumerGroupCommand1.resetOffsets()
@@ -158,10 +169,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
-
- val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+ val executor = addConsumerGroupExecutor(numConsumers = 1, topic1)
TestUtils.waitUntilTrue(() => {
val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -178,8 +187,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
executor.shutdown()
val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
- val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
- val consumerGroupCommand1 = createConsumerGroupService(opts1)
+ val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1)
TestUtils.waitUntilTrue(() => {
val assignmentsToReset = consumerGroupCommand1.resetOffsets()
@@ -192,43 +200,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}
@Test
- def testDateTimeFormats() {
- //check valid formats
- invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"))
- invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
- invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"))
- invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"))
- invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
-
- //check some invalid formats
- try {
- invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"))
- fail("Call to getDateTime should fail")
- } catch {
- case _: ParseException =>
- }
-
- try {
- invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"))
- fail("Call to getDateTime should fail")
- } catch {
- case _: ParseException =>
- }
- }
-
- private def invokeGetDateTimeMethod(format: SimpleDateFormat) {
- val checkpoint = new Date()
- val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
- consumerGroupCommand.getDateTime
- }
-
- @Test
def testResetOffsetsByDuration() {
- val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M")
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -247,8 +221,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsByDurationToEarliest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -266,8 +239,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToEarliest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -285,8 +257,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToLatest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -307,8 +278,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToCurrentOffset() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -325,9 +295,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
AdminUtils.deleteTopic(zkUtils, topic1)
}
- private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
+ private def produceConsumeAndShutdown(consumerGroupCommand: ConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
- val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic)
+ val executor = addConsumerGroupExecutor(numConsumers, topic)
TestUtils.waitUntilTrue(() => {
val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -348,8 +318,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToSpecificOffset() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -368,8 +337,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftPlus() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -389,8 +357,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftMinus() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -411,8 +378,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftByLowerThanEarliest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -432,8 +398,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsShiftByHigherThanLatest() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -453,8 +418,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToEarliestOnOneTopic() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
@@ -472,8 +436,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToEarliestOnOneTopicAndPartition() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 2, 1)
@@ -495,8 +458,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
"--topic", topic1,
"--topic", topic2,
"--to-earliest", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
AdminUtils.createTopic(zkUtils, topic2, 1, 1)
@@ -522,8 +484,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
"--topic", String.format("%s:1", topic1),
"--topic", String.format("%s:1", topic2),
"--to-earliest", "--execute")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 2, 1)
AdminUtils.createTopic(zkUtils, topic2, 2, 1)
@@ -545,8 +506,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsExportImportPlan() {
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset","2", "--export")
- val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = createConsumerGroupService(opts)
+ val consumerGroupCommand = getConsumerGroupService(cgcArgs)
AdminUtils.createTopic(zkUtils, topic1, 2, 1)
@@ -562,10 +522,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
assignmentsToReset.exists { assignment => assignment._2.offset() == 2 } && file.exists()
}, "Expected the consume all messages and save reset offsets plan to file")
-
val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath)
- val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
- val consumerGroupCommandExec = createConsumerGroupService(optsExec)
+ val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
TestUtils.waitUntilTrue(() => {
val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
@@ -588,15 +546,4 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
ConsumerGroupCommand.main(cgcArgs)
}
- private def createConsumerGroupExecutor(brokerList: String, numConsumers: Int, groupId: String, topic: String): ConsumerGroupExecutor = {
- val executor = new ConsumerGroupExecutor(brokerList, numConsumers, groupId, topic)
- executors += executor
- executor
- }
-
- private def createConsumerGroupService(opts: ConsumerGroupCommandOptions): KafkaConsumerGroupService = {
- val service = new KafkaConsumerGroupService(opts)
- consumerGroupServices += service
- service
- }
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 7075c63..aa44d14 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -777,6 +777,36 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
+ def testCommitOffsetsAfterGroupIsEmpty(): Unit = {
+ // Tests the scenario where the reset offset tool modifies the offsets
+ // of a group after it becomes empty
+
+ // A group member joins
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val assignedMemberId = joinGroupResult.memberId
+ val joinGroupError = joinGroupResult.error
+ assertEquals(Errors.NONE, joinGroupError)
+
+ // and leaves.
+ EasyMock.reset(replicaManager)
+ val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
+ assertEquals(Errors.NONE, leaveGroupResult)
+
+ // The simple offset commit should now fail
+ EasyMock.reset(replicaManager)
+ val tp = new TopicPartition("topic", 0)
+ val offset = OffsetAndMetadata(0)
+ val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+ assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+ assertEquals(Errors.NONE, error)
+ assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+ }
+
+ @Test
def testFetchOffsets() {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
@@ -866,7 +896,7 @@ class GroupCoordinatorTest extends JUnitSuite {
groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
- assertEquals(Errors.NONE, secondReqError)
+ assertEquals(Errors.NONE, thirdReqError)
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), thirdReqPartitionData.get(tp).map(_.offset))
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 78a5eaa..3bdffbd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull}
import org.junit.{Before, Test}
import java.nio.ByteBuffer
@@ -111,6 +111,41 @@ class GroupMetadataManagerTest {
}
@Test
+ def testLoadEmptyGroupWithOffsets() {
+ val groupMetadataTopicPartition = groupTopicPartition
+ val generation = 15
+ val protocolType = "consumer"
+ val startOffset = 15L
+ val committedOffsets = Map(
+ new TopicPartition("foo", 0) -> 23L,
+ new TopicPartition("foo", 1) -> 455L,
+ new TopicPartition("bar", 0) -> 8992L
+ )
+
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+ val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+ expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+ EasyMock.replay(replicaManager)
+
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+ val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+ assertEquals(groupId, group.groupId)
+ assertEquals(Empty, group.currentState)
+ assertEquals(generation, group.generationId)
+ assertEquals(Some(protocolType), group.protocolType)
+ assertNull(group.leaderOrNull)
+ assertNull(group.protocolOrNull)
+ committedOffsets.foreach { case (topicPartition, offset) =>
+ assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+ }
+ }
+
+ @Test
def testLoadTransactionalOffsetsWithoutGroup() {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
@@ -524,6 +559,9 @@ class GroupMetadataManagerTest {
@Test
def testLoadOffsetsAndGroup() {
val groupMetadataTopicPartition = groupTopicPartition
+ val generation = 935
+ val protocolType = "consumer"
+ val protocol = "range"
val startOffset = 15L
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -533,7 +571,7 @@ class GroupMetadataManagerTest {
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val memberId = "98098230493"
- val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
@@ -546,7 +584,10 @@ class GroupMetadataManagerTest {
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Stable, group.currentState)
- assertEquals(memberId, group.leaderId)
+ assertEquals(memberId, group.leaderOrNull)
+ assertEquals(generation, group.generationId)
+ assertEquals(Some(protocolType), group.protocolType)
+ assertEquals(protocol, group.protocolOrNull)
assertEquals(Set(memberId), group.allMembers)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
@@ -558,9 +599,9 @@ class GroupMetadataManagerTest {
def testLoadGroupWithTombstone() {
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L
-
val memberId = "98098230493"
- val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+ protocolType = "consumer", protocol = "range", memberId)
val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
Seq(groupMetadataRecord, groupMetadataTombstone): _*)
@@ -581,6 +622,9 @@ class GroupMetadataManagerTest {
// 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
val groupMetadataTopicPartition = groupTopicPartition
+ val generation = 293
+ val protocolType = "consumer"
+ val protocol = "range"
val startOffset = 15L
val committedOffsets = Map(
@@ -590,7 +634,7 @@ class GroupMetadataManagerTest {
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val memberId = "98098230493"
- val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
@@ -612,6 +656,9 @@ class GroupMetadataManagerTest {
@Test
def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = {
+ val generation = 293
+ val protocolType = "consumer"
+ val protocol = "range"
val startOffset = 15L
val tp0 = new TopicPartition("foo", 0)
val tp1 = new TopicPartition("foo", 1)
@@ -624,13 +671,15 @@ class GroupMetadataManagerTest {
val segment1MemberId = "a"
val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
- createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(segment1MemberId)): _*)
+ createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(
+ generation, protocolType, protocol, segment1MemberId)): _*)
val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records)
val segment2MemberId = "b"
val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
- createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(segment2MemberId)): _*)
+ createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(
+ generation, protocolType, protocol, segment2MemberId)): _*)
val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records)
EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End))
@@ -643,7 +692,7 @@ class GroupMetadataManagerTest {
assertEquals(groupId, group.groupId)
assertEquals(Stable, group.currentState)
- assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderId)
+ assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderOrNull)
assertEquals("segment2 group record member should be only member", Set(segment2MemberId), group.allMembers)
// offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
@@ -654,20 +703,22 @@ class GroupMetadataManagerTest {
}
}
-
@Test
def testAddGroup() {
- val group = new GroupMetadata("foo")
+ val group = new GroupMetadata("foo", initialState = Empty)
assertEquals(group, groupMetadataManager.addGroup(group))
- assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo")))
+ assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", initialState = Empty)))
}
@Test
def testStoreEmptyGroup() {
- val group = new GroupMetadata(groupId)
+ val generation = 27
+ val protocolType = "consumer"
+
+ val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
groupMetadataManager.addGroup(group)
- expectAppendMessage(Errors.NONE)
+ val capturedRecords = expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
var maybeError: Option[Errors] = None
@@ -677,6 +728,45 @@ class GroupMetadataManagerTest {
groupMetadataManager.storeGroup(group, Map.empty, callback)
assertEquals(Some(Errors.NONE), maybeError)
+ assertTrue(capturedRecords.hasCaptured)
+ val records = capturedRecords.getValue()(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))
+ .records.asScala.toList
+ assertEquals(1, records.size)
+
+ val record = records.head
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+ assertTrue(groupMetadata.is(Empty))
+ assertEquals(generation, groupMetadata.generationId)
+ assertEquals(Some(protocolType), groupMetadata.protocolType)
+ }
+
+ @Test
+ def testStoreEmptySimpleGroup() {
+ val group = new GroupMetadata(groupId, initialState = Empty)
+ groupMetadataManager.addGroup(group)
+
+ val capturedRecords = expectAppendMessage(Errors.NONE)
+ EasyMock.replay(replicaManager)
+
+ var maybeError: Option[Errors] = None
+ def callback(error: Errors) {
+ maybeError = Some(error)
+ }
+
+ groupMetadataManager.storeGroup(group, Map.empty, callback)
+ assertEquals(Some(Errors.NONE), maybeError)
+ assertTrue(capturedRecords.hasCaptured)
+
+ assertTrue(capturedRecords.hasCaptured)
+ val records = capturedRecords.getValue()(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))
+ .records.asScala.toList
+ assertEquals(1, records.size)
+
+ val record = records.head
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+ assertTrue(groupMetadata.is(Empty))
+ assertEquals(0, groupMetadata.generationId)
+ assertEquals(None, groupMetadata.protocolType)
}
@Test
@@ -695,7 +785,7 @@ class GroupMetadataManagerTest {
private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
EasyMock.reset(replicaManager)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
expectAppendMessage(appendError)
@@ -718,7 +808,7 @@ class GroupMetadataManagerTest {
val clientId = "clientId"
val clientHost = "localhost"
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
@@ -749,7 +839,7 @@ class GroupMetadataManagerTest {
val clientId = "clientId"
val clientHost = "localhost"
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
@@ -779,7 +869,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -821,7 +911,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -861,7 +951,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -900,7 +990,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -938,7 +1028,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -980,7 +1070,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1018,7 +1108,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
// expire the offset after 1 millisecond
@@ -1071,7 +1161,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
group.generationId = 5
@@ -1119,7 +1209,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
group.generationId = 5
@@ -1173,7 +1263,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
// expire the offset after 1 millisecond
@@ -1244,7 +1334,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId)
+ val group = new GroupMetadata(groupId, initialState = Empty)
groupMetadataManager.addGroup(group)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
@@ -1314,40 +1404,47 @@ class GroupMetadataManagerTest {
capturedArgument
}
- private def expectAppendMessage(error: Errors) {
- val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ private def expectAppendMessage(error: Errors): Capture[Map[TopicPartition, MemoryRecords]] = {
+ val capturedCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val capturedRecords: Capture[Map[TopicPartition, MemoryRecords]] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
internalTopicsAllowed = EasyMock.eq(true),
isFromClient = EasyMock.eq(false),
- EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
- EasyMock.capture(capturedArgument),
+ EasyMock.capture(capturedRecords),
+ EasyMock.capture(capturedCallback),
EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
- override def answer = capturedArgument.getValue.apply(
+ override def answer = capturedCallback.getValue.apply(
Map(groupTopicPartition ->
new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
)
)})
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+ capturedRecords
}
- private def buildStableGroupRecordWithMember(memberId: String): SimpleRecord = {
- val group = new GroupMetadata(groupId)
- group.transitionTo(PreparingRebalance)
- val memberProtocols = List(("roundrobin", Array.emptyByteArray))
- val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, "consumer", memberProtocols)
- group.add(member)
- member.awaitingJoinCallback = _ => {}
- group.initNextGeneration()
- group.transitionTo(Stable)
-
+ private def buildStableGroupRecordWithMember(generation: Int,
+ protocolType: String,
+ protocol: String,
+ memberId: String): SimpleRecord = {
+ val memberProtocols = List((protocol, Array.emptyByteArray))
+ val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
+ val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol,
+ leaderId = memberId, Seq(member))
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
new SimpleRecord(groupMetadataKey, groupMetadataValue)
}
+ private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = {
+ val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
+ val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
+ val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty)
+ new SimpleRecord(groupMetadataKey, groupMetadataValue)
+ }
+
private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
startOffset: Long,
records: MemoryRecords): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 2db6603..871a2d3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -40,7 +40,7 @@ class GroupMetadataTest extends JUnitSuite {
@Before
def setUp() {
- group = new GroupMetadata("groupId")
+ group = new GroupMetadata("groupId", initialState = Empty)
}
@Test
@@ -271,25 +271,25 @@ class GroupMetadataTest extends JUnitSuite {
group.add(member)
assertEquals(0, group.generationId)
- assertNull(group.protocol)
+ assertNull(group.protocolOrNull)
group.initNextGeneration()
assertEquals(1, group.generationId)
- assertEquals("roundrobin", group.protocol)
+ assertEquals("roundrobin", group.protocolOrNull)
}
@Test
def testInitNextGenerationEmptyGroup() {
assertEquals(Empty, group.currentState)
assertEquals(0, group.generationId)
- assertNull(group.protocol)
+ assertNull(group.protocolOrNull)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
assertEquals(1, group.generationId)
- assertNull(group.protocol)
+ assertNull(group.protocolOrNull)
}
@Test
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.