You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/11 23:47:16 UTC
[kafka] branch trunk updated: KAFKA-6773;
Allow offset commit/fetch/describe/delete with empty groupId (#4851)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7421f9d KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851)
7421f9d is described below
commit 7421f9dce2d2e763f70602b13df1efcd32f691b9
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Apr 11 16:47:11 2018 -0700
KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851)
We had a regression in #4788 which caused the offset commit/fetch/describe APIs to fail if the groupId was empty. This should be allowed for backwards compatibility. Additionally, I have modified DeleteGroups to allow removal of the empty group, which was missed in the initial implementation. I've added a test case to ensure that we do not miss this again in the future.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 43 +++++----
.../kafka/admin/DeleteConsumerGroupsTest.scala | 26 ------
.../coordinator/group/GroupCoordinatorTest.scala | 100 ++++++++++++++-------
3 files changed, 96 insertions(+), 73 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 225b709..cbbd913 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,7 +27,7 @@ import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
@@ -109,7 +109,7 @@ class GroupCoordinator(val brokerId: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
- validateGroup(groupId).foreach { error =>
+ validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
responseCallback(joinError(memberId, error))
return
}
@@ -237,7 +237,7 @@ class GroupCoordinator(val brokerId: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit = {
- validateGroup(groupId) match {
+ validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
// The coordinator is loading, which means we've lost the state of the active rebalance and the
// group will need to start over at JoinGroup. By returning rebalance in progress, the consumer
@@ -313,7 +313,7 @@ class GroupCoordinator(val brokerId: Int,
}
def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit): Unit = {
- validateGroup(groupId).foreach { error =>
+ validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
responseCallback(error)
return
}
@@ -346,7 +346,7 @@ class GroupCoordinator(val brokerId: Int,
var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
groupIds.foreach { groupId =>
- validateGroup(groupId) match {
+ validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
case Some(error) =>
groupErrors += groupId -> error
@@ -386,7 +386,7 @@ class GroupCoordinator(val brokerId: Int,
memberId: String,
generationId: Int,
responseCallback: Errors => Unit) {
- validateGroup(groupId).foreach { error =>
+ validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
// the group is still loading, so respond just blindly
responseCallback(Errors.NONE)
@@ -448,7 +448,7 @@ class GroupCoordinator(val brokerId: Int,
producerEpoch: Short,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
- validateGroup(groupId) match {
+ validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
case None =>
val group = groupManager.getGroup(groupId).getOrElse {
@@ -463,7 +463,7 @@ class GroupCoordinator(val brokerId: Int,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
- validateGroup(groupId) match {
+ validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
case None =>
groupManager.getGroup(groupId) match {
@@ -524,7 +524,7 @@ class GroupCoordinator(val brokerId: Int,
def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None):
(Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
- validateGroup(groupId) match {
+ validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
case Some(error) => error -> Map.empty
case None =>
// return offsets blindly regardless the current group state since the group may be using
@@ -543,7 +543,7 @@ class GroupCoordinator(val brokerId: Int,
}
def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
- validateGroup(groupId) match {
+ validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match {
case Some(error) => (error, GroupCoordinator.EmptyGroup)
case None =>
groupManager.getGroup(groupId) match {
@@ -563,8 +563,23 @@ class GroupCoordinator(val brokerId: Int,
info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.")
}
- private def validateGroup(groupId: String): Option[Errors] = {
- if (!validGroupId(groupId))
+ private def isValidGroupId(groupId: String, api: ApiKeys): Boolean = {
+ api match {
+ case ApiKeys.OFFSET_COMMIT | ApiKeys.OFFSET_FETCH | ApiKeys.DESCRIBE_GROUPS | ApiKeys.DELETE_GROUPS =>
+ // For backwards compatibility, we support the offset commit and fetch APIs for the empty groupId. We also
+ // allow it in DescribeGroups and DeleteGroups so that users can view and delete the state of all groups.
+ groupId != null
+ case _ =>
+ // The remaining APIs are used by groups that rely on Kafka for group coordination and must have a non-empty groupId
+ groupId != null && !groupId.isEmpty
+ }
+ }
+
+ /**
+ * Check that the groupId is valid, assigned to this coordinator and that the group has been loaded.
+ */
+ private def validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors] = {
+ if (!isValidGroupId(groupId, api))
Some(Errors.INVALID_GROUP_ID)
else if (!isActive.get)
Some(Errors.COORDINATOR_NOT_AVAILABLE)
@@ -648,10 +663,6 @@ class GroupCoordinator(val brokerId: Int,
}
}
- private def validGroupId(groupId: String): Boolean = {
- groupId != null && !groupId.isEmpty
- }
-
private def joinError(memberId: String, error: Errors): JoinGroupResult = {
JoinGroupResult(
members = Map.empty,
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index effa55d..4cc2837 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -60,32 +60,6 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
}
@Test
- def testDeleteCmdInvalidGroupId() {
- TestUtils.createOffsetsTopic(zkClient, servers)
- val invalidGroupId = ""
-
- val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
- val service = getConsumerGroupService(cgcArgs)
-
- val output = TestUtils.grabConsoleOutput(service.deleteGroups())
- assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
- output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}"))
- }
-
- @Test
- def testDeleteInvalidGroupId() {
- TestUtils.createOffsetsTopic(zkClient, servers)
- val invalidGroupId = ""
-
- val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
- val service = getConsumerGroupService(cgcArgs)
-
- val result = service.deleteGroups()
- assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
- result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
- }
-
- @Test
def testDeleteCmdNonEmptyGroup() {
TestUtils.createOffsetsTopic(zkClient, servers)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 08c13eb..933e91b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -37,7 +37,7 @@ import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
import org.scalatest.junit.JUnitSuite
-import scala.collection._
+import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise, TimeoutException}
@@ -138,7 +138,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val topicPartition = new TopicPartition("foo", 0)
var offsetCommitErrors = Map.empty[TopicPartition, Errors]
groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
- immutable.Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
+ Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
// Heartbeat
@@ -155,7 +155,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)
// DeleteGroups
- val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(immutable.Set(otherGroupId))
+ val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId))
// Check that non-loading groups are still accessible
@@ -452,7 +452,7 @@ class GroupCoordinatorTest extends JUnitSuite {
timer.advanceClock(sessionTimeout / 2)
EasyMock.reset(replicaManager)
- val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset))
+ val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
timer.advanceClock(sessionTimeout / 2 + 100)
@@ -820,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
- val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset))
+ val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
}
@@ -830,7 +830,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val offset = OffsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
}
@@ -856,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -870,7 +870,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val offset = OffsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -879,13 +879,51 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
+ def testCommitAndFetchOffsetsWithEmptyGroup() {
+ // For backwards compatibility, the coordinator supports committing/fetching offsets with an empty groupId.
+ // To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups
+
+ val tp = new TopicPartition("topic", 0)
+ val offset = OffsetAndMetadata(0)
+ val groupId = ""
+
+ val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
+ assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+ val (fetchError, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+ assertEquals(Errors.NONE, fetchError)
+ assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+
+ val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ assertEquals(Errors.NONE, describeError)
+ assertEquals(Empty.toString, summary.state)
+
+ val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+ val partition = EasyMock.niceMock(classOf[Partition])
+
+ EasyMock.reset(replicaManager)
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+ EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+ EasyMock.replay(replicaManager, partition)
+
+ val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId))
+ assertEquals(Errors.NONE, deleteErrors(groupId))
+
+ val (err, data) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+ assertEquals(Errors.NONE, err)
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), data.get(tp).map(_.offset))
+ }
+
+ @Test
def testBasicFetchTxnOffsets() {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -912,7 +950,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -936,7 +974,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -975,16 +1013,16 @@ class GroupCoordinatorTest extends JUnitSuite {
groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition)
val errors = mutable.ArrayBuffer[Errors]()
- val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+ val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
// Ensure that the two groups map to different partitions.
assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
- commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(partitions(0) -> offsets(0))))
+ commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(partitions(0) -> offsets(0))))
assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
- commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, immutable.Map(partitions(1) -> offsets(1))))
+ commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, Map(partitions(1) -> offsets(1))))
assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
// We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets.
@@ -1051,16 +1089,16 @@ class GroupCoordinatorTest extends JUnitSuite {
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
val errors = mutable.ArrayBuffer[Errors]()
- val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+ val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
// producer0 commits the offsets for partition0
- commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), immutable.Map(partitions(0) -> offsets(0))))
+ commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), Map(partitions(0) -> offsets(0))))
assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
// producer1 commits the offsets for partition1
- commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), immutable.Map(partitions(1) -> offsets(1))))
+ commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), Map(partitions(1) -> offsets(1))))
assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
// producer0 commits its transaction.
@@ -1123,7 +1161,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3))
assertEquals(Errors.NONE, commitOffsetResult(tp1))
assertEquals(Errors.NONE, commitOffsetResult(tp2))
assertEquals(Errors.NONE, commitOffsetResult(tp3))
@@ -1150,7 +1188,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NONE, joinGroupError)
EasyMock.reset(replicaManager)
- val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp -> offset))
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset))
assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
}
@@ -1332,20 +1370,20 @@ class GroupCoordinatorTest extends JUnitSuite {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
joinGroup(groupId, memberId, protocolType, protocols)
- val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+ val result = groupCoordinator.handleDeleteGroups(Set(groupId))
assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
}
@Test
def testDeleteGroupWithInvalidGroupId() {
- val invalidGroupId = ""
- val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId).toSet)
+ val invalidGroupId = null
+ val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId))
assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
}
@Test
def testDeleteGroupWithWrongCoordinator() {
- val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId).toSet)
+ val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR))
}
@@ -1367,7 +1405,7 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
- val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+ val result = groupCoordinator.handleDeleteGroups(Set(groupId))
assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
}
@@ -1388,7 +1426,7 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.reset(replicaManager)
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
- val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset))
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
@@ -1408,7 +1446,7 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
EasyMock.replay(replicaManager, partition)
- val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+ val result = groupCoordinator.handleDeleteGroups(Set(groupId))
assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
@@ -1535,7 +1573,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
val (responseFuture, responseCallback) = setupSyncGroupCallback
- val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
@@ -1616,10 +1654,10 @@ class GroupCoordinatorTest extends JUnitSuite {
private def commitOffsets(groupId: String,
consumerId: String,
generationId: Int,
- offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+ offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
val (responseFuture, responseCallback) = setupCommitOffsetsCallback
- val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
@@ -1646,10 +1684,10 @@ class GroupCoordinatorTest extends JUnitSuite {
private def commitTransactionalOffsets(groupId: String,
producerId: Long,
producerEpoch: Short,
- offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+ offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
val (responseFuture, responseCallback) = setupCommitOffsetsCallback
- val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.