Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/03 09:46:19 UTC

[GitHub] [kafka] dajac commented on a change in pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`

dajac commented on a change in pull request #9958:
URL: https://github.com/apache/kafka/pull/9958#discussion_r569206619



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -185,9 +185,9 @@ class GroupCoordinator(val brokerId: Int,
               group.remove(memberId)
               responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
             } else if (isUnknownMember) {
-              doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+              doNewMemberJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)

Review comment:
       nit: Could we break this long line and the one below as well?
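       For example, one argument per line, matching the style of the new helper methods introduced in this PR (just a formatting sketch):

           doNewMemberJoinGroup(
             group,
             groupInstanceId,
             requireKnownMemberId,
             clientId,
             clientHost,
             rebalanceTimeoutMs,
             sessionTimeoutMs,
             protocolType,
             protocols,
             responseCallback
           )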

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
                                  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                                  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; it is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
-        // Enforce member id when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId >= 0 && generationId != group.generationId) {
-        // Enforce generation check when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateTxnOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })

Review comment:
       nit: I would use `match` here as well.
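       Something like this, for example (only a sketch):

           validateTxnOffsetCommit(
             group,
             generationId,
             memberId,
             groupInstanceId
           ) match {
             case Some(error) =>
               responseCallback(offsetMetadata.map { case (k, _) => k -> error })
             case None =>
               groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
           }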

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -220,67 +222,163 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else {
         val newMemberId = group.generateMemberId(clientId, groupInstanceId)
+        groupInstanceId match {
+          case Some(instanceId) =>
+            doStaticNewMemberJoinGroup(
+              group,
+              instanceId,
+              newMemberId,
+              clientId,
+              clientHost,
+              rebalanceTimeoutMs,
+              sessionTimeoutMs,
+              protocolType,
+              protocols,
+              responseCallback
+            )
 
-        if (group.hasStaticMember(groupInstanceId)) {
-          updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
-        } else if (requireKnownMemberId) {
-            // If member id required (dynamic membership), register the member in the pending member list
-            // and send back a response to call for another join group request with allocated member id.
-          debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
-              s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
-          group.addPendingMember(newMemberId)
-          addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
-          responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
-        } else {
-          info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
-            s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
-          addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
-            clientId, clientHost, protocolType, protocols, group, responseCallback)
+          case None =>
+            doDynamicNewMemberJoinGroup(
+              group,
+              requireKnownMemberId,
+              newMemberId,
+              clientId,
+              clientHost,
+              rebalanceTimeoutMs,
+              sessionTimeoutMs,
+              protocolType,
+              protocols,
+              responseCallback
+            )
         }
       }
     }
   }
 
-  private def doJoinGroup(group: GroupMetadata,
-                          memberId: String,
-                          groupInstanceId: Option[String],
-                          clientId: String,
-                          clientHost: String,
-                          rebalanceTimeoutMs: Int,
-                          sessionTimeoutMs: Int,
-                          protocolType: String,
-                          protocols: List[(String, Array[Byte])],
-                          responseCallback: JoinCallback): Unit = {
+  private def doStaticNewMemberJoinGroup(
+    group: GroupMetadata,
+    groupInstanceId: String,
+    newMemberId: String,
+    clientId: String,
+    clientHost: String,
+    rebalanceTimeoutMs: Int,
+    sessionTimeoutMs: Int,
+    protocolType: String,
+    protocols: List[(String, Array[Byte])],
+    responseCallback: JoinCallback
+  ): Unit = {
+    group.currentStaticMemberId(groupInstanceId) match {
+      case Some(oldMemberId) =>
+        info(s"Static member with groupInstanceId=$groupInstanceId and unknown member id joins " +
+          s"group ${group.groupId} in ${group.currentState} state. Replacing previously mapped " +
+          s"member $oldMemberId with this groupInstanceId.")
+        updateStaticMemberAndRebalance(group, newMemberId, oldMemberId, groupInstanceId, protocols, responseCallback)
+
+      case None =>
+        info(s"Static member with groupInstanceId=$groupInstanceId and unknown member id joins " +
+          s"group ${group.groupId} in ${group.currentState} state. Created a new member id $newMemberId " +
+          s"for this member and add to the group.")
+        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, Some(groupInstanceId),
+          clientId, clientHost, protocolType, protocols, group, responseCallback)
+    }
+  }
+
+  private def doDynamicNewMemberJoinGroup(
+    group: GroupMetadata,
+    requireKnownMemberId: Boolean,
+    newMemberId: String,
+    clientId: String,
+    clientHost: String,
+    rebalanceTimeoutMs: Int,
+    sessionTimeoutMs: Int,
+    protocolType: String,
+    protocols: List[(String, Array[Byte])],
+    responseCallback: JoinCallback
+  ): Unit = {
+    if (requireKnownMemberId) {
+      // If member id required, register the member in the pending member list and send
+      // back a response to call for another join group request with allocated member id.
+      info(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
+        s"${group.currentState} state. Created a new member id $newMemberId and request the " +
+        s"member to rejoin with this id.")
+      group.addPendingMember(newMemberId)
+      addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
+      responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
+    } else {
+      info(s"Dynamic Member with unknown member id joins group ${group.groupId} in " +
+        s"${group.currentState} state. Created a new member id $newMemberId for this member " +
+        s"and add to the group.")
+      addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, None,
+        clientId, clientHost, protocolType, protocols, group, responseCallback)
+    }
+  }
+
+  private def validateCurrentMember(
+    group: GroupMetadata,
+    memberId: String,
+    groupInstanceId: Option[String],
+    operation: String
+  ): Option[Errors] = {
+    // We are validating two things:
+    // 1. If `groupInstanceId` is present, then it exists and is mapped to `memberId`
+    // 2. The `memberId` exists in the group
+

Review comment:
       nit: This empty line could be removed.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -603,6 +731,25 @@ class GroupCoordinator(val brokerId: Int,
     groupError -> partitionErrors
   }
 
+  private def validateHeartbeat(
+    group: GroupMetadata,
+    generationId: Int,
+    memberId: String,
+    groupInstanceId: Option[String]
+  ): Option[Errors] = {
+    if (group.is(Dead)) {
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else {
+      validateCurrentMember(group, memberId, groupInstanceId, "sync-group").orElse {

Review comment:
       `heartbeat` should be used here instead of `sync-group`.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1017,26 +1198,17 @@ class GroupCoordinator(val brokerId: Int,
     // for new members. If the new member is still there, we expect it to retry.
     completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
 
-    if (member.isStaticMember) {
-      info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
-      group.addStaticMember(groupInstanceId, memberId)
-    } else {
-      group.removePendingMember(memberId)
-    }
     maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
   }
 
   private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                              newMemberId: String,
-                                             groupInstanceId: Option[String],
+                                             oldMemberId: String,

Review comment:
       nit: I would put `oldMemberId` before `newMemberId`. It feels a bit more natural when reading it.
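       i.e. just to illustrate the ordering, the call site in `doStaticNewMemberJoinGroup` would then read:

           updateStaticMemberAndRebalance(group, oldMemberId, newMemberId, groupInstanceId, protocols, responseCallback)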

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1055,11 +1227,15 @@ class GroupCoordinator(val brokerId: Int,
           val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
           groupManager.storeGroup(group, groupAssignment, error => {
             if (error != Errors.NONE) {
+
+              // TODO: This logic seems questionable. The write was not committed, but that doesn't
+              //  mean it wasn't written to the log and cannot eventually become committed.

Review comment:
       I do agree. Should we turn this into a Jira and address it in a follow-up PR?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
##########
@@ -247,11 +250,15 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
     if (leaderId.isEmpty)
       leaderId = Some(member.memberId)
+
     members.put(member.memberId, member)
     member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }

Review comment:
       nit: Not related to your PR but a space is missing before `{`.
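       i.e.:

           member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) += 1 }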

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
                                  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                                  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; it is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
-        // Enforce member id when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId >= 0 && generationId != group.generationId) {
-        // Enforce generation check when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateTxnOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
       } else {
         groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
       }
     }
   }
 
+
+  private def validateOffsetCommit(
+    group: GroupMetadata,
+    generationId: Int,
+    memberId: String,
+    groupInstanceId: Option[String]
+  ): Option[Errors] = {
+    if (group.is(Dead)) {
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (generationId >= 0 || memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
+      validateCurrentMember(group, memberId, groupInstanceId, "offset-commit").orElse {
+        if (generationId != group.generationId) {
+          Some(Errors.ILLEGAL_GENERATION)
+        } else {
+          None
+        }
+      }
+    } else if (!group.is(Empty)) {
+      // When the group is non-empty, only members can commit offsets
+      Some(Errors.UNKNOWN_MEMBER_ID)
+    } else {
+      None
+    }
+  }
+
   private def doCommitOffsets(group: GroupMetadata,
                               memberId: String,
                               groupInstanceId: Option[String],
                               generationId: Int,
                               offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                               responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; it is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
-      } else if (generationId < 0 && group.is(Empty)) {
-        // The group is only using Kafka to store offsets.
-        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
-      } else if (!group.has(memberId)) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId != group.generationId) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })

Review comment:
       nit: I would use `match` here as well.
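       Same pattern as above with `validateOffsetCommit`, for instance (sketch; assuming the `else` branch keeps the existing `storeOffsets` call):

           validateOffsetCommit(
             group,
             generationId,
             memberId,
             groupInstanceId
           ) match {
             case Some(error) =>
               responseCallback(offsetMetadata.map { case (k, _) => k -> error })
             case None =>
               groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
           }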

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -470,33 +600,31 @@ class GroupCoordinator(val brokerId: Int,
                 val memberErrors = leavingMembers.map { leavingMember =>
                   val memberId = leavingMember.memberId
                   val groupInstanceId = Option(leavingMember.groupInstanceId)
-                  if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
-                    && group.isStaticMemberFenced(memberId, groupInstanceId, "leave-group")) {
-                    memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID)
+
+                  // The LeaveGroup API allows administrative removal of members by GroupInstanceId
+                  // in which case we expect the MemberId to be undefined.
+                  if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+                    groupInstanceId.flatMap(group.currentStaticMemberId) match {
+                      case Some(currentMemberId) =>
+                        removeCurrentMemberFromGroup(group, currentMemberId)
+                        memberLeaveError(leavingMember, Errors.NONE)
+                      case None =>
+                        memberLeaveError(leavingMember, Errors.UNKNOWN_MEMBER_ID)
+                    }
                   } else if (group.isPendingMember(memberId)) {
-                    if (groupInstanceId.isDefined) {
-                      throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be leaving " +
-                        s"from pending member bucket with member id $memberId")
+                    removePendingMemberAndUpdateGroup(group, memberId)
+                    heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
+                    info(s"Pending member with memberId=$memberId has left group ${group.groupId} " +
+                      s"through explicit `LeaveGroup` request")
+                    memberLeaveError(leavingMember, Errors.NONE)
+                  } else {
+                    val memberErrorOpt = validateCurrentMember(group, memberId, groupInstanceId, "leave-group")
+                    if (memberErrorOpt.isDefined) {
+                      memberLeaveError(leavingMember, memberErrorOpt.get)

Review comment:
       nit: It might be better to use `match` here instead of using `isDefined` and `get` on `memberErrorOpt`.
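       For example (sketch only; assuming the `None` case keeps the existing removal path for a validated member):

           validateCurrentMember(group, memberId, groupInstanceId, "leave-group") match {
             case Some(error) =>
               memberLeaveError(leavingMember, error)
             case None =>
               // keep the existing handling for a validated, known member (assumed)
               removeCurrentMemberFromGroup(group, memberId)
               memberLeaveError(leavingMember, Errors.NONE)
           }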

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -725,50 +890,67 @@ class GroupCoordinator(val brokerId: Int,
                                  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                                  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     group.inLock {
-      if (group.is(Dead)) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; it is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
-      } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) {
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
-      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
-        // Enforce member id when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
-      } else if (generationId >= 0 && generationId != group.generationId) {
-        // Enforce generation check when it is set.
-        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
+      val validationErrorOpt = validateTxnOffsetCommit(
+        group,
+        generationId,
+        memberId,
+        groupInstanceId
+      )
+
+      if (validationErrorOpt.isDefined) {
+        responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
       } else {
         groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
       }
     }
   }
 
+
+  private def validateOffsetCommit(
+    group: GroupMetadata,
+    generationId: Int,
+    memberId: String,
+    groupInstanceId: Option[String]
+  ): Option[Errors] = {
+    if (group.is(Dead)) {
+      Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (generationId >= 0 || memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID || groupInstanceId.isDefined) {
+      validateCurrentMember(group, memberId, groupInstanceId, "offset-commit").orElse {
+        if (generationId != group.generationId) {
+          Some(Errors.ILLEGAL_GENERATION)
+        } else {
+          None
+        }
+      }

Review comment:
       This block is similar to the one in `validateTxnOffsetCommit`, modulo the reason. I wonder if we could define a common helper method for both cases.
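       For example, something along these lines (only a sketch; the helper name is a suggestion, and each caller would pass its own reason/operation string):

           private def validateCurrentMemberAndGeneration(
             group: GroupMetadata,
             generationId: Int,
             memberId: String,
             groupInstanceId: Option[String],
             operation: String
           ): Option[Errors] = {
             validateCurrentMember(group, memberId, groupInstanceId, operation).orElse {
               if (generationId != group.generationId)
                 Some(Errors.ILLEGAL_GENERATION)
               else
                 None
             }
           }

       `validateOffsetCommit` and `validateTxnOffsetCommit` could then both delegate to it, with "offset-commit" and "txn-commit-offsets" respectively.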




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org