Posted to commits@kafka.apache.org by gu...@apache.org on 2018/12/10 22:32:45 UTC

[kafka] branch trunk updated: KAFKA-7610; Proactively timeout new group members if rebalance is delayed (#5962)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 20069b3  KAFKA-7610; Proactively timeout new group members if rebalance is delayed (#5962)
20069b3 is described below

commit 20069b390637813c289175327f08fe410fd0cb71
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Dec 10 14:32:29 2018 -0800

    KAFKA-7610; Proactively timeout new group members if rebalance is delayed (#5962)
    
    When a consumer first joins a group, it doesn't have an assigned memberId. If the rebalance is delayed for some reason, the client may disconnect after a request timeout and retry. Since the client has not received its memberId, we have no way to detect the retry and expire the previously generated memberId. This can lead to unbounded growth in the size of the group until the rebalance has completed.
    
    This patch fixes the problem by proactively completing all JoinGroup requests for new members after a timeout of 5 minutes. If the client is still around, we expect it to retry.
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Boyang Chen <bc...@outlook.com>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/coordinator/group/DelayedHeartbeat.scala | 10 ++--
 .../kafka/coordinator/group/GroupCoordinator.scala | 41 +++++++++++-----
 .../kafka/coordinator/group/GroupMetadata.scala    | 12 ++---
 .../kafka/coordinator/group/MemberMetadata.scala   |  8 ++++
 .../coordinator/group/GroupCoordinatorTest.scala   | 55 +++++++++++++++++++---
 5 files changed, 98 insertions(+), 28 deletions(-)
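
The failure mode described in the commit message can be modeled in a few lines of self-contained Scala (all names below are simplified stand-ins, not the coordinator's real types): every JoinGroup retry from a client that never received its memberId mints a fresh member, so the group grows until stale entries are proactively expired.

    import scala.collection.mutable

    object JoinRetrySketch extends App {
      // memberId -> timestamp (ms) at which the pending JoinGroup was received
      val members = mutable.Map[String, Long]()
      var counter = 0

      // Each JoinGroup from a client without a memberId mints a new member,
      // loosely mirroring addMemberAndRebalance in the coordinator.
      def joinWithUnknownMemberId(nowMs: Long): String = {
        counter += 1
        val memberId = s"consumer-test-$counter"
        members += memberId -> nowMs
        memberId
      }

      // Five retries from the same disconnected client leave five defunct members.
      (1 to 5).foreach(i => joinWithUnknownMemberId(nowMs = i * 1000L))
      println(s"members without expiration: ${members.size}") // 5

      // The fix: expire new members whose JoinGroup has been pending longer
      // than a fixed cap (NewMemberJoinTimeoutMs = 5 minutes in the patch).
      val newMemberJoinTimeoutMs = 5 * 60 * 1000L
      def expireNewMembers(nowMs: Long): Unit =
        members.retain { case (_, joinedAtMs) => nowMs - joinedAtMs <= newMemberJoinTimeoutMs }

      expireNewMembers(nowMs = 5000L + newMemberJoinTimeoutMs + 1)
      println(s"members after expiration: ${members.size}") // 0
    }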

diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 5f16acb..9377518 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -26,11 +26,11 @@ import kafka.server.DelayedOperation
 private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       group: GroupMetadata,
                                       member: MemberMetadata,
-                                      heartbeatDeadline: Long,
-                                      sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
+                                      deadline: Long,
+                                      timeoutMs: Long)
+  extends DelayedOperation(timeoutMs, Some(group.lock)) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, deadline, forceComplete _)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, deadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }
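
The rename above turns DelayedHeartbeat into a general-purpose deadline: the same operation can now carry either a session timeout or the new-member join timeout. A rough standalone analogue (a hypothetical simplified class, not the real purgatory machinery) of a delayed operation parameterized by an arbitrary timeout:

    import java.util.concurrent.{Executors, TimeUnit}

    // Hypothetical analogue of a DelayedOperation: onExpiration fires after
    // timeoutMs unless forceComplete() wins the race first.
    final class DelayedExpiration(timeoutMs: Long)(onExpiration: () => Unit) {
      @volatile private var completed = false
      private val timer = Executors.newSingleThreadScheduledExecutor()

      timer.schedule(new Runnable {
        override def run(): Unit = if (!completed) onExpiration()
      }, timeoutMs, TimeUnit.MILLISECONDS)

      def forceComplete(): Unit = { completed = true; timer.shutdown() }
    }

    object DelayedExpirationDemo extends App {
      // The same class serves a 100 ms "session timeout" here, but could
      // just as well be handed a 5-minute new-member join timeout.
      val op = new DelayedExpiration(timeoutMs = 100)(() => println("member expired"))
      Thread.sleep(200)  // nothing completed the operation, so it fires
      op.forceComplete() // releases the timer thread so the JVM can exit
    }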
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index db89f14..007c6ee 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -600,7 +600,7 @@ class GroupCoordinator(val brokerId: Int,
         case Empty | Dead =>
         case PreparingRebalance =>
           for (member <- group.allMemberMetadata) {
-            group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
+            group.maybeInvokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
           }
 
           joinPurgatory.checkAndComplete(GroupKey(group.groupId))
@@ -674,14 +674,18 @@ class GroupCoordinator(val brokerId: Int,
    * Complete existing DelayedHeartbeats for the given member and schedule the next one
    */
   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+    completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
+  }
+
+  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
     // complete current heartbeat expectation
     member.latestHeartbeat = time.milliseconds()
     val memberKey = MemberKey(member.groupId, member.memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)
 
     // reschedule the next heartbeat expiration deadline
-    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+    val deadline = member.latestHeartbeat + timeoutMs
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, deadline, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
   }
 
@@ -702,11 +706,23 @@ class GroupCoordinator(val brokerId: Int,
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
+
+    member.isNew = true
+
     // update the newMemberAdded flag to indicate that the join group can be further delayed
     if (group.is(PreparingRebalance) && group.generationId == 0)
       group.newMemberAdded = true
 
     group.add(member, callback)
+
+    // The session timeout does not affect new members since they do not have their memberId and
+    // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
+    // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
+    // timeout during a long rebalance), it may simply retry, which will lead to a lot of defunct
+    // members in the rebalance. To prevent this from going on indefinitely, we time out JoinGroup requests
+    // for new members. If the new member is still there, we expect it to retry.
+    completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
+
     maybePrepareRebalance(group, s"Adding new member $memberId")
     member
   }
@@ -751,7 +767,13 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String) {
+    // New members may time out with a pending JoinGroup while the group is still rebalancing, so we have
+    // to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
+    // will retry the JoinGroup request if it is still active.
+    group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))
+
     group.remove(member.memberId)
+
     group.currentState match {
       case Dead | Empty =>
       case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
@@ -813,8 +835,9 @@ class GroupCoordinator(val brokerId: Int,
               leaderId = group.leaderOrNull,
               error = Errors.NONE)
 
-            group.invokeJoinCallback(member, joinResult)
+            group.maybeInvokeJoinCallback(member, joinResult)
             completeAndScheduleNextHeartbeatExpiration(group, member)
+            member.isNew = false
           }
         }
       }
@@ -823,7 +846,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
     group.inLock {
-      if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
+      if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving)
         forceComplete()
       else false
     }
@@ -831,7 +854,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
     group.inLock {
-      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
+      if (!member.shouldKeepAlive(heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
         removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
       }
@@ -844,11 +867,6 @@ class GroupCoordinator(val brokerId: Int,
 
   def partitionFor(group: String): Int = groupManager.partitionFor(group)
 
-  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
-    member.awaitingJoinCallback != null ||
-      member.awaitingSyncCallback != null ||
-      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
-
   private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
 
   private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
@@ -865,6 +883,7 @@ object GroupCoordinator {
   val NoMembers = List[MemberSummary]()
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+  val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000
 
   def apply(config: KafkaConfig,
             zkClient: KafkaZkClient,
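
Taken together, the coordinator changes route new members through the same heartbeat purgatory but with a fixed 5-minute cap instead of the (unusable) session timeout. A condensed model of the timeout selection, using a hypothetical Member type:

    // Condensed model (hypothetical Member type) of how the coordinator now
    // picks the expiration timeout when scheduling a member's next deadline.
    object TimeoutSelectionSketch extends App {
      val NewMemberJoinTimeoutMs: Long = 5 * 60 * 1000 // value added by this patch

      case class Member(sessionTimeoutMs: Long, isNew: Boolean)

      // New members cannot heartbeat (they have no memberId yet), so their
      // deadline is the fixed join timeout; established members keep using
      // the session timeout they negotiated.
      def expirationTimeoutMs(member: Member): Long =
        if (member.isNew) NewMemberJoinTimeoutMs else member.sessionTimeoutMs

      println(expirationTimeoutMs(Member(sessionTimeoutMs = 10000, isNew = true)))  // 300000
      println(expirationTimeoutMs(Member(sessionTimeoutMs = 10000, isNew = false))) // 10000
    }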
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index cbe78e9..e2d9c5f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -220,7 +220,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
     member.awaitingJoinCallback = callback
     if (member.awaitingJoinCallback != null)
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
   }
 
   def remove(memberId: String) {
@@ -300,19 +300,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols = protocols
 
     if (callback != null && member.awaitingJoinCallback == null) {
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
     } else if (callback == null && member.awaitingJoinCallback != null) {
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
     member.awaitingJoinCallback = callback
   }
 
-  def invokeJoinCallback(member: MemberMetadata,
-                         joinGroupResult: JoinGroupResult) : Unit = {
+  def maybeInvokeJoinCallback(member: MemberMetadata,
+                              joinGroupResult: JoinGroupResult) : Unit = {
     if (member.awaitingJoinCallback != null) {
       member.awaitingJoinCallback(joinGroupResult)
       member.awaitingJoinCallback = null
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
   }
 
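The rename to maybeInvokeJoinCallback reflects that the callback is now null-guarded and one-shot: removal paths may call it even when no join is pending. A minimal standalone sketch of that pattern (hypothetical types, String standing in for JoinGroupResult):

    // Hypothetical stand-in showing the null-guarded, one-shot callback
    // pattern behind maybeInvokeJoinCallback: a second invocation is a
    // no-op, so removal paths can call it unconditionally.
    object OneShotCallbackSketch extends App {
      final class Member {
        var awaitingJoinCallback: String => Unit = null
      }

      var numMembersAwaitingJoin = 0

      def maybeInvokeJoinCallback(member: Member, result: String): Unit = {
        if (member.awaitingJoinCallback != null) {
          member.awaitingJoinCallback(result)
          member.awaitingJoinCallback = null
          numMembersAwaitingJoin -= 1
        }
      }

      val m = new Member
      m.awaitingJoinCallback = r => println(s"join completed: $r")
      numMembersAwaitingJoin += 1

      maybeInvokeJoinCallback(m, "UNKNOWN_MEMBER_ID") // invokes and clears
      maybeInvokeJoinCallback(m, "UNKNOWN_MEMBER_ID") // safe no-op
      println(numMembersAwaitingJoin) // 0
    }
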
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index b082b9b..8649b3e 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -64,6 +64,7 @@ private[group] class MemberMetadata(val memberId: String,
   var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
+  var isNew: Boolean = false
 
   def protocols = supportedProtocols.map(_._1).toSet
 
@@ -78,6 +79,13 @@ private[group] class MemberMetadata(val memberId: String,
     }
   }
 
+  def shouldKeepAlive(deadlineMs: Long): Boolean = {
+    if (awaitingJoinCallback != null)
+      !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+    else awaitingSyncCallback != null ||
+      latestHeartbeat + sessionTimeoutMs > deadlineMs
+  }
+
   /**
    * Check if the provided protocol metadata matches the currently stored metadata.
    */
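
The new shouldKeepAlive consolidates the removed shouldKeepMemberAlive from GroupCoordinator and adds the new-member branch. A self-contained restatement of the decision as a pure function over hypothetical inputs, with spot checks:

    // Hypothetical pure restatement of MemberMetadata.shouldKeepAlive:
    // a member awaiting its first join is kept only until the fixed
    // new-member join timeout; established members use the session timeout.
    object ShouldKeepAliveSketch extends App {
      val NewMemberJoinTimeoutMs = 5 * 60 * 1000L

      def shouldKeepAlive(awaitingJoin: Boolean, awaitingSync: Boolean, isNew: Boolean,
                          latestHeartbeat: Long, sessionTimeoutMs: Long, deadlineMs: Long): Boolean =
        if (awaitingJoin)
          !isNew || latestHeartbeat + NewMemberJoinTimeoutMs > deadlineMs
        else awaitingSync || latestHeartbeat + sessionTimeoutMs > deadlineMs

      // A new member pending join survives a deadline inside the 5-minute window...
      assert(shouldKeepAlive(awaitingJoin = true, awaitingSync = false, isNew = true,
        latestHeartbeat = 0, sessionTimeoutMs = 10000, deadlineMs = NewMemberJoinTimeoutMs - 1))
      // ...but not one beyond it.
      assert(!shouldKeepAlive(awaitingJoin = true, awaitingSync = false, isNew = true,
        latestHeartbeat = 0, sessionTimeoutMs = 10000, deadlineMs = NewMemberJoinTimeoutMs + 1))
      println("ok")
    }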
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c162342..1ef695e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -56,8 +56,8 @@ class GroupCoordinatorTest extends JUnitSuite {
 
   val ClientId = "consumer-test"
   val ClientHost = "localhost"
-  val ConsumerMinSessionTimeout = 10
-  val ConsumerMaxSessionTimeout = 1000
+  val GroupMinSessionTimeout = 10
+  val GroupMaxSessionTimeout = 10 * 60 * 1000
   val DefaultRebalanceTimeout = 500
   val DefaultSessionTimeout = 500
   val GroupInitialRebalanceDelay = 50
@@ -80,8 +80,8 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Before
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
-    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, GroupMinSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, GroupMaxSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString)
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
@@ -194,7 +194,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooSmall() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMinSessionTimeout - 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -203,7 +203,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooLarge() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMaxSessionTimeout + 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -263,6 +263,49 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testNewMemberJoinExpiration(): Unit = {
+    // This tests new member expiration during a protracted rebalance. We first create a
+    // group with one member which uses large values for the session and rebalance timeouts.
+    // We then join with one new member and let the rebalance hang while we await the first member.
+    // The new member's join timeout expires and its JoinGroup request is failed.
+
+    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+    val rebalanceTimeout = GroupCoordinator.NewMemberJoinTimeoutMs * 2
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      sessionTimeout, rebalanceTimeout)
+    val firstMemberId = firstJoinResult.memberId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
+    assertTrue(groupOpt.isDefined)
+    val group = groupOpt.get
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+    EasyMock.reset(replicaManager)
+
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      rebalanceTimeout, sessionTimeout)
+    assertFalse(responseFuture.isCompleted)
+
+    assertEquals(2, group.allMembers.size)
+    assertEquals(1, group.allMemberMetadata.count(_.isNew))
+
+    val newMember = group.allMemberMetadata.find(_.isNew).get
+    assertNotEquals(firstMemberId, newMember.memberId)
+
+    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 1)
+    assertTrue(responseFuture.isCompleted)
+
+    val response = Await.result(responseFuture, Duration(0, TimeUnit.MILLISECONDS))
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, response.error)
+    assertEquals(1, group.allMembers.size)
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+    assertEquals(firstMemberId, group.allMembers.head)
+  }
+
+  @Test
   def testJoinGroupInconsistentGroupProtocol() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID