You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/25 05:42:43 UTC

[kafka] branch 2.3 updated: KAFKA-9752; New member timeout can leave group rebalance stuck (#8339)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 3b3eb70  KAFKA-9752; New member timeout can leave group rebalance stuck (#8339)
3b3eb70 is described below

commit 3b3eb705cdcb876e23e7aa4c2086de6244b0b52b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Mar 24 22:16:49 2020 -0700

    KAFKA-9752; New member timeout can leave group rebalance stuck (#8339)
    
    Older versions of the JoinGroup protocol rely on a new member timeout to keep the group from growing indefinitely when clients disconnect and retry. The logic for resetting the heartbeat expiration task following completion of the rebalance failed to account for an implicit expectation that shouldKeepAlive would return false the first time it is invoked when a heartbeat expiration is scheduled. This patch fixes the issue by making the heartbeat satisfaction logic explicit.
    
    Reviewers:  Chia-Ping Tsai <ch...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../kafka/coordinator/group/DelayedHeartbeat.scala |   5 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |  39 ++++----
 .../kafka/coordinator/group/MemberMetadata.scala   |  20 ++--
 .../coordinator/group/GroupCoordinatorTest.scala   | 111 +++++++++++++++++++--
 4 files changed, 136 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 09c5eea..3f402d9 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -27,11 +27,10 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       group: GroupMetadata,
                                       memberId: String,
                                       isPending: Boolean,
-                                      deadline: Long,
                                       timeoutMs: Long)
   extends DelayedOperation(timeoutMs, Some(group.lock)) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, deadline, forceComplete _)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending, deadline)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, forceComplete _)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 1e1a759..bbd4766 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -772,15 +772,16 @@ class GroupCoordinator(val brokerId: Int,
     completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
   }
 
-  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long) {
-    // complete current heartbeat expectation
-    member.latestHeartbeat = time.milliseconds()
+  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
     val memberKey = MemberKey(member.groupId, member.memberId)
+
+    // complete current heartbeat expectation
+    member.heartbeatSatisfied = true
     heartbeatPurgatory.checkAndComplete(memberKey)
 
     // reschedule the next heartbeat expiration deadline
-    val deadline = member.latestHeartbeat + timeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, deadline, timeoutMs)
+    member.heartbeatSatisfied = false
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
   }
 
@@ -789,8 +790,7 @@ class GroupCoordinator(val brokerId: Int,
     */
   private def addPendingMemberExpiration(group: GroupMetadata, pendingMemberId: String, timeoutMs: Long) {
     val pendingMemberKey = MemberKey(group.groupId, pendingMemberId)
-    val deadline = time.milliseconds() + timeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, deadline, timeoutMs)
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey))
   }
 
@@ -973,7 +973,10 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def tryCompleteHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
+  def tryCompleteHeartbeat(group: GroupMetadata,
+                           memberId: String,
+                           isPending: Boolean,
+                           forceComplete: () => Boolean): Boolean = {
     group.inLock {
       // The group has been unloaded and invalid, we should complete the heartbeat.
       if (group.is(Dead)) {
@@ -983,25 +986,23 @@ class GroupCoordinator(val brokerId: Int,
         if (group.has(memberId)) {
           forceComplete()
         } else false
-      } else {
-        if (shouldCompleteNonPendingHeartbeat(group, memberId, heartbeatDeadline)) {
-          forceComplete()
-        } else false
-      }
+      } else if (shouldCompleteNonPendingHeartbeat(group, memberId)) {
+        forceComplete()
+      } else false
     }
   }
 
-  def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String, heartbeatDeadline: Long): Boolean = {
+  def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String): Boolean = {
     if (group.has(memberId)) {
       val member = group.get(memberId)
-      member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving
+      member.hasSatisfiedHeartbeat || member.isLeaving
     } else {
-      info(s"Member id $memberId was not found in ${group.groupId} during heartbeat expiration.")
-      false
+      info(s"Member id $memberId was not found in ${group.groupId} during heartbeat completion check")
+      true
     }
   }
 
-  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = {
+  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
     group.inLock {
       if (group.is(Dead)) {
         info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
@@ -1012,7 +1013,7 @@ class GroupCoordinator(val brokerId: Int,
         debug(s"Member $memberId has already been removed from the group.")
       } else {
         val member = group.get(memberId)
-        if (!member.shouldKeepAlive(heartbeatDeadline)) {
+        if (!member.hasSatisfiedHeartbeat) {
           info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
           removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
         }
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index fc90c95..c73b0b3 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -67,11 +67,17 @@ private[group] class MemberMetadata(var memberId: String,
   var assignment: Array[Byte] = Array.empty[Byte]
   var awaitingJoinCallback: JoinGroupResult => Unit = null
   var awaitingSyncCallback: SyncGroupResult => Unit = null
-  var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
   var isNew: Boolean = false
   val isStaticMember: Boolean = groupInstanceId.isDefined
 
+  // This variable is used to track heartbeat completion through the delayed
+  // heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+  // this value to `false`. Upon receiving the heartbeat (or any other event
+  // indicating the liveness of the client), we set it to `true` so that the
+  // delayed heartbeat can be completed.
+  var heartbeatSatisfied: Boolean = false
+
   def isAwaitingJoin = awaitingJoinCallback != null
   def isAwaitingSync = awaitingSyncCallback != null
 
@@ -86,16 +92,16 @@ private[group] class MemberMetadata(var memberId: String,
     }
   }
 
-  def shouldKeepAlive(deadlineMs: Long): Boolean = {
+  def hasSatisfiedHeartbeat: Boolean = {
     if (isNew) {
-      // New members are expired after the static join timeout
-      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+      // New members can be expired while awaiting join, so we have to check this first
+      heartbeatSatisfied
     } else if (isAwaitingJoin || isAwaitingSync) {
-      // Don't remove members as long as they have a request in purgatory
+      // Members that are awaiting a rebalance automatically satisfy expected heartbeats
       true
     } else {
-      // Otherwise check for session expiration
-      latestHeartbeat + sessionTimeoutMs > deadlineMs
+      // Otherwise we require the next heartbeat
+      heartbeatSatisfied
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index d62a123..3b79e98 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -379,7 +379,96 @@ class GroupCoordinatorTest {
   }
 
   @Test
-  def testJoinGroupInconsistentGroupProtocol() {
+  def testNewMemberFailureAfterJoinGroupCompletion(): Unit = {
+    // For old versions of the JoinGroup protocol, new members were subject
+    // to expiration if the rebalance took long enough. This test case ensures
+    // that following completion of the JoinGroup phase, new members follow
+    // normal heartbeat expiration logic.
+
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId,
+      Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols,
+      requireKnownMemberId = false)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+
+    verifySessionExpiration(groupId)
+  }
+
+  @Test
+  def testNewMemberFailureAfterSyncGroupCompletion(): Unit = {
+    // For old versions of the JoinGroup protocol, new members were subject
+    // to expiration if the rebalance took long enough. This test case ensures
+    // that following completion of the SyncGroup phase, new members follow
+    // normal heartbeat expiration logic.
+
+    val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId,
+      Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols,
+      requireKnownMemberId = false)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+    val secondGenerationId = joinResult.generationId
+    val secondMemberId = otherJoinResult.memberId
+
+    EasyMock.reset(replicaManager)
+    sendSyncGroupFollower(groupId, secondGenerationId, secondMemberId)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, secondGenerationId, firstMemberId,
+      Map(firstMemberId -> Array.emptyByteArray, secondMemberId -> Array.emptyByteArray))
+    assertEquals(Errors.NONE, syncGroupResult._2)
+
+    verifySessionExpiration(groupId)
+  }
+
+  private def verifySessionExpiration(groupId: String): Unit = {
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject()))
+      .andReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    timer.advanceClock(DefaultSessionTimeout + 1)
+
+    val groupMetadata = group(groupId)
+    assertEquals(Empty, groupMetadata.currentState)
+    assertEquals(0, groupMetadata.allMembers.size)
+  }
+
+  @Test
+  def testJoinGroupInconsistentGroupProtocol(): Unit = {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
@@ -667,12 +756,14 @@ class GroupCoordinatorTest {
   }
 
   @Test
-  def staticMemberRejoinWithLeaderIdAndKnownMemberId() {
-    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultRebalanceTimeout / 2)
+  def staticMemberRejoinWithLeaderIdAndKnownMemberId(): Unit = {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId,
+      sessionTimeout = DefaultRebalanceTimeout / 2)
 
     // A static leader with known id rejoin will trigger rebalance.
     EasyMock.reset(replicaManager)
-    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1)
+    val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId,
+      protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1)
     // Timeout follower in the meantime.
     assertFalse(getGroup(groupId).hasStaticMember(followerInstanceId))
     checkJoinGroupResult(joinGroupResult,
@@ -2699,8 +2790,8 @@ class GroupCoordinatorTest {
     val group = getGroup(groupId)
     group.transitionTo(Dead)
     val leaderMemberId = rebalanceResult.leaderId
-    assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true))
-    groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout)
+    assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true))
+    groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false)
     assertTrue(group.has(leaderMemberId))
   }
 
@@ -2712,8 +2803,7 @@ class GroupCoordinatorTest {
     val group = getGroup(groupId)
     val leaderMemberId = rebalanceResult.leaderId
     group.remove(leaderMemberId)
-    assertFalse(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true))
-    groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout)
+    assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true))
   }
 
   private def getGroup(groupId: String): GroupMetadata = {
@@ -2800,12 +2890,13 @@ class GroupCoordinatorTest {
   private def sendSyncGroupFollower(groupId: String,
                                     generation: Int,
                                     memberId: String,
-                                    groupInstanceId: Option[String]): Future[SyncGroupCallbackParams] = {
+                                    groupInstanceId: Option[String] = None): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
+    groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId,
+      Map.empty[String, Array[Byte]], responseCallback)
     responseFuture
   }