You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/12 22:38:23 UTC

[kafka] branch trunk updated: MINOR: add group coordinator test coverage (#6926)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f396372  MINOR: add group coordinator test coverage (#6926)
f396372 is described below

commit f396372fb894b43c4ca85f1a71bd1b1750ca3e05
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Jun 12 15:38:06 2019 -0700

    MINOR: add group coordinator test coverage (#6926)
    
    Some edge cases are not currently being tested. Add more tests to cover.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../coordinator/group/GroupCoordinatorTest.scala   | 36 +++++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index d5d33fb..4cd91dd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -934,8 +934,27 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def staticMemberReJoinWithIllegalStateAsUnknownMember() {
+    staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Empty)
+
+    EasyMock.reset(replicaManager)
+
+    // Illegal state exception shall trigger since follower id resides in pending member bucket.
+    val expectedException = intercept[IllegalStateException] {
+      staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+    }
+
+    val message = expectedException.getMessage
+    assertTrue(message.contains(group.groupId))
+    assertTrue(message.contains(followerInstanceId.get))
+  }
+
+  @Test
   def staticMemberReJoinWithIllegalArgumentAsMissingOldMember() {
-    val _ = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+    staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
     val group = groupCoordinator.groupManager.getGroup(groupId).get
     val invalidMemberId = "invalid_member_id"
     group.addStaticMember(followerInstanceId, invalidMemberId)
@@ -1186,6 +1205,21 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def testheartbeatEmptyGroup() {
+    val memberId = "memberId"
+
+    val group = new GroupMetadata(groupId, Empty, new MockTime())
+    val member = new MemberMetadata(memberId, groupId, groupInstanceId,
+      ClientId, ClientHost, DefaultRebalanceTimeout, DefaultSessionTimeout,
+      protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
+
+    group.add(member)
+    groupCoordinator.groupManager.addGroup(group)
+    val heartbeatResult = heartbeat(groupId, memberId, 0)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+  }
+
+  @Test
   def testHeartbeatUnknownConsumerExistingGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "memberId"