You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/12 22:38:23 UTC
[kafka] branch trunk updated: MINOR: add group coordinator test coverage (#6926)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f396372 MINOR: add group coordinator test coverage (#6926)
f396372 is described below
commit f396372fb894b43c4ca85f1a71bd1b1750ca3e05
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Jun 12 15:38:06 2019 -0700
MINOR: add group coordinator test coverage (#6926)
Some edge cases are not currently being tested. Add more tests to cover them.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../coordinator/group/GroupCoordinatorTest.scala | 36 +++++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index d5d33fb..4cd91dd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -934,8 +934,27 @@ class GroupCoordinatorTest {
}
@Test
+ def staticMemberReJoinWithIllegalStateAsUnknownMember() {
+ staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+ val group = groupCoordinator.groupManager.getGroup(groupId).get
+ group.transitionTo(PreparingRebalance)
+ group.transitionTo(Empty)
+
+ EasyMock.reset(replicaManager)
+
+ // Illegal state exception shall trigger since follower id resides in pending member bucket.
+ val expectedException = intercept[IllegalStateException] {
+ staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+ }
+
+ val message = expectedException.getMessage
+ assertTrue(message.contains(group.groupId))
+ assertTrue(message.contains(followerInstanceId.get))
+ }
+
+ @Test
def staticMemberReJoinWithIllegalArgumentAsMissingOldMember() {
- val _ = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+ staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
val group = groupCoordinator.groupManager.getGroup(groupId).get
val invalidMemberId = "invalid_member_id"
group.addStaticMember(followerInstanceId, invalidMemberId)
@@ -1186,6 +1205,21 @@ class GroupCoordinatorTest {
}
@Test
+ def testheartbeatEmptyGroup() {
+ val memberId = "memberId"
+
+ val group = new GroupMetadata(groupId, Empty, new MockTime())
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId,
+ ClientId, ClientHost, DefaultRebalanceTimeout, DefaultSessionTimeout,
+ protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
+
+ group.add(member)
+ groupCoordinator.groupManager.addGroup(group)
+ val heartbeatResult = heartbeat(groupId, memberId, 0)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+ }
+
+ @Test
def testHeartbeatUnknownConsumerExistingGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = "memberId"