You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:46 UTC
[2/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 549a96b..a77979a 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -135,8 +135,8 @@ object TestOffsetManager {
val id = random.nextInt().abs % numGroups
val group = "group-" + id
try {
- metadataChannel.send(ConsumerMetadataRequest(group))
- val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
+ metadataChannel.send(GroupMetadataRequest(group))
+ val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
val channel = if (channels.contains(coordinatorId))
channels(coordinatorId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index e2a75e2..1266598 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -35,18 +35,18 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private val HostsString = Hosts.mkString(AclCommand.Delimiter.toString)
private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
- private val ConsumerGroupResources = Set(new Resource(ConsumerGroup, "testGroup-1"), new Resource(ConsumerGroup, "testGroup-2"))
+ private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
private val ResourceToCommand = Map[Set[Resource], Array[String]](
TopicResources -> Array("--topic", "test-1,test-2"),
Set(Resource.ClusterResource) -> Array("--cluster"),
- ConsumerGroupResources -> Array("--consumer-group", "testGroup-1,testGroup-2")
+ GroupResources -> Array("--group", "testGroup-1,testGroup-2")
)
private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
TopicResources -> (Set(Read, Write, Describe), Array("--operations", "Read,Write,Describe")),
Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operations", "Create,ClusterAction")),
- ConsumerGroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read"))
+ GroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read"))
)
private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]](
@@ -56,7 +56,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]](
TopicResources -> AclCommand.getAcls(Users, Allow, Set(Read, Describe), Hosts),
- ConsumerGroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts)
+ GroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts)
)
private val CmdToResourcesToAcl = Map[Array[String], Map[Set[Resource], Set[Acl]]](
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index cab4813..820a825 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -24,7 +24,7 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.server.ConfigType
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils._
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
class TopicCommandTest extends ZooKeeperTestHarness with Logging {
@@ -85,12 +85,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
- "--topic", ConsumerCoordinator.OffsetsTopicName))
+ "--topic", GroupCoordinator.OffsetsTopicName))
TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
// try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
- val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName))
- val deleteOffsetTopicPath = getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
+ val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.OffsetsTopicName))
+ val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.OffsetsTopicName)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index b7e7967..09e9ce3 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -231,12 +231,12 @@ object SerializationTestUtils {
))
}
- def createConsumerMetadataRequest: ConsumerMetadataRequest = {
- ConsumerMetadataRequest("group 1", clientId = "client 1")
+ def createConsumerMetadataRequest: GroupMetadataRequest = {
+ GroupMetadataRequest("group 1", clientId = "client 1")
}
- def createConsumerMetadataResponse: ConsumerMetadataResponse = {
- ConsumerMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+ def createConsumerMetadataResponse: GroupMetadataResponse = {
+ GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
}
def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
@@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
- private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+ private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 2e18e92..24fba45 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import kafka.server.OffsetManager
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
class TopicFilterTest extends JUnitSuite {
@@ -38,8 +38,8 @@ class TopicFilterTest extends JUnitSuite {
val topicFilter2 = new Whitelist(".+")
assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
- assertFalse(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
- assertTrue(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
+ assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false))
val topicFilter3 = new Whitelist("white_listed-topic.+")
assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -58,8 +58,8 @@ class TopicFilterTest extends JUnitSuite {
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
- assertFalse(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
- assertTrue(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
+ assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false))
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
deleted file mode 100644
index c108955..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-
-import java.util.concurrent.TimeUnit
-
-import org.junit.Assert._
-import kafka.common.{OffsetAndMetadata, TopicAndPartition}
-import kafka.server.{OffsetManager, KafkaConfig}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
-import org.easymock.{IAnswer, EasyMock}
-import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnitSuite
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future, Promise}
-
-/**
- * Test ConsumerCoordinator responses
- */
-class ConsumerCoordinatorResponseTest extends JUnitSuite {
- type JoinGroupCallbackParams = (Set[TopicAndPartition], String, Int, Short)
- type JoinGroupCallback = (Set[TopicAndPartition], String, Int, Short) => Unit
- type HeartbeatCallbackParams = Short
- type HeartbeatCallback = Short => Unit
- type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
- type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
- type LeaveGroupCallbackParams = Short
- type LeaveGroupCallback = Short => Unit
-
- val ConsumerMinSessionTimeout = 10
- val ConsumerMaxSessionTimeout = 200
- val DefaultSessionTimeout = 100
- var consumerCoordinator: ConsumerCoordinator = null
- var offsetManager : OffsetManager = null
-
- @Before
- def setUp() {
- val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
- props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
- props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
- offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
- consumerCoordinator = ConsumerCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager)
- consumerCoordinator.startup()
- }
-
- @After
- def tearDown() {
- EasyMock.reset(offsetManager)
- consumerCoordinator.shutdown()
- }
-
- @Test
- def testJoinGroupWrongCoordinator() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = false)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, joinGroupErrorCode)
- }
-
- @Test
- def testJoinGroupUnknownPartitionAssignmentStrategy() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "foo"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code, joinGroupErrorCode)
- }
-
- @Test
- def testJoinGroupSessionTimeoutTooSmall() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMinSessionTimeout - 1, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
- }
-
- @Test
- def testJoinGroupSessionTimeoutTooLarge() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMaxSessionTimeout + 1, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
- }
-
- @Test
- def testJoinGroupUnknownConsumerNewGroup() {
- val groupId = "groupId"
- val consumerId = "consumerId"
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, joinGroupErrorCode)
- }
-
- @Test
- def testValidJoinGroup() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
- }
-
- @Test
- def testJoinGroupInconsistentPartitionAssignmentStrategy() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
- val otherPartitionAssignmentStrategy = "roundrobin"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, otherPartitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val otherJoinGroupErrorCode = otherJoinGroupResult._4
- assertEquals(Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code, otherJoinGroupErrorCode)
- }
-
- @Test
- def testJoinGroupUnknownConsumerExistingGroup() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val otherConsumerId = "consumerId"
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val otherJoinGroupErrorCode = otherJoinGroupResult._4
- assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, otherJoinGroupErrorCode)
- }
-
- @Test
- def testHeartbeatWrongCoordinator() {
- val groupId = "groupId"
- val consumerId = "consumerId"
-
- val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false)
- assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, heartbeatResult)
- }
-
- @Test
- def testHeartbeatUnknownGroup() {
- val groupId = "groupId"
- val consumerId = "consumerId"
-
- val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true)
- assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult)
- }
-
- @Test
- def testHeartbeatUnknownConsumerExistingGroup() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val otherConsumerId = "consumerId"
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val heartbeatResult = heartbeat(groupId, otherConsumerId, 1, isCoordinatorForGroup = true)
- assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult)
- }
-
- @Test
- def testHeartbeatIllegalGeneration() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val assignedConsumerId = joinGroupResult._2
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val heartbeatResult = heartbeat(groupId, assignedConsumerId, 2, isCoordinatorForGroup = true)
- assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
- }
-
- @Test
- def testValidHeartbeat() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val assignedConsumerId = joinGroupResult._2
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true)
- assertEquals(Errors.NONE.code, heartbeatResult)
- }
-
- @Test
- def testCommitOffsetFromUnknownGroup() {
- val groupId = "groupId"
- val consumerId = "consumer"
- val generationId = 1
- val tp = new TopicAndPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
-
- val commitOffsetResult = commitOffsets(groupId, consumerId, generationId, Map(tp -> offset), true)
- assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp))
- }
-
- @Test
- def testCommitOffsetWithDefaultGeneration() {
- val groupId = "groupId"
- val tp = new TopicAndPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
-
- val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_CONSUMER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true)
- assertEquals(Errors.NONE.code, commitOffsetResult(tp))
- }
-
- @Test
- def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
- val groupId = "groupId"
- val partitionAssignmentStrategy = "range"
-
- // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
- val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
- DefaultSessionTimeout, isCoordinatorForGroup = true)
- val assignedConsumerId = joinGroupResult._2
- val initialGenerationId = joinGroupResult._3
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- // Then join with a new consumer to trigger a rebalance
- EasyMock.reset(offsetManager)
- sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
- DefaultSessionTimeout, isCoordinatorForGroup = true)
-
- // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
- EasyMock.reset(offsetManager)
- val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
- assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
- }
-
- @Test
- def testGenerationIdIncrementsOnRebalance() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val initialGenerationId = joinGroupResult._3
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(1, initialGenerationId)
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val nextGenerationId = otherJoinGroupResult._3
- val otherJoinGroupErrorCode = otherJoinGroupResult._4
- assertEquals(2, nextGenerationId)
- assertEquals(Errors.NONE.code, otherJoinGroupErrorCode)
- }
-
- @Test
- def testLeaveGroupWrongCoordinator() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-
- val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = false)
- assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, leaveGroupResult)
- }
-
- @Test
- def testLeaveGroupUnknownGroup() {
- val groupId = "groupId"
- val consumerId = "consumerId"
-
- val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = true)
- assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
- }
-
- @Test
- def testLeaveGroupUnknownConsumerExistingGroup() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val otherConsumerId = "consumerId"
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val leaveGroupResult = leaveGroup(groupId, otherConsumerId, isCoordinatorForGroup = true)
- assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
- }
-
- @Test
- def testValidLeaveGroup() {
- val groupId = "groupId"
- val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
- val partitionAssignmentStrategy = "range"
-
- val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
- val assignedConsumerId = joinGroupResult._2
- val joinGroupErrorCode = joinGroupResult._4
- assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
- EasyMock.reset(offsetManager)
- val leaveGroupResult = leaveGroup(groupId, assignedConsumerId, isCoordinatorForGroup = true)
- assertEquals(Errors.NONE.code, leaveGroupResult)
- }
-
- private def setupJoinGroupCallback: (Future[JoinGroupCallbackParams], JoinGroupCallback) = {
- val responsePromise = Promise[JoinGroupCallbackParams]
- val responseFuture = responsePromise.future
- val responseCallback: JoinGroupCallback = (partitions, consumerId, generationId, errorCode) =>
- responsePromise.success((partitions, consumerId, generationId, errorCode))
- (responseFuture, responseCallback)
- }
-
- private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
- val responsePromise = Promise[HeartbeatCallbackParams]
- val responseFuture = responsePromise.future
- val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode)
- (responseFuture, responseCallback)
- }
-
- private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = {
- val responsePromise = Promise[CommitOffsetCallbackParams]
- val responseFuture = responsePromise.future
- val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
- (responseFuture, responseCallback)
- }
-
- private def setupLeaveGroupCallback: (Future[LeaveGroupCallbackParams], LeaveGroupCallback) = {
- val responsePromise = Promise[LeaveGroupCallbackParams]
- val responseFuture = responsePromise.future
- val responseCallback: LeaveGroupCallback = errorCode => responsePromise.success(errorCode)
- (responseFuture, responseCallback)
- }
-
- private def sendJoinGroup(groupId: String,
- consumerId: String,
- partitionAssignmentStrategy: String,
- sessionTimeout: Int,
- isCoordinatorForGroup: Boolean): Future[JoinGroupCallbackParams] = {
- val (responseFuture, responseCallback) = setupJoinGroupCallback
- EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
- EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
- EasyMock.replay(offsetManager)
- consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback)
- responseFuture
- }
-
- private def joinGroup(groupId: String,
- consumerId: String,
- partitionAssignmentStrategy: String,
- sessionTimeout: Int,
- isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
- val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup)
- // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
- Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS))
- }
-
- private def heartbeat(groupId: String,
- consumerId: String,
- generationId: Int,
- isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = {
- val (responseFuture, responseCallback) = setupHeartbeatCallback
- EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
- EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
- EasyMock.replay(offsetManager)
- consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
- Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
- }
-
- private def commitOffsets(groupId: String,
- consumerId: String,
- generationId: Int,
- offsets: Map[TopicAndPartition, OffsetAndMetadata],
- isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = {
- val (responseFuture, responseCallback) = setupCommitOffsetsCallback
- EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
- EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
- val storeOffsetAnswer = new IAnswer[Unit] {
- override def answer = responseCallback.apply(offsets.mapValues(_ => Errors.NONE.code))
- }
- EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, responseCallback))
- .andAnswer(storeOffsetAnswer)
- EasyMock.replay(offsetManager)
- consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
- Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
- Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
- }
-
- private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = {
- val (responseFuture, responseCallback) = setupHeartbeatCallback
- EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
- EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
- EasyMock.replay(offsetManager)
- consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
- Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
deleted file mode 100644
index 5d812c2..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import org.junit.Assert._
-import org.junit.{Before, Test}
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Test group state transitions
- */
-class ConsumerGroupMetadataTest extends JUnitSuite {
- var group: ConsumerGroupMetadata = null
-
- @Before
- def setUp() {
- group = new ConsumerGroupMetadata("test", "range")
- }
-
- @Test
- def testCanRebalanceWhenStable() {
- assertTrue(group.canRebalance)
- }
-
- @Test
- def testCannotRebalanceWhenPreparingRebalance() {
- group.transitionTo(PreparingRebalance)
- assertFalse(group.canRebalance)
- }
-
- @Test
- def testCannotRebalanceWhenRebalancing() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Rebalancing)
- assertFalse(group.canRebalance)
- }
-
- @Test
- def testCannotRebalanceWhenDead() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Dead)
- assertFalse(group.canRebalance)
- }
-
- @Test
- def testStableToPreparingRebalanceTransition() {
- group.transitionTo(PreparingRebalance)
- assertState(group, PreparingRebalance)
- }
-
- @Test
- def testPreparingRebalanceToRebalancingTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Rebalancing)
- assertState(group, Rebalancing)
- }
-
- @Test
- def testPreparingRebalanceToDeadTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Dead)
- assertState(group, Dead)
- }
-
- @Test
- def testRebalancingToStableTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Rebalancing)
- group.transitionTo(Stable)
- assertState(group, Stable)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testStableToStableIllegalTransition() {
- group.transitionTo(Stable)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testStableToRebalancingIllegalTransition() {
- group.transitionTo(Rebalancing)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testStableToDeadIllegalTransition() {
- group.transitionTo(Dead)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testPreparingRebalanceToPreparingRebalanceIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(PreparingRebalance)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testPreparingRebalanceToStableIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Stable)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testRebalancingToRebalancingIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Rebalancing)
- group.transitionTo(Rebalancing)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testRebalancingToPreparingRebalanceTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Rebalancing)
- group.transitionTo(PreparingRebalance)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testRebalancingToDeadIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Rebalancing)
- group.transitionTo(Dead)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testDeadToDeadIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Dead)
- group.transitionTo(Dead)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testDeadToStableIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Dead)
- group.transitionTo(Stable)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testDeadToPreparingRebalanceIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Dead)
- group.transitionTo(PreparingRebalance)
- }
-
- @Test(expected = classOf[IllegalStateException])
- def testDeadToRebalancingIllegalTransition() {
- group.transitionTo(PreparingRebalance)
- group.transitionTo(Dead)
- group.transitionTo(Rebalancing)
- }
-
- private def assertState(group: ConsumerGroupMetadata, targetState: GroupState) {
- val states: Set[GroupState] = Set(Stable, PreparingRebalance, Rebalancing, Dead)
- val otherStates = states - targetState
- otherStates.foreach { otherState =>
- assertFalse(group.is(otherState))
- }
- assertTrue(group.is(targetState))
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
index 3bc37e5..49a237b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
@@ -18,13 +18,9 @@
package kafka.coordinator
import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, ZkUtils}
-import kafka.utils.ZkUtils._
+import kafka.utils.TestUtils
import org.junit.Assert._
-import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
-import org.apache.zookeeper.data.Stat
-import org.easymock.EasyMock
import org.junit.{Before, Test}
import org.scalatest.junit.JUnitSuite
@@ -34,15 +30,12 @@ import org.scalatest.junit.JUnitSuite
class CoordinatorMetadataTest extends JUnitSuite {
val DefaultNumPartitions = 8
val DefaultNumReplicas = 2
- var zkUtils: ZkUtils = null
var coordinatorMetadata: CoordinatorMetadata = null
@Before
def setUp() {
val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
- val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
- zkUtils = ZkUtils(zkClient, false)
- coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkUtils, null)
+ coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId)
}
@Test
@@ -53,7 +46,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
@Test
def testGetGroup() {
val groupId = "group"
- val expected = coordinatorMetadata.addGroup(groupId, "range")
+ val protocolType = "consumer"
+ val expected = coordinatorMetadata.addGroup(groupId, protocolType)
val actual = coordinatorMetadata.getGroup(groupId)
assertEquals(expected, actual)
}
@@ -61,155 +55,17 @@ class CoordinatorMetadataTest extends JUnitSuite {
@Test
def testAddGroupReturnsPreexistingGroupIfItAlreadyExists() {
val groupId = "group"
- val group1 = coordinatorMetadata.addGroup(groupId, "range")
- val group2 = coordinatorMetadata.addGroup(groupId, "range")
+ val protocolType = "consumer"
+ val group1 = coordinatorMetadata.addGroup(groupId, protocolType)
+ val group2 = coordinatorMetadata.addGroup(groupId, protocolType)
assertEquals(group1, group2)
}
@Test(expected = classOf[IllegalArgumentException])
- def testBindNonexistentGroupToTopics() {
- val groupId = "group"
- val topics = Set("a")
- coordinatorMetadata.bindGroupToTopics(groupId, topics)
- }
-
- @Test
- def testBindGroupToTopicsNotListenedOn() {
- val groupId = "group"
- val topics = Set("a")
- coordinatorMetadata.addGroup(groupId, "range")
-
- expectZkClientSubscribeDataChanges(zkUtils, topics)
- EasyMock.replay(zkUtils.zkClient)
- coordinatorMetadata.bindGroupToTopics(groupId, topics)
- assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
- }
-
- @Test
- def testBindGroupToTopicsAlreadyListenedOn() {
- val group1 = "group1"
- val group2 = "group2"
- val topics = Set("a")
- coordinatorMetadata.addGroup(group1, "range")
- coordinatorMetadata.addGroup(group2, "range")
-
- expectZkClientSubscribeDataChanges(zkUtils, topics)
- EasyMock.replay(zkUtils.zkClient)
- coordinatorMetadata.bindGroupToTopics(group1, topics)
- coordinatorMetadata.bindGroupToTopics(group2, topics)
- assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testUnbindNonexistentGroupFromTopics() {
- val groupId = "group"
- val topics = Set("a")
- coordinatorMetadata.unbindGroupFromTopics(groupId, topics)
- }
-
- @Test
- def testUnbindGroupFromTopicsNotListenedOn() {
- val groupId = "group"
- val topics = Set("a")
- coordinatorMetadata.addGroup(groupId, "range")
-
- expectZkClientSubscribeDataChanges(zkUtils, topics)
- EasyMock.replay(zkUtils.zkClient)
- coordinatorMetadata.bindGroupToTopics(groupId, topics)
- coordinatorMetadata.unbindGroupFromTopics(groupId, Set("b"))
- assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
- }
-
- @Test
- def testUnbindGroupFromTopicsListenedOnByOtherGroups() {
- val group1 = "group1"
- val group2 = "group2"
- val topics = Set("a")
- coordinatorMetadata.addGroup(group1, "range")
- coordinatorMetadata.addGroup(group2, "range")
-
- expectZkClientSubscribeDataChanges(zkUtils, topics)
- EasyMock.replay(zkUtils.zkClient)
- coordinatorMetadata.bindGroupToTopics(group1, topics)
- coordinatorMetadata.bindGroupToTopics(group2, topics)
- coordinatorMetadata.unbindGroupFromTopics(group1, topics)
- assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
- }
-
- @Test
- def testUnbindGroupFromTopicsListenedOnByNoOtherGroup() {
- val groupId = "group"
- val topics = Set("a")
- coordinatorMetadata.addGroup(groupId, "range")
-
- expectZkClientSubscribeDataChanges(zkUtils, topics)
- expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
- EasyMock.replay(zkUtils.zkClient)
- coordinatorMetadata.bindGroupToTopics(groupId, topics)
- coordinatorMetadata.unbindGroupFromTopics(groupId, topics)
- assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
def testRemoveNonexistentGroup() {
val groupId = "group"
val topics = Set("a")
- coordinatorMetadata.removeGroup(groupId, topics)
- }
-
- @Test
- def testRemoveGroupWithOtherGroupsBoundToItsTopics() {
- val group1 = "group1"
- val group2 = "group2"
- val topics = Set("a")
- coordinatorMetadata.addGroup(group1, "range")
- coordinatorMetadata.addGroup(group2, "range")
-
- expectZkClientSubscribeDataChanges(zkUtils, topics)
- EasyMock.replay(zkUtils.zkClient)
- coordinatorMetadata.bindGroupToTopics(group1, topics)
- coordinatorMetadata.bindGroupToTopics(group2, topics)
- coordinatorMetadata.removeGroup(group1, topics)
- assertNull(coordinatorMetadata.getGroup(group1))
- assertNotNull(coordinatorMetadata.getGroup(group2))
- assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
- }
-
- @Test
- def testRemoveGroupWithNoOtherGroupsBoundToItsTopics() {
- val groupId = "group"
- val topics = Set("a")
- coordinatorMetadata.addGroup(groupId, "range")
-
- expectZkClientSubscribeDataChanges(zkUtils, topics)
- expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
- EasyMock.replay(zkUtils.zkClient)
- coordinatorMetadata.bindGroupToTopics(groupId, topics)
- coordinatorMetadata.removeGroup(groupId, topics)
- assertNull(coordinatorMetadata.getGroup(groupId))
- assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
+ coordinatorMetadata.removeGroup(groupId)
}
- private def expectZkClientSubscribeDataChanges(zkUtils: ZkUtils, topics: Set[String]) {
- topics.foreach(topic => expectZkClientSubscribeDataChange(zkUtils.zkClient, topic))
- }
-
- private def expectZkClientUnsubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) {
- topics.foreach(topic => expectZkClientUnsubscribeDataChange(zkClient, topic))
- }
-
- private def expectZkClientSubscribeDataChange(zkClient: ZkClient, topic: String) {
- val replicaAssignment =
- (0 until DefaultNumPartitions)
- .map(partition => partition.toString -> (0 until DefaultNumReplicas).toSeq).toMap
- val topicPath = getTopicPath(topic)
- EasyMock.expect(zkClient.readData(topicPath, new Stat()))
- .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
- zkClient.subscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
- }
-
- private def expectZkClientUnsubscribeDataChange(zkClient: ZkClient, topic: String) {
- val topicPath = getTopicPath(topic)
- zkClient.unsubscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
new file mode 100644
index 0000000..cdd78ef
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -0,0 +1,907 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+
+import java.util.concurrent.TimeUnit
+
+import org.junit.Assert._
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.server.{OffsetManager, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
+import org.easymock.{IAnswer, EasyMock}
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future, Promise}
+
+/**
+ * Test GroupCoordinator responses
+ */
+class GroupCoordinatorResponseTest extends JUnitSuite {
+ type JoinGroupCallback = JoinGroupResult => Unit
+ type SyncGroupCallbackParams = (Array[Byte], Short)
+ type SyncGroupCallback = (Array[Byte], Short) => Unit
+ type HeartbeatCallbackParams = Short
+ type HeartbeatCallback = Short => Unit
+ type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
+ type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
+ type LeaveGroupCallbackParams = Short
+ type LeaveGroupCallback = Short => Unit
+
+ val ConsumerMinSessionTimeout = 10
+ val ConsumerMaxSessionTimeout = 1000
+ val DefaultSessionTimeout = 500
+ var consumerCoordinator: GroupCoordinator = null
+ var offsetManager : OffsetManager = null
+
+ @Before
+ def setUp() {
+ val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+ props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
+ props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+ offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
+ consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager)
+ consumerCoordinator.startup()
+ }
+
+ @After
+ def tearDown() {
+ EasyMock.reset(offsetManager)
+ consumerCoordinator.shutdown()
+ }
+
+ @Test
+ def testJoinGroupWrongCoordinator() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
+ protocols, isCoordinatorForGroup = false)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
+ }
+
+ @Test
+ def testJoinGroupSessionTimeoutTooSmall() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, ConsumerMinSessionTimeout - 1, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+ }
+
+ @Test
+ def testJoinGroupSessionTimeoutTooLarge() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, ConsumerMaxSessionTimeout + 1, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+ }
+
+ @Test
+ def testJoinGroupUnknownConsumerNewGroup() {
+ val groupId = "groupId"
+ val memberId = "memberId"
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code, joinGroupErrorCode)
+ }
+
+ @Test
+ def testValidJoinGroup() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
+ protocols, isCoordinatorForGroup = true)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+ }
+
+ @Test
+ def testJoinGroupInconsistentProtocolType() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+
+ val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, "consumer", List(("range", metadata)),
+ isCoordinatorForGroup = true)
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ EasyMock.reset(offsetManager)
+ val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat",
+ List(("range", metadata)), isCoordinatorForGroup = true)
+ assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
+ }
+
+ @Test
+ def testJoinGroupInconsistentGroupProtocol() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val protocolType = "consumer"
+ val metadata = Array[Byte]()
+
+ val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, List(("range", metadata)),
+ isCoordinatorForGroup = true)
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ EasyMock.reset(offsetManager)
+ val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType,
+ List(("roundrobin", metadata)), isCoordinatorForGroup = true)
+ assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
+ }
+
+ @Test
+ def testJoinGroupUnknownConsumerExistingGroup() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val otherMemberId = "memberId"
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ EasyMock.reset(offsetManager)
+ val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code, otherJoinGroupResult.errorCode)
+ }
+
+ @Test
+ def testHeartbeatWrongCoordinator() {
+ val groupId = "groupId"
+ val consumerId = "memberId"
+
+ val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false)
+ assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, heartbeatResult)
+ }
+
+ @Test
+ def testHeartbeatUnknownGroup() {
+ val groupId = "groupId"
+ val consumerId = "memberId"
+
+ val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+ }
+
+ @Test
+ def testHeartbeatUnknownConsumerExistingGroup() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val otherMemberId = "memberId"
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.memberId, Map.empty, true)
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val heartbeatResult = heartbeat(groupId, otherMemberId, 1, isCoordinatorForGroup = true)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+ }
+
+ @Test
+ def testHeartbeatRebalanceInProgress() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val assignedMemberId = joinGroupResult.memberId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true)
+ assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+ }
+
+ @Test
+ def testHeartbeatIllegalGeneration() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val assignedMemberId = joinGroupResult.memberId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map.empty, true)
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true)
+ assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+ }
+
+ @Test
+ def testValidHeartbeat() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true)
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true)
+ assertEquals(Errors.NONE.code, heartbeatResult)
+ }
+
+ @Test
+ def testSyncGroupNotCoordinator() {
+ val groupId = "groupId"
+ val memberId = "member"
+ val generation = 1
+
+ val syncGroupResult = syncGroupFollower(groupId, generation, memberId, false)
+ assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, syncGroupResult._2)
+ }
+
+ @Test
+ def testSyncGroupFromUnknownGroup() {
+ val groupId = "groupId"
+ val memberId = "member"
+ val generation = 1
+
+ val syncGroupResult = syncGroupFollower(groupId, generation, memberId, true)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code, syncGroupResult._2)
+ }
+
+ @Test
+ def testSyncGroupFromUnknownMember() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ EasyMock.reset(offsetManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true)
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(offsetManager)
+ val unknownMemberId = "blah"
+ val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId, true)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code, unknownMemberSyncResult._2)
+ }
+
+ @Test
+ def testSyncGroupFromIllegalGeneration() {
+ val groupId = "groupId"
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val metadata = Array[Byte]()
+ val protocolType = "consumer"
+ val protocols = List(("range", metadata))
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ EasyMock.reset(offsetManager)
+ // send the sync group with an invalid generation
+ val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map.empty, true)
+ assertEquals(Errors.ILLEGAL_GENERATION.code, syncGroupResult._2)
+ }
+
+ @Test
+ def testJoinGroupFromUnchangedFollowerDoesNotRebalance() {
+ val groupId = "groupId"
+ val protocolType = "consumer"
+ val protocols = List(("range", Array[Byte]()))
+
+ // to get a group of two members:
+ // 1. join and sync with a single member (because we can't immediately join with two members)
+ // 2. join and sync with the first member and a new member
+
+ val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+ protocolType, protocols, isCoordinatorForGroup = true)
+ val firstMemberId = firstJoinResult.memberId
+ val firstGenerationId = firstJoinResult.generationId
+ assertEquals(firstMemberId, firstJoinResult.leaderId)
+ assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+ EasyMock.reset(offsetManager)
+ val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+ assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+ EasyMock.reset(offsetManager)
+ val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+ protocolType, protocols, isCoordinatorForGroup = true)
+
+ EasyMock.reset(offsetManager)
+ val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true);
+
+ val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+ val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+ assertEquals(Errors.NONE.code, joinResult.errorCode)
+ assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+ assertTrue(joinResult.generationId == otherJoinResult.generationId)
+
+ assertEquals(firstMemberId, joinResult.leaderId)
+ assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+ val nextGenerationId = joinResult.generationId
+
+ // this shouldn't cause a rebalance since protocol information hasn't changed
+ EasyMock.reset(offsetManager)
+ val followerJoinResult = joinGroup(groupId, otherJoinResult.memberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+
+ assertEquals(Errors.NONE.code, followerJoinResult.errorCode)
+ assertEquals(nextGenerationId, followerJoinResult.generationId)
+ }
+
+ @Test
+ def testJoinGroupFromUnchangedLeaderShouldRebalance() {
+ val groupId = "groupId"
+ val protocolType = "consumer"
+ val protocols = List(("range", Array[Byte]()))
+
+ val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+ protocolType, protocols, isCoordinatorForGroup = true)
+ val firstMemberId = firstJoinResult.memberId
+ val firstGenerationId = firstJoinResult.generationId
+ assertEquals(firstMemberId, firstJoinResult.leaderId)
+ assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+ EasyMock.reset(offsetManager)
+ val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+ assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+ // join groups from the leader should force the group to rebalance, which allows the
+ // leader to push new assignments when local metadata changes
+
+ EasyMock.reset(offsetManager)
+ val secondJoinResult = joinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+ isCoordinatorForGroup = true)
+
+ assertEquals(Errors.NONE.code, secondJoinResult.errorCode)
+ assertNotEquals(firstGenerationId, secondJoinResult.generationId)
+ }
+
+ @Test
+ def testLeaderFailureInSyncGroup() {
+   // Expects a follower's SyncGroup to be answered with REBALANCE_IN_PROGRESS when the leader
+   // never sends its own SyncGroup for the new generation.
+   val groupId = "groupId"
+   val protocolType = "consumer"
+   val protocols = List(("range", Array[Byte]()))
+
+   // to get a group of two members:
+   // 1. join and sync with a single member (because we can't immediately join with two members)
+   // 2. rejoin with the first member together with a new member
+
+   val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, protocols, isCoordinatorForGroup = true)
+   val firstMemberId = firstJoinResult.memberId
+   val firstGenerationId = firstJoinResult.generationId
+   assertEquals(firstMemberId, firstJoinResult.leaderId)
+   assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+   EasyMock.reset(offsetManager)
+   val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+   assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+   // the mock is reset before every request because each send helper records and replays
+   // its own expectations on offsetManager
+   EasyMock.reset(offsetManager)
+   val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, protocols, isCoordinatorForGroup = true)
+
+   EasyMock.reset(offsetManager)
+   val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+     isCoordinatorForGroup = true)
+
+   val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+   val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+   assertEquals(Errors.NONE.code, joinResult.errorCode)
+   assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+   // assertEquals (rather than assertTrue on ==) reports both generation ids on failure
+   assertEquals(joinResult.generationId, otherJoinResult.generationId)
+
+   assertEquals(firstMemberId, joinResult.leaderId)
+   assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+   val nextGenerationId = joinResult.generationId
+
+   // with no leader SyncGroup, the follower's request should fail with an error indicating
+   // that it should rejoin
+   EasyMock.reset(offsetManager)
+   val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId,
+     isCoordinatorForGroup = true)
+   val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
+   assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2)
+ }
+
+ @Test
+ def testSyncGroupFollowerAfterLeader() {
+   // Expects a follower that syncs after the leader to receive the assignment the leader
+   // supplied for it.
+   val groupId = "groupId"
+   val protocolType = "consumer"
+   val protocols = List(("range", Array[Byte]()))
+
+   // to get a group of two members:
+   // 1. join and sync with a single member (because we can't immediately join with two members)
+   // 2. join and sync with the first member and a new member
+
+   val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, protocols, isCoordinatorForGroup = true)
+   val firstMemberId = firstJoinResult.memberId
+   val firstGenerationId = firstJoinResult.generationId
+   assertEquals(firstMemberId, firstJoinResult.leaderId)
+   assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+   EasyMock.reset(offsetManager)
+   val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+   assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+   EasyMock.reset(offsetManager)
+   val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, protocols, isCoordinatorForGroup = true)
+
+   EasyMock.reset(offsetManager)
+   val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+     isCoordinatorForGroup = true)
+
+   val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+   val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+   assertEquals(Errors.NONE.code, joinResult.errorCode)
+   assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+   // assertEquals (rather than assertTrue on ==) reports both generation ids on failure
+   assertEquals(joinResult.generationId, otherJoinResult.generationId)
+
+   assertEquals(firstMemberId, joinResult.leaderId)
+   assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+   val nextGenerationId = joinResult.generationId
+   val leaderId = firstMemberId
+   val leaderAssignment = Array[Byte](0)
+   val followerId = otherJoinResult.memberId
+   val followerAssignment = Array[Byte](1)
+
+   EasyMock.reset(offsetManager)
+   val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
+     Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, leaderSyncResult._2)
+   // assertEquals on arrays only checks reference identity; compare the bytes instead so the
+   // check still holds if the returned assignment is a copy
+   org.junit.Assert.assertArrayEquals(leaderAssignment, leaderSyncResult._1)
+
+   EasyMock.reset(offsetManager)
+   val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, followerId,
+     isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, followerSyncResult._2)
+   org.junit.Assert.assertArrayEquals(followerAssignment, followerSyncResult._1)
+ }
+
+ @Test
+ def testSyncGroupLeaderAfterFollower() {
+   // Expects a follower whose SyncGroup is sent before the leader's to be answered with its
+   // assignment once the leader's SyncGroup has been handled.
+   val groupId = "groupId"
+   val protocolType = "consumer"
+   val protocols = List(("range", Array[Byte]()))
+
+   // to get a group of two members:
+   // 1. join and sync with a single member (because we can't immediately join with two members)
+   // 2. join and sync with the first member and a new member
+
+   val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, protocols, isCoordinatorForGroup = true)
+   val firstMemberId = joinGroupResult.memberId
+   val firstGenerationId = joinGroupResult.generationId
+   assertEquals(firstMemberId, joinGroupResult.leaderId)
+   assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+   EasyMock.reset(offsetManager)
+   val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+   val syncGroupErrorCode = syncGroupResult._2
+   assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+   EasyMock.reset(offsetManager)
+   val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, protocols, isCoordinatorForGroup = true)
+
+   EasyMock.reset(offsetManager)
+   val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+     isCoordinatorForGroup = true)
+
+   val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+   val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+   assertEquals(Errors.NONE.code, joinResult.errorCode)
+   assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+   // assertEquals (rather than assertTrue on ==) reports both generation ids on failure
+   assertEquals(joinResult.generationId, otherJoinResult.generationId)
+
+   val nextGenerationId = joinResult.generationId
+   val leaderId = joinResult.leaderId
+   val leaderAssignment = Array[Byte](0)
+   val followerId = otherJoinResult.memberId
+   val followerAssignment = Array[Byte](1)
+
+   assertEquals(firstMemberId, joinResult.leaderId)
+   assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+   // the follower's request is only sent here; its result is awaited after the leader syncs
+   EasyMock.reset(offsetManager)
+   val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, isCoordinatorForGroup = true)
+
+   EasyMock.reset(offsetManager)
+   val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
+     Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, leaderSyncResult._2)
+   // assertEquals on arrays only checks reference identity; compare the bytes instead so the
+   // check still holds if the returned assignment is a copy
+   org.junit.Assert.assertArrayEquals(leaderAssignment, leaderSyncResult._1)
+
+   val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
+   assertEquals(Errors.NONE.code, followerSyncResult._2)
+   org.junit.Assert.assertArrayEquals(followerAssignment, followerSyncResult._1)
+ }
+
+ @Test
+ def testCommitOffsetFromUnknownGroup() {
+   // Committing with generation 1 to a group that was never joined is expected to yield
+   // ILLEGAL_GENERATION for the committed partition.
+   val partition = new TopicAndPartition("topic", 0)
+   val offsetsToCommit = Map(partition -> OffsetAndMetadata(0))
+
+   val errorsByPartition = commitOffsets("groupId", "consumer", 1, offsetsToCommit,
+     isCoordinatorForGroup = true)
+   assertEquals(Errors.ILLEGAL_GENERATION.code, errorsByPartition(partition))
+ }
+
+ @Test
+ def testCommitOffsetWithDefaultGeneration() {
+   // A commit using the default member id and default generation id is expected to succeed.
+   val partition = new TopicAndPartition("topic", 0)
+   val offsetsToCommit = Map(partition -> OffsetAndMetadata(0))
+
+   val errorsByPartition = commitOffsets(
+     "groupId",
+     OffsetCommitRequest.DEFAULT_MEMBER_ID,
+     OffsetCommitRequest.DEFAULT_GENERATION_ID,
+     offsetsToCommit,
+     isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, errorsByPartition(partition))
+ }
+
+ @Test
+ def testCommitOffsetInAwaitingSync() {
+   // After a successful join but before any SyncGroup, an offset commit from the joined member
+   // is expected to be answered with REBALANCE_IN_PROGRESS.
+   val groupId = "groupId"
+   val partition = new TopicAndPartition("topic", 0)
+   val offsetsToCommit = Map(partition -> OffsetAndMetadata(0))
+   val supportedProtocols = List(("range", Array[Byte]()))
+
+   val joined = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, "consumer",
+     supportedProtocols, isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, joined.errorCode)
+
+   // fresh expectations are recorded by commitOffsets, so clear the ones used by the join
+   EasyMock.reset(offsetManager)
+   val errorsByPartition = commitOffsets(groupId, joined.memberId, joined.generationId, offsetsToCommit,
+     isCoordinatorForGroup = true)
+   assertEquals(Errors.REBALANCE_IN_PROGRESS.code, errorsByPartition(partition))
+ }
+
+ @Test
+ def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
+   val groupId = "groupId"
+   val protocolType = "consumer"
+   val supportedProtocols = List(("range", Array[Byte]()))
+
+   // Step 1: bring up the group with a single member
+   val initialJoin = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, supportedProtocols, isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, initialJoin.errorCode)
+
+   // Step 2: a second, unknown member asks to join; its response future is deliberately ignored
+   // because only the state of the group matters for the heartbeat below
+   EasyMock.reset(offsetManager)
+   sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType,
+     supportedProtocols, isCoordinatorForGroup = true)
+
+   // Step 3: the original member heartbeats with its old generation and is expected to be told
+   // that a rebalance is in progress
+   EasyMock.reset(offsetManager)
+   val heartbeatErrorCode = heartbeat(groupId, initialJoin.memberId, initialJoin.generationId,
+     isCoordinatorForGroup = true)
+   assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatErrorCode)
+ }
+
+ @Test
+ def testGenerationIdIncrementsOnRebalance() {
+   // Two consecutive joins from unknown members are expected to produce generations 1 and 2.
+   val groupId = "groupId"
+   val protocolType = "consumer"
+   val supportedProtocols = List(("range", Array[Byte]()))
+
+   val firstJoin = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, supportedProtocols, isCoordinatorForGroup = true)
+   assertEquals(1, firstJoin.generationId)
+   assertEquals(Errors.NONE.code, firstJoin.errorCode)
+
+   EasyMock.reset(offsetManager)
+   val secondJoin = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+     protocolType, supportedProtocols, isCoordinatorForGroup = true)
+   assertEquals(2, secondJoin.generationId)
+   assertEquals(Errors.NONE.code, secondJoin.errorCode)
+ }
+
+ @Test
+ def testLeaveGroupWrongCoordinator() {
+   // When the offset manager reports that the leader is not local, leaving is expected to
+   // fail with NOT_COORDINATOR_FOR_GROUP.
+   val errorCode = leaveGroup("groupId", JoinGroupRequest.UNKNOWN_MEMBER_ID, isCoordinatorForGroup = false)
+   assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, errorCode)
+ }
+
+ @Test
+ def testLeaveGroupUnknownGroup() {
+   // Leaving a group that nobody has joined is expected to fail with UNKNOWN_MEMBER_ID.
+   val errorCode = leaveGroup("groupId", "consumerId", isCoordinatorForGroup = true)
+   assertEquals(Errors.UNKNOWN_MEMBER_ID.code, errorCode)
+ }
+
+ @Test
+ def testLeaveGroupUnknownConsumerExistingGroup() {
+   // Once a group exists, a leave request carrying a member id that never joined is expected
+   // to fail with UNKNOWN_MEMBER_ID.
+   val groupId = "groupId"
+   val supportedProtocols = List(("range", Array[Byte]()))
+
+   val joined = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, "consumer",
+     supportedProtocols, isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, joined.errorCode)
+
+   EasyMock.reset(offsetManager)
+   val leaveErrorCode = leaveGroup(groupId, "consumerId", isCoordinatorForGroup = true)
+   assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveErrorCode)
+ }
+
+ @Test
+ def testValidLeaveGroup() {
+   // A member that joined successfully is expected to be able to leave without error.
+   val groupId = "groupId"
+   val supportedProtocols = List(("range", Array[Byte]()))
+
+   val joined = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, "consumer",
+     supportedProtocols, isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, joined.errorCode)
+
+   EasyMock.reset(offsetManager)
+   val leaveErrorCode = leaveGroup(groupId, joined.memberId, isCoordinatorForGroup = true)
+   assertEquals(Errors.NONE.code, leaveErrorCode)
+ }
+
+ private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
+   // Pairs a future with the callback that completes it, so a test can wait for the
+   // JoinGroup result delivered through the callback.
+   val promise = Promise[JoinGroupResult]()
+   val completePromise: JoinGroupCallback = result => promise.success(result)
+   (promise.future, completePromise)
+ }
+
+ private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = {
+   // Pairs a future with the callback that completes it; the callback's two arguments are
+   // delivered to the future as an (assignment, errorCode) tuple.
+   val promise = Promise[SyncGroupCallbackParams]()
+   val completePromise: SyncGroupCallback = (assignment, errorCode) => {
+     val params = (assignment, errorCode)
+     promise.success(params)
+   }
+   (promise.future, completePromise)
+ }
+
+ private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
+   // Pairs a future with the callback that completes it with the reported error code.
+   val promise = Promise[HeartbeatCallbackParams]()
+   val completePromise: HeartbeatCallback = code => promise.success(code)
+   (promise.future, completePromise)
+ }
+
+ private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = {
+   // Pairs a future with the callback that completes it with the commit result.
+   val promise = Promise[CommitOffsetCallbackParams]()
+   val completePromise: CommitOffsetCallback = commitResult => promise.success(commitResult)
+   (promise.future, completePromise)
+ }
+
+ private def sendJoinGroup(groupId: String,
+                           memberId: String,
+                           sessionTimeout: Int,
+                           protocolType: String,
+                           protocols: List[(String, Array[Byte])],
+                           isCoordinatorForGroup: Boolean): Future[JoinGroupResult] = {
+   // Stubs the offset manager's partition lookup and leader check, replays the mock, and
+   // issues a JoinGroup without waiting; the returned future is completed by the callback.
+   val offsetsPartition = 1
+   EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(offsetsPartition)
+   EasyMock.expect(offsetManager.leaderIsLocal(offsetsPartition)).andReturn(isCoordinatorForGroup)
+   EasyMock.replay(offsetManager)
+
+   val (joinFuture, joinCallback) = setupJoinGroupCallback
+   consumerCoordinator.handleJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, joinCallback)
+   joinFuture
+ }
+
+
+ private def sendSyncGroupLeader(groupId: String,
+                                 generation: Int,
+                                 leaderId: String,
+                                 assignment: Map[String, Array[Byte]],
+                                 isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = {
+   // Stubs the offset manager's partition lookup and leader check, replays the mock, and
+   // issues a SyncGroup carrying the given assignment without waiting for the response.
+   val offsetsPartition = 1
+   EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(offsetsPartition)
+   EasyMock.expect(offsetManager.leaderIsLocal(offsetsPartition)).andReturn(isCoordinatorForGroup)
+   EasyMock.replay(offsetManager)
+
+   val (syncFuture, syncCallback) = setupSyncGroupCallback
+   consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, syncCallback)
+   syncFuture
+ }
+
+ private def sendSyncGroupFollower(groupId: String,
+                                   generation: Int,
+                                   memberId: String,
+                                   isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = {
+   // Same as the leader variant, except that the SyncGroup is sent with an empty assignment map.
+   val offsetsPartition = 1
+   EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(offsetsPartition)
+   EasyMock.expect(offsetManager.leaderIsLocal(offsetsPartition)).andReturn(isCoordinatorForGroup)
+   EasyMock.replay(offsetManager)
+
+   val (syncFuture, syncCallback) = setupSyncGroupCallback
+   val noAssignment = Map.empty[String, Array[Byte]]
+   consumerCoordinator.handleSyncGroup(groupId, generation, memberId, noAssignment, syncCallback)
+   syncFuture
+ }
+
+ private def joinGroup(groupId: String,
+                       memberId: String,
+                       sessionTimeout: Int,
+                       protocolType: String,
+                       protocols: List[(String, Array[Byte])],
+                       isCoordinatorForGroup: Boolean): JoinGroupResult = {
+   // Blocking variant of sendJoinGroup. The wait is bounded by the session timeout plus
+   // 100 ms of slack to absorb unexpected delays.
+   val pendingJoin = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, isCoordinatorForGroup)
+   val maxWait = Duration(sessionTimeout+100, TimeUnit.MILLISECONDS)
+   Await.result(pendingJoin, maxWait)
+ }
+
+
+ private def syncGroupFollower(groupId: String,
+                               generationId: Int,
+                               memberId: String,
+                               isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = {
+   // Blocking variant of sendSyncGroupFollower; waits at most DefaultSessionTimeout + 100 ms.
+   val pendingSync = sendSyncGroupFollower(groupId, generationId, memberId, isCoordinatorForGroup)
+   val maxWait = Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS)
+   Await.result(pendingSync, maxWait)
+ }
+
+ private def syncGroupLeader(groupId: String,
+                             generationId: Int,
+                             memberId: String,
+                             assignment: Map[String, Array[Byte]],
+                             isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = {
+   // Blocking variant of sendSyncGroupLeader; waits at most DefaultSessionTimeout + 100 ms.
+   val pendingSync = sendSyncGroupLeader(groupId, generationId, memberId, assignment, isCoordinatorForGroup)
+   val maxWait = Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS)
+   Await.result(pendingSync, maxWait)
+ }
+
+ private def heartbeat(groupId: String,
+                       consumerId: String,
+                       generationId: Int,
+                       isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = {
+   // Stubs the offset manager's partition lookup and leader check, sends a heartbeat, and
+   // waits up to 40 ms for the error code reported through the callback.
+   val offsetsPartition = 1
+   EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(offsetsPartition)
+   EasyMock.expect(offsetManager.leaderIsLocal(offsetsPartition)).andReturn(isCoordinatorForGroup)
+   EasyMock.replay(offsetManager)
+
+   val (heartbeatFuture, heartbeatCallback) = setupHeartbeatCallback
+   consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, heartbeatCallback)
+   val maxWait = Duration(40, TimeUnit.MILLISECONDS)
+   Await.result(heartbeatFuture, maxWait)
+ }
+
+ private def await[T](future: Future[T], millis: Long): T = {
+   // Blocks until the future completes or the given number of milliseconds has elapsed.
+   val maxWait = Duration(millis, TimeUnit.MILLISECONDS)
+   Await.result(future, maxWait)
+ }
+
+ private def commitOffsets(groupId: String,
+                           consumerId: String,
+                           generationId: Int,
+                           offsets: Map[TopicAndPartition, OffsetAndMetadata],
+                           isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = {
+   // Sends an offset commit and waits up to 40 ms for the per-partition result delivered
+   // through the callback.
+   val (commitFuture, commitCallback) = setupCommitOffsetsCallback
+
+   val offsetsPartition = 1
+   EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(offsetsPartition)
+   EasyMock.expect(offsetManager.leaderIsLocal(offsetsPartition)).andReturn(isCoordinatorForGroup)
+
+   // if the coordinator hands the commit to the mocked offset manager, answer it by
+   // reporting Errors.NONE for every committed partition
+   val reportSuccess = new IAnswer[Unit] {
+     override def answer: Unit = commitCallback(offsets.mapValues(_ => Errors.NONE.code))
+   }
+   EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, commitCallback))
+     .andAnswer(reportSuccess)
+   EasyMock.replay(offsetManager)
+
+   consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, commitCallback)
+   val maxWait = Duration(40, TimeUnit.MILLISECONDS)
+   Await.result(commitFuture, maxWait)
+ }
+
+ private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = {
+   // Stubs the offset manager's partition lookup and leader check, sends a LeaveGroup, and
+   // waits up to 40 ms for the error code reported through the callback.
+   // NOTE(review): this reuses the heartbeat callback setup; presumably the leave-group callback
+   // has the same shape - confirm, or introduce a dedicated setup helper.
+   val offsetsPartition = 1
+   EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(offsetsPartition)
+   EasyMock.expect(offsetManager.leaderIsLocal(offsetsPartition)).andReturn(isCoordinatorForGroup)
+   EasyMock.replay(offsetManager)
+
+   val (leaveFuture, leaveCallback) = setupHeartbeatCallback
+   consumerCoordinator.handleLeaveGroup(groupId, consumerId, leaveCallback)
+   val maxWait = Duration(40, TimeUnit.MILLISECONDS)
+   Await.result(leaveFuture, maxWait)
+ }
+}