You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/25 05:34:41 UTC

[kafka] branch 2.4 updated: KAFKA-9752; New member timeout can leave group rebalance stuck (#8339)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new bb2ea95  KAFKA-9752; New member timeout can leave group rebalance stuck (#8339)
bb2ea95 is described below

commit bb2ea951ba20f253bf46c61176b1d091d6c2fd33
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Mar 24 22:16:49 2020 -0700

    KAFKA-9752; New member timeout can leave group rebalance stuck (#8339)
    
    Older versions of the JoinGroup protocol rely on a new member timeout to keep the group from growing indefinitely when clients disconnect and retry. The logic for resetting the heartbeat expiration task following completion of the rebalance failed to account for an implicit expectation that shouldKeepAlive would return false the first time it is invoked when a heartbeat expiration is scheduled. This patch fixes the issue by making the heartbeat satisfaction logic explicit.
    
    Reviewers:  Chia-Ping Tsai <ch...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../kafka/coordinator/group/DelayedHeartbeat.scala |   5 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |  37 +++----
 .../kafka/coordinator/group/MemberMetadata.scala   |  20 ++--
 .../coordinator/group/GroupCoordinatorTest.scala   | 107 +++++++++++++++++++--
 4 files changed, 133 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 09c5eea..3f402d9 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -27,11 +27,10 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       group: GroupMetadata,
                                       memberId: String,
                                       isPending: Boolean,
-                                      deadline: Long,
                                       timeoutMs: Long)
   extends DelayedOperation(timeoutMs, Some(group.lock)) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, deadline, forceComplete _)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending, deadline)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, forceComplete _)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 7b69632..5f812b1 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -910,14 +910,15 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
-    // complete current heartbeat expectation
-    member.latestHeartbeat = time.milliseconds()
     val memberKey = MemberKey(member.groupId, member.memberId)
+
+    // complete current heartbeat expectation
+    member.heartbeatSatisfied = true
     heartbeatPurgatory.checkAndComplete(memberKey)
 
     // reschedule the next heartbeat expiration deadline
-    val deadline = member.latestHeartbeat + timeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, deadline, timeoutMs)
+    member.heartbeatSatisfied = false
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
   }
 
@@ -926,8 +927,7 @@ class GroupCoordinator(val brokerId: Int,
     */
   private def addPendingMemberExpiration(group: GroupMetadata, pendingMemberId: String, timeoutMs: Long): Unit = {
     val pendingMemberKey = MemberKey(group.groupId, pendingMemberId)
-    val deadline = time.milliseconds() + timeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, deadline, timeoutMs)
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey))
   }
 
@@ -1110,7 +1110,10 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def tryCompleteHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
+  def tryCompleteHeartbeat(group: GroupMetadata,
+                           memberId: String,
+                           isPending: Boolean,
+                           forceComplete: () => Boolean): Boolean = {
     group.inLock {
       // The group has been unloaded and invalid, we should complete the heartbeat.
       if (group.is(Dead)) {
@@ -1120,25 +1123,23 @@ class GroupCoordinator(val brokerId: Int,
         if (group.has(memberId)) {
           forceComplete()
         } else false
-      } else {
-        if (shouldCompleteNonPendingHeartbeat(group, memberId, heartbeatDeadline)) {
-          forceComplete()
-        } else false
-      }
+      } else if (shouldCompleteNonPendingHeartbeat(group, memberId)) {
+        forceComplete()
+      } else false
     }
   }
 
-  def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String, heartbeatDeadline: Long): Boolean = {
+  def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String): Boolean = {
     if (group.has(memberId)) {
       val member = group.get(memberId)
-      member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving
+      member.hasSatisfiedHeartbeat || member.isLeaving
     } else {
-      info(s"Member id $memberId was not found in ${group.groupId} during heartbeat expiration.")
-      false
+      info(s"Member id $memberId was not found in ${group.groupId} during heartbeat completion check")
+      true
     }
   }
 
-  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = {
+  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
     group.inLock {
       if (group.is(Dead)) {
         info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
@@ -1149,7 +1150,7 @@ class GroupCoordinator(val brokerId: Int,
         debug(s"Member $memberId has already been removed from the group.")
       } else {
         val member = group.get(memberId)
-        if (!member.shouldKeepAlive(heartbeatDeadline)) {
+        if (!member.hasSatisfiedHeartbeat) {
           info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
           removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
         }
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index f9a45d4..e9ec23e 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -66,11 +66,17 @@ private[group] class MemberMetadata(var memberId: String,
   var assignment: Array[Byte] = Array.empty[Byte]
   var awaitingJoinCallback: JoinGroupResult => Unit = null
   var awaitingSyncCallback: SyncGroupResult => Unit = null
-  var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
   var isNew: Boolean = false
   val isStaticMember: Boolean = groupInstanceId.isDefined
 
+  // This variable is used to track heartbeat completion through the delayed
+  // heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+  // this value to `false`. Upon receiving the heartbeat (or any other event
+  // indicating the liveness of the client), we set it to `true` so that the
+  // delayed heartbeat can be completed.
+  var heartbeatSatisfied: Boolean = false
+
   def isAwaitingJoin = awaitingJoinCallback != null
   def isAwaitingSync = awaitingSyncCallback != null
 
@@ -85,16 +91,16 @@ private[group] class MemberMetadata(var memberId: String,
     }
   }
 
-  def shouldKeepAlive(deadlineMs: Long): Boolean = {
+  def hasSatisfiedHeartbeat: Boolean = {
     if (isNew) {
-      // New members are expired after the static join timeout
-      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+      // New members can be expired while awaiting join, so we have to check this first
+      heartbeatSatisfied
     } else if (isAwaitingJoin || isAwaitingSync) {
-      // Don't remove members as long as they have a request in purgatory
+      // Members that are awaiting a rebalance automatically satisfy expected heartbeats
       true
     } else {
-      // Otherwise check for session expiration
-      latestHeartbeat + sessionTimeoutMs > deadlineMs
+      // Otherwise we require the next heartbeat
+      heartbeatSatisfied
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 54116a3..4d6d694 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -386,6 +386,95 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def testNewMemberFailureAfterJoinGroupCompletion(): Unit = {
+    // For old versions of the JoinGroup protocol, new members were subject
+    // to expiration if the rebalance took long enough. This test case ensures
+    // that following completion of the JoinGroup phase, new members follow
+    // normal heartbeat expiration logic.
+
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId,
+      Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols,
+      requireKnownMemberId = false)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+
+    verifySessionExpiration(groupId)
+  }
+
+  @Test
+  def testNewMemberFailureAfterSyncGroupCompletion(): Unit = {
+    // For old versions of the JoinGroup protocol, new members were subject
+    // to expiration if the rebalance took long enough. This test case ensures
+    // that following completion of the SyncGroup phase, new members follow
+    // normal heartbeat expiration logic.
+
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId,
+      Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols,
+      requireKnownMemberId = false)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+    val secondGenerationId = joinResult.generationId
+    val secondMemberId = otherJoinResult.memberId
+
+    EasyMock.reset(replicaManager)
+    sendSyncGroupFollower(groupId, secondGenerationId, secondMemberId)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, secondGenerationId, firstMemberId,
+      Map(firstMemberId -> Array.emptyByteArray, secondMemberId -> Array.emptyByteArray))
+    assertEquals(Errors.NONE, syncGroupResult._2)
+
+    verifySessionExpiration(groupId)
+  }
+
+  private def verifySessionExpiration(groupId: String): Unit = {
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject()))
+      .andReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    timer.advanceClock(DefaultSessionTimeout + 1)
+
+    val groupMetadata = group(groupId)
+    assertEquals(Empty, groupMetadata.currentState)
+    assertEquals(0, groupMetadata.allMembers.size)
+  }
+
+  @Test
   def testJoinGroupInconsistentGroupProtocol(): Unit = {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
@@ -675,11 +764,13 @@ class GroupCoordinatorTest {
 
   @Test
   def staticMemberRejoinWithLeaderIdAndKnownMemberId(): Unit = {
-    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultRebalanceTimeout / 2)
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId,
+      sessionTimeout = DefaultRebalanceTimeout / 2)
 
     // A static leader with known id rejoin will trigger rebalance.
     EasyMock.reset(replicaManager)
-    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId,
+      protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1)
     // Timeout follower in the meantime.
     assertFalse(getGroup(groupId).hasStaticMember(followerInstanceId))
     checkJoinGroupResult(joinGroupResult,
@@ -3134,8 +3225,8 @@ class GroupCoordinatorTest {
     val group = getGroup(groupId)
     group.transitionTo(Dead)
     val leaderMemberId = rebalanceResult.leaderId
-    assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true))
-    groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout)
+    assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true))
+    groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false)
     assertTrue(group.has(leaderMemberId))
   }
 
@@ -3147,8 +3238,7 @@ class GroupCoordinatorTest {
     val group = getGroup(groupId)
     val leaderMemberId = rebalanceResult.leaderId
     group.remove(leaderMemberId)
-    assertFalse(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true))
-    groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout)
+    assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true))
   }
 
   private def getGroup(groupId: String): GroupMetadata = {
@@ -3242,12 +3332,13 @@ class GroupCoordinatorTest {
   private def sendSyncGroupFollower(groupId: String,
                                     generation: Int,
                                     memberId: String,
-                                    groupInstanceId: Option[String]): Future[SyncGroupCallbackParams] = {
+                                    groupInstanceId: Option[String] = None): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId,
+      Map.empty[String, Array[Byte]], responseCallback)
     responseFuture
   }