Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/15 11:33:29 UTC

[GitHub] [kafka] tombentley opened a new pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

tombentley opened a new pull request #9441:
URL: https://github.com/apache/kafka/pull/9441


   Implements the single-threaded FIFO approach suggested in https://issues.apache.org/jira/browse/KAFKA-10614


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-879401529


   Cherry-picked to 2.8





[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r624131473



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -526,34 +529,42 @@ class GroupMetadataManager(brokerId: Int,
   /**
    * Asynchronously read the partition from the offsets topic and populate the cache
    */
-  def scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit): Unit = {
+  def scheduleLoadGroupAndOffsets(offsetsPartition: Int, coordinatorEpoch: Int, onGroupLoaded: GroupMetadata => Unit): Unit = {
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-    if (addLoadingPartition(offsetsPartition)) {
-      info(s"Scheduling loading of offsets and group metadata from $topicPartition")
-      val startTimeMs = time.milliseconds()
-      scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, onGroupLoaded, startTimeMs))
-    } else {
-      info(s"Already loading offsets and group metadata from $topicPartition")
-    }
+    info(s"Scheduling loading of offsets and group metadata from $topicPartition for epoch $coordinatorEpoch")
+    val startTimeMs = time.milliseconds()
+    scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupLoaded, startTimeMs))
   }
 
-  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit, startTimeMs: java.lang.Long): Unit = {
-    try {
-      val schedulerTimeMs = time.milliseconds() - startTimeMs
-      debug(s"Started loading offsets and group metadata from $topicPartition")
-      doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
-      val endTimeMs = time.milliseconds()
-      val totalLoadingTimeMs = endTimeMs - startTimeMs
-      partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
-      info(s"Finished loading offsets and group metadata from $topicPartition "
-        + s"in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds"
-        + s" was spent in the scheduler.")
-    } catch {
-      case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
-    } finally {
-      inLock(partitionLock) {
-        ownedPartitions.add(topicPartition.partition)
-        loadingPartitions.remove(topicPartition.partition)
+  private[group] def loadGroupsAndOffsets(
+    topicPartition: TopicPartition,
+    coordinatorEpoch: Int,
+    onGroupLoaded: GroupMetadata => Unit,
+    startTimeMs: java.lang.Long
+  ): Unit = {
+    if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, Some(coordinatorEpoch))) {
+      info(s"Not loading offsets and group metadata for $topicPartition " +
+        s"in epoch $coordinatorEpoch since current epoch is ${epochForPartitionId.get(topicPartition.partition)}")
+    } else if (!addLoadingPartition(topicPartition.partition)) {

Review comment:
       One minor improvement here is to change `addLoadingPartition` so that it checks whether the partition is already contained in `ownedPartitions`. If so, we can return false.
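
The suggested check can be sketched as follows. This is a minimal illustration, not the code in `GroupMetadataManager`: the `PartitionTracker` class, its `inLock` helper and `markOwned` are hypothetical names, and only `addLoadingPartition`, `ownedPartitions`, `loadingPartitions` and `partitionLock` are taken from the diff above.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical stand-in for the partition bookkeeping discussed in the review.
class PartitionTracker {
  private val partitionLock = new ReentrantLock()
  private val loadingPartitions = mutable.Set[Int]()
  private val ownedPartitions = mutable.Set[Int]()

  // Runs the body while holding partitionLock.
  private def inLock[T](body: => T): T = {
    partitionLock.lock()
    try body finally partitionLock.unlock()
  }

  // Returns false when the partition is already owned or already loading.
  def addLoadingPartition(partition: Int): Boolean = inLock {
    if (ownedPartitions.contains(partition)) false
    else loadingPartitions.add(partition)
  }

  // Moves a partition from "loading" to "owned" once loading has finished.
  def markOwned(partition: Int): Unit = inLock {
    loadingPartitions.remove(partition)
    ownedPartitions.add(partition)
    ()
  }
}
```

With this shape, a second load request for a partition that has already been loaded is rejected in the same way as one for a partition that is still loading.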







[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-738925531


   You're right @guozhangwang, `ScheduledFutureTask` contains a sequence number to break ties, so the executor _is_ FIFO. So you're saying that the wrong network thread handling one of these two `LeaderAndIsr` requests acquires `replicaStateChangeLock` first, right? If that's the case, then might a similar problem affect the `txnCoordinator` too?
   
   I updated the PR if you want to take another look, but I still need to address synchronization, since although `onLeadershipChange` is executed with `ReplicaManager.replicaStateChangeLock`, `onResignation` is also called via `handleStopReplicaRequest`. Perhaps it's enough to make `onResignation` synchronized.
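
The FIFO behaviour described in this comment can be checked with a small sketch. It assumes a plain single-threaded scheduled executor from the JDK (`Executors.newSingleThreadScheduledExecutor`), which may be configured differently from Kafka's own scheduler; `runInSubmissionOrder` is a hypothetical helper name.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, Executors, TimeUnit}
import scala.jdk.CollectionConverters._

// Hypothetical helper: submits n zero-delay tasks to a single-threaded
// scheduled executor and returns the order in which they actually ran.
def runInSubmissionOrder(n: Int): List[Int] = {
  val executor = Executors.newSingleThreadScheduledExecutor()
  val seen = new CopyOnWriteArrayList[Integer]()
  try {
    (0 until n).foreach { i =>
      val task = new Runnable {
        override def run(): Unit = { seen.add(Integer.valueOf(i)); () }
      }
      executor.schedule(task, 0L, TimeUnit.MILLISECONDS)
    }
  } finally {
    // Tasks that are already queued still run after shutdown() by default.
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
  seen.asScala.toList.map(_.intValue)
}
```

Because a single worker thread drains the queue and ties on trigger time are broken by sequence number, the returned list should match the submission order. This only says something about the executor; it does not rule out reordering before the tasks are submitted.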





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-710044807


   > One thing I didn't understand was why I needed to change `AbstractFetcherManagerTest`.
   
   Stupid me, that was because I turned up the logging. I'll revert that.





[GitHub] [kafka] guozhangwang commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r645870784



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       Thanks @hachikuji , I think using epoch would be sufficient too.







[GitHub] [kafka] vvcephei commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-804526021


   Hey @guozhangwang and @hachikuji , I just ran across this PR as part of the 2.8 release. It's not going to make 2.8, but we should go ahead and close the loop to get it fixed for 3.0.
   
   Thanks!





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-710030951


   @hachikuji I'm now passing in the coordinator epoch and ignoring attempts to unload state if the epoch is older than what's loaded. I've also added a test. One thing I didn't understand was why I needed to change `AbstractFetcherManagerTest`. Please could you take another look?





[GitHub] [kafka] hachikuji commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-709432283


   @tombentley Thanks for the patch. I think the problem is in the callback inside `KafkaApis.handleLeaderAndIsrRequest`. With two concurrent `LeaderAndIsr` requests, there is no guarantee about the ordering of the callbacks even though `ReplicaManager` checks epoch. They can get reordered like the following:
   
   1. Call `ReplicaManager.becomeLeaderOrFollower` with epoch=1 in thread 1
   2. Call `ReplicaManager.becomeLeaderOrFollower` with epoch=2 in thread 2
   3. Call `onLeadershipChange` callback in thread 2
   4. Call `onLeadershipChange` callback in thread 1
   
   At least that's my understanding of the issue. I think we need to find a way to push the epoch validation into `GroupCoordinator`. 
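
A minimal sketch of what pushing epoch validation into the coordinator could look like, using the reordered callbacks from the list above. `EpochCheckingCoordinator` and `loadedEpochs` are hypothetical names for illustration only; the real `GroupCoordinator` does far more than this.

```scala
import scala.collection.mutable

// Hypothetical, simplified coordinator that remembers the newest epoch it
// has accepted per partition and ignores callbacks carrying older epochs.
class EpochCheckingCoordinator {
  private val epochForPartition = mutable.Map[Int, Int]()
  private val loaded = mutable.ListBuffer[(Int, Int)]()

  // Returns true if the election was accepted, false if it was stale.
  def onElection(partition: Int, coordinatorEpoch: Int): Boolean = synchronized {
    if (epochForPartition.get(partition).forall(coordinatorEpoch > _)) {
      epochForPartition.put(partition, coordinatorEpoch)
      loaded += ((partition, coordinatorEpoch))
      true
    } else false
  }

  // (partition, epoch) pairs for which a load was accepted, in order.
  def loadedEpochs: List[(Int, Int)] = synchronized(loaded.toList)
}
```

If the callback for epoch 2 arrives before the one for epoch 1, the second call returns false and only the epoch 2 load is recorded, regardless of which thread runs first.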





[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r644571665



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       @hachikuji as @guozhangwang pointed out, the scheduler really _is_ FIFO (it uses a sequence number internally to guarantee order is maintained), so assuming his theory about the racing I/O threads is correct (and I think it is, but I've never observed this problem), his solution of holding the lock for `handleStopReplicaRequest` would work.
   
   The current version of the PR doesn't make assumptions about how any reordering can happen (i.e. whether caused by the inconsistent locking or anything else). So I don't think you're missing anything, you've just solved the problem differently.
   
   







[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r625802963



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       @guozhangwang yes, that's right. I forgot about our conversation about the lock when @hachikuji asked why we were using the callback :disappointed:.
   
   I notice that the `partitionLock` is acquired by the `addLoadingPartition` call in `loadGroupsAndOffsets`, and is also acquired in `removeGroupsAndOffsets`. Wouldn't it be simpler to use that rather than `replicaStateChangeLock` at this point, if we want to avoid a third way of handling concurrency here, or is there some subtlety? Obviously we wouldn't hold it for the call to `doLoadGroupsAndOffsets` in `loadGroupsAndOffsets`, just for the two checks at the start:
   
   ```
       if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, Some(coordinatorEpoch))) {
         info(s"Not loading offsets and group metadata for $topicPartition " +
           s"in epoch $coordinatorEpoch since current epoch is ${epochForPartitionId.get(topicPartition.partition)}")
       } else if (!addLoadingPartition(topicPartition.partition)) {
         info(s"Already loading offsets and group metadata from $topicPartition")
       }
   ```
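
The idea of holding the lock only for the two checks, and not for the load itself, can be sketched like this. It is an assumption-laden model rather than the PR's code: `LoadGate`, `tryBeginLoad`, `finishLoad` and `lockHeldByCurrentThread` are hypothetical names, and the epoch rule (reject only strictly older epochs) is chosen for illustration.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical, simplified model of the loading bookkeeping.
class LoadGate {
  private val partitionLock = new ReentrantLock()
  private val epochs = mutable.Map[Int, Int]()
  private val loadingPartitions = mutable.Set[Int]()

  def lockHeldByCurrentThread: Boolean = partitionLock.isHeldByCurrentThread

  // Both checks run under the lock; returns true if the caller may load.
  private def tryBeginLoad(partition: Int, epoch: Int): Boolean = {
    partitionLock.lock()
    try {
      if (epochs.get(partition).exists(_ > epoch)) false // stale epoch
      else {
        epochs.put(partition, epoch)
        loadingPartitions.add(partition) // false if already loading
      }
    } finally {
      partitionLock.unlock()
    }
  }

  private def finishLoad(partition: Int): Unit = {
    partitionLock.lock()
    try { loadingPartitions.remove(partition); () }
    finally { partitionLock.unlock() }
  }

  // The potentially slow load runs without the lock held.
  def load(partition: Int, epoch: Int)(doLoad: => Unit): Boolean = {
    if (tryBeginLoad(partition, epoch)) {
      try { doLoad } finally { finishLoad(partition) }
      true
    } else false
  }
}
```

Keeping the critical section this small means other partitions can be checked while one is loading, at the cost of having to clear the "loading" marker in a `finally` block.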







[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r615722282



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -87,6 +87,8 @@ class GroupCoordinator(val brokerId: Int,
 
   private val isActive = new AtomicBoolean(false)
 
+  val epochForPartitionId = mutable.Map[Int, Int]()

Review comment:
       Yes it does, thanks! Now fixed. 







[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r618025026



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,32 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {

Review comment:
       Similarly, we should update `epochForPartitionId` here with a CAS operation.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,32 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {

Review comment:
       Can we do a CAS update? Otherwise I don't think this is safe.
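
One way to make the check-and-update atomic is `ConcurrentHashMap.compute`, which runs the remapping function atomically for a key. The sketch below assumes that approach; `maybeAdvanceEpoch` is a hypothetical helper and the map here is a standalone value, not the coordinator's field.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical epoch table keyed by offsets-topic partition id.
val epochForPartitionId = new ConcurrentHashMap[Int, Integer]()

// Atomically records newEpoch if it is newer than the stored one.
// Returns true only for the caller whose epoch was installed.
def maybeAdvanceEpoch(partition: Int, newEpoch: Int): Boolean = {
  var advanced = false
  epochForPartitionId.compute(partition, (_, current) => {
    if (current == null || newEpoch > current.intValue) {
      advanced = true
      Integer.valueOf(newEpoch)
    } else current
  })
  advanced
}
```

Unlike a separate `get` followed by `put`, two threads racing with different epochs cannot both observe the old value and both proceed: the comparison and the write happen in one atomic step per key.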

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       It's not clear to me why we moved this into `ReplicaManager`. Is there some reason we need the `replicaStateChangeLock`?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -2891,17 +2898,29 @@ class KafkaApisTest {
       EasyMock.eq(controllerId),
       EasyMock.eq(controllerEpoch),
       EasyMock.eq(brokerEpochInRequest),
-      EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
-    )).andStubReturn(
-      (mutable.Map(
+      EasyMock.eq(stopReplicaRequest.partitionStates().asScala),
+      EasyMock.anyObject()
+    )).andStubAnswer {() =>
+      val result = (mutable.Map(
         fooPartition -> Errors.NONE
       ), Errors.NONE)
-    )
+//<<<<<<< HEAD

Review comment:
       Can you fix this?







[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-773096994


   This change looks good to me. @hachikuji could you take another look before we merge?





[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r618358643



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,32 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {

Review comment:
       This one doesn't do an update any more (following your other comment).







[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r620695517



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,35 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    epochForPartitionId.compute(offsetTopicPartitionId, (_, epoch) => {
+      val currentEpoch = Option(epoch)
+      if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {

Review comment:
       One final thing I was considering is whether we should push this check into `GroupMetadataManager.loadGroupsAndOffsets`. That would give us some protection against any assumptions about ordering in `KafkaScheduler`.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,32 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {

Review comment:
       I have probably not been doing a good job of being clear. It is useful to bump the epoch whenever we observe a larger value whether it is in `onResignation` or `onElection`. This protects us from all potential reorderings.
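
   A minimal sketch of that idea, assuming a per-partition map of the largest epoch observed so far. The names `EpochFence` and `observe` are hypothetical and are not taken from the patch; the only library call used is `ConcurrentHashMap#merge`.

   ```scala
   import java.util.concurrent.ConcurrentHashMap

   // Hypothetical helper (not from the PR): remembers the largest coordinator
   // epoch observed for each offsets-topic partition.
   object EpochFence {
     private val epochForPartitionId = new ConcurrentHashMap[Int, java.lang.Integer]()

     // Atomically raises the stored epoch to max(stored, epoch).
     // Returns true when `epoch` is not stale, i.e. it equals the largest
     // epoch observed so far for this partition.
     def observe(partitionId: Int, epoch: Int): Boolean = {
       val maxSeen = epochForPartitionId.merge(
         partitionId,
         Int.box(epoch),
         (current, incoming) => Int.box(math.max(current.intValue, incoming.intValue)))
       maxSeen.intValue == epoch
     }
   }
   ```

   Under this sketch, both `onElection` and `onResignation` would call `observe` first and skip the load or unload when it returns false, so a late event carrying an older epoch is ignored regardless of the order in which it arrives.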







[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r618021571



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {
+      info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+      epochForPartitionId.remove(offsetTopicPartitionId)

Review comment:
       The `onResignation` hook just means we lost leadership. By keeping track of the epoch, we are protected from all potential reorderings.







[GitHub] [kafka] guozhangwang commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r626250346



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       I think `partitionLock` and `replicaStateChangeLock` serve different purposes here: the latter is specifically for changes to the replica state (leader, ISR), while the former is for more general access patterns? @hachikuji could you chime in here if you have some time?







[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-784956360


   Rebased for conflict. Grateful if you could take a look @hachikuji.





[GitHub] [kafka] guozhangwang commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r624219744



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       @hachikuji I think this is related to an earlier comment (https://github.com/apache/kafka/pull/9441#issuecomment-744788568). The concern is that `onResignation` is called from two places: `handleStopReplicaRequest` and `handleLeaderAndIsrRequest`. The latter is protected by `replicaStateChangeLock` but the former is not, and hence race conditions may still happen.
   
   The current approach seems to take a slightly different route: instead of always trying to synchronize under `replicaStateChangeLock`, we just compare and swap the `epochForPartitionId`. Is that right?
   







[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r615722539



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {
+      info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+      epochForPartitionId.remove(offsetTopicPartitionId)

Review comment:
       I guess mostly I did it for symmetry with `onElection`; however, you're right that it doesn't affect correctness. I don't entirely follow your point about how we could use this when the replica is being deleted, though. Could you explain?







[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-709457342


   I assumed the scheduler would be using a heap to prioritize scheduled tasks, and that wouldn't preserve the order of submitted tasks with the same scheduled time. Since `System.currentTimeMillis()` doesn't tick frequently enough, I can see how the two threads could end up with the same scheduled time of execution (now) but have their tasks execute in the wrong order. Obviously the FIFO queue approach would solve that, at the cost of an extra thread.
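
   To illustrate the concern in isolation: a heap ordered only by scheduled time says nothing about tasks whose times are equal, whereas adding a submission sequence number as a tie-breaker restores FIFO order among them. The sketch below uses `java.util.PriorityQueue` directly; `FifoTieBreak`, `Task` and `drainOrder` are made-up names, and it makes no claim about how `KafkaScheduler` or its underlying executor actually orders tasks.

   ```scala
   import java.util.PriorityQueue

   // Hypothetical illustration, not Kafka code.
   object FifoTieBreak {
     final case class Task(name: String, timeMs: Long, seq: Long)

     // Enqueues every task with the same scheduled time and drains the heap.
     // Ordering by (timeMs, seq) makes equal-time tasks come out in
     // submission order; ordering by timeMs alone would leave ties unspecified.
     def drainOrder(names: Seq[String], timeMs: Long): List[String] = {
       val byTimeThenSeq = Ordering.by[Task, (Long, Long)](t => (t.timeMs, t.seq))
       val queue = new PriorityQueue[Task](byTimeThenSeq)
       names.zipWithIndex.foreach { case (name, i) =>
         queue.add(Task(name, timeMs, i.toLong))
       }
       Iterator.continually(queue.poll()).takeWhile(_ != null).map(_.name).toList
     }
   }
   ```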
   





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-745239513


   @guozhangwang that's a much better solution, thanks! I've implemented that and rebased for a conflict. 





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-855921781


   Thanks for the reviews @hachikuji @guozhangwang.





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-828333212


   @hachikuji Thanks, I was being slow on the uptake, but what you describe makes perfect sense. I've moved the logic to the GroupMetadataManager, am using the max observed epoch and have added some more tests. Very grateful if you could take another look.
   
   One thing I found awkward was using `ConcurrentHashMap#compute` with the possibility of returning null (in the case of stop replicas it makes sense to remove the mapping, right?). I had to switch the type args to `java.lang.Integer`, but maybe you know a better way of handling that?
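
   For reference, a small sketch of the behaviour in question: when the remapping function passed to `ConcurrentHashMap#compute` returns null, the mapping is removed, which is why the value type has to be a nullable `java.lang.Integer` rather than Scala's `Int`. The names `ComputeRemoval` and `resign` are hypothetical, and the `>=` comparison is only an assumption for illustration.

   ```scala
   import java.util.concurrent.ConcurrentHashMap

   // Hypothetical illustration, not the code from the patch.
   object ComputeRemoval {
     // Values are java.lang.Integer so the remapping function may return null.
     val epochs = new ConcurrentHashMap[Int, java.lang.Integer]()

     // Removes the mapping when `resignedEpoch` is at least the stored epoch.
     // Returns true if no mapping remains for the partition afterwards.
     def resign(partitionId: Int, resignedEpoch: Int): Boolean = {
       val remaining = epochs.compute(partitionId, (_, current) =>
         if (current == null || resignedEpoch >= current.intValue) null // null removes the entry
         else current)                                                  // stale request: keep it
       remaining == null
     }
   }
   ```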





[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-822034184


   @tombentley could you see if @hachikuji's comments can be addressed? This is a pretty tricky bug that I would like to get fixed in 3.0. Thanks!





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-709225396


   @guozhangwang this is a draft implementation of one of the solutions you suggested. I will try to add a test, but in the meantime any comments you have are welcome. 





[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-856074980


   Thank YOU for the great patience @tombentley (it has lasted for more than 6 months)!





[GitHub] [kafka] hachikuji commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-828703078


   @tombentley Thanks for the updates. I made a few small changes in this patch: https://github.com/hachikuji/kafka/commit/27ba937cc7b7da230ccc4f0c8220f680c4a542fe. The main things are taking the loading itself out of `compute` (which is intended to be a cheap operation) and moving the `loadingPartitions` check. Feel free to pull in the changes if they make sense to you.





[GitHub] [kafka] hachikuji commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-709447330


   @tombentley Actually I missed the holding of the `replicaStateChangeLock` inside `ReplicaManager.becomeLeaderOrFollower`. So the reordering I suggested above seems not possible. Perhaps the problem really is submission to the scheduler. I think we had given the scheduler only a single thread assuming that was good enough to protect the submission order, but maybe it is not.
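
   For comparison, the single-thread FIFO alternative discussed in this thread can be sketched with a plain `Executors.newSingleThreadExecutor()`, which runs tasks sequentially from an unbounded FIFO queue. `FifoExecutorSketch` and `runInOrder` are hypothetical names; note that this only preserves the order in which tasks were submitted, so if two callers race to submit, the queue simply reflects the outcome of that race.

   ```scala
   import java.util.concurrent.{CopyOnWriteArrayList, Executors, TimeUnit}

   // Hypothetical illustration, not Kafka code.
   object FifoExecutorSketch {
     // Submits one task per name from the calling thread and returns the
     // names in the order the single worker thread actually executed them.
     def runInOrder(names: Seq[String]): List[String] = {
       val executor = Executors.newSingleThreadExecutor()
       val executed = new CopyOnWriteArrayList[String]()
       try {
         names.foreach { name =>
           executor.execute(() => { executed.add(name); () })
         }
       } finally {
         executor.shutdown() // already-submitted tasks still run
       }
       executor.awaitTermination(10, TimeUnit.SECONDS)
       executed.toArray(new Array[String](0)).toList
     }
   }
   ```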





[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-744788568


   > So you're saying that the wrong network thread handling one of these two LeaderAndISR acquires `replicaStateChangeLock` first, right?
   
   Sort of :) I think it is possible that two LeaderAndIsr requests reach the same broker, one from the old controller and one from the new, and that they are handled by two different handler threads of the broker. The handler thread handling the old controller's request may grab the lock first.
   
   > since although `onLeadershipChange` is executed with `ReplicaManager.replicaStateChangeLock`, `onResignation` is also called via `handleStopReplicaRequest`
   
   How about this: since `handleStopReplicaRequest` would call `replicaManager.stopReplicas` which is also protected by the lock, we can just make the following block
   
   ```
   if (error == Errors.NONE) {
     if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
         && partitionStates(topicPartition).deletePartition) {
       groupCoordinator.onResignation(topicPartition.partition)
     } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
                && partitionStates(topicPartition).deletePartition) {
       val partitionState = partitionStates(topicPartition)
       val leaderEpoch = if (partitionState.leaderEpoch >= 0)
         Some(partitionState.leaderEpoch)
       else
         None
       txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
     }
   }
   ```
   
   which is currently triggered after the call, be passed in as a function parameter **into that call**, just like what `becomeLeaderOrFollower` does. Then, as long as we can pass `coordinatorEpoch = leaderEpoch` to `groupCoordinator#onResignation`, that should be sufficient.
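
   Roughly, the shape of that change could look like the following sketch. Everything in it is a simplified stand-in: `CallbackUnderLock`, its `stopReplicas` signature and the "negative id fails" rule are assumptions for illustration, not the real `ReplicaManager` API.

   ```scala
   // Hypothetical stand-in for the pattern, not the real ReplicaManager.
   object CallbackUnderLock {
     private val replicaStateChangeLock = new Object

     // True when the calling thread currently holds the lock.
     def holdsLock: Boolean = Thread.holdsLock(replicaStateChangeLock)

     // Stops the given partitions and invokes `onStopped` before releasing
     // the lock, so the callback cannot interleave with another state change.
     def stopReplicas(partitions: Seq[Int], onStopped: Seq[Int] => Unit): Seq[Int] =
       replicaStateChangeLock.synchronized {
         val stopped = partitions.filter(_ >= 0) // pretend negative ids fail
         onStopped(stopped)                      // runs while the lock is held
         stopped
       }
   }
   ```

   The caller would then pass the resignation logic as `onStopped` instead of running it after `stopReplicas` returns.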
   
   -----------
   
   I looked at your current PR, and I think it looks promising. If you agree with my suggestion above and make the changes accordingly, I think we can turn this draft into an official PR and merge.
   
   cc @cadonna @hachikuji 





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-766880511


   Rebased for conflict. 
   
   @guozhangwang @cadonna @hachikuji please could one of you take a look?





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-769621007


   Thanks, and no worries about the wait.





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-709439774


   @hachikuji, yes I see that although replica manager might be synchronized it doesn't prevent the race. I was already working on introducing the epoch into the group coordinator (as the transaction coordinator already does). I'll rework the patch, thanks for the feedback.





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-829027535


   @hachikuji that's much better, thank you. I added some logging where we ignore on the remove path, hope that's OK.
   





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-822355393


   @hachikuji I've addressed your comments, if you want to take another look?





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-824800741


   Thanks @hachikuji, it's much simpler now, if you could take another look?





[GitHub] [kafka] tombentley merged pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley merged pull request #9441:
URL: https://github.com/apache/kafka/pull/9441


   





[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r643513632



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       If I understand correctly, the original issue concerned the potential reordering of loading/unloading events. This was possible because of inconsistent locking and the fact that we relied 100% on the order that the task was submitted to the scheduler. With this patch, we are now using the leader epoch in order to ensure that loading/unloading events are handled in the correct order. This means it does not actually matter if the events get submitted to the scheduler in the wrong order. Does that make sense or am I still missing something?
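
       To make the ordering argument concrete, here is a minimal sketch of an epoch-guarded handler for a single partition. It is not the code in this PR: `Event`, `Elected`, `Resigned` and `EpochGuardedState` are hypothetical names, and the guard conditions are simplified from the `onElection`/`onResignation` checks in the diff. The point it illustrates is that, once every event carries an epoch, replaying the same events in any order ends in the same state.

   ```scala
   // Hypothetical event model; not part of Kafka.
   sealed trait Event
   case class Elected(epoch: Int) extends Event
   case class Resigned(epoch: Int) extends Event

   // Simplified single-partition state guarded by the last epoch seen.
   class EpochGuardedState {
     private var currentEpoch: Option[Int] = None
     private var isLoaded = false

     def loaded: Boolean = synchronized(isLoaded)

     def handle(event: Event): Unit = synchronized {
       event match {
         // An election is applied only if it is strictly newer than what we have seen.
         case Elected(e) if currentEpoch.forall(e > _) =>
           currentEpoch = Some(e)
           isLoaded = true
         // A resignation is applied if it is at least as new as what we have seen.
         case Resigned(e) if currentEpoch.forall(_ <= e) =>
           currentEpoch = Some(e)
           isLoaded = false
         // Anything else is stale and is ignored.
         case _ =>
       }
     }
   }

   object EpochGuardedState {
     // Applies the events in the given order and reports whether state ends up loaded.
     def replay(events: Seq[Event]): Boolean = {
       val state = new EpochGuardedState
       events.foreach(state.handle)
       state.loaded
     }
   }
   ```

       Under these assumptions, every permutation of "elected in epoch 4, resigned in epoch 5, elected in epoch 6" leaves the partition loaded, which is why the submission order to the scheduler stops mattering.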







[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-855018136


   Made another pass on the patch. LGTM! I think we can merge after resolving the conflicts.





[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r607374397



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {
+      info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+      epochForPartitionId.remove(offsetTopicPartitionId)

Review comment:
       Hmm.. Why remove the epoch after resignation? It seems like it would be useful to keep tracking it. Maybe it's useful to distinguish the case where the replica is to be deleted?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -87,6 +87,8 @@ class GroupCoordinator(val brokerId: Int,
 
   private val isActive = new AtomicBoolean(false)
 
+  val epochForPartitionId = mutable.Map[Int, Int]()

Review comment:
       Does this need to be a concurrent collection? It does not look like we can count on a lock protecting `onElection` and `onResignation`.
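
       One way to address this without an external lock is sketched below, assuming a `ConcurrentHashMap` keyed by partition id. `ConcurrentEpochSketch` and `maybeAdvance` are hypothetical names used only for illustration. The check and the update happen inside a single `compute` call, so two threads racing on the same partition cannot both pass the epoch check.

   ```scala
   import java.util.concurrent.ConcurrentHashMap

   object ConcurrentEpochSketch {
     private val epochForPartitionId = new ConcurrentHashMap[Int, java.lang.Integer]()

     // Atomically records `epoch` for `partition` if it is strictly newer than
     // the stored value. Returns true if the epoch was accepted.
     def maybeAdvance(partition: Int, epoch: Int): Boolean = {
       var accepted = false
       epochForPartitionId.compute(partition, (_, current) => {
         if (current == null || epoch > current.intValue) {
           accepted = true
           Integer.valueOf(epoch)
         } else {
           current // keep the existing, newer-or-equal epoch
         }
       })
       accepted
     }
   }
   ```

       The value type is `java.lang.Integer` rather than `Int` so that a missing entry shows up as `null` inside `compute` instead of being unboxed to 0.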







[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r644571665



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       @hachikuji as @guozhangwang pointed out, the scheduler really _is_ FIFO (it uses a sequence number internally to guarantee order is maintained), so assuming his theory about the racing i/o threads is correct (and I think it is, but I've never observed this problem) then his solution of holding the lock for `handleStopReplicaRequest` would work. 
   
   The current version of the PR doesn't make assumptions about how any reordering can happen (i.e. whether caused by the inconsistent locking or anything else). So I don't think you're missing anything, you've just solved the problem differently.
   
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r618021571



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {
+      info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+      epochForPartitionId.remove(offsetTopicPartitionId)

Review comment:
       The `onResignation` hook just means we lost leadership. By keeping track of the epoch, we are protected from all LeaderAndIsr reorderings.
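
       The difference between forgetting and keeping the epoch on resignation can be sketched as follows. `EpochTracker` and its `forgetOnResign` flag are hypothetical and exist only to compare the two behaviours side by side; the guards mirror the checks in the diff above in simplified form.

   ```scala
   import scala.collection.mutable

   // Hypothetical tracker; `forgetOnResign = true` models removing the epoch
   // on resignation, `false` models continuing to track it.
   class EpochTracker(forgetOnResign: Boolean) {
     private val epochs = mutable.Map[Int, Int]()

     def onElection(partition: Int, epoch: Int): Boolean = synchronized {
       val accept = epochs.get(partition).forall(epoch > _)
       if (accept) epochs.put(partition, epoch)
       accept
     }

     def onResignation(partition: Int, epoch: Int): Boolean = synchronized {
       val accept = epochs.get(partition).forall(_ <= epoch)
       if (accept) {
         if (forgetOnResign) epochs.remove(partition)
         else epochs.put(partition, epoch)
       }
       accept
     }
   }
   ```

       With `forgetOnResign = true`, an election for epoch 5 that arrives after a resignation in epoch 6 is accepted again, because nothing remembers epoch 6. With `forgetOnResign = false` the same stale election is rejected.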







[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-713663078


   @guozhangwang, @hachikuji I was thinking... solving it as I have now (keeping track of the epoch) doesn't address another potential problem case where the tenure as leader is very short and the background task to load the state runs after the background task to unload the state in the same epoch. Obviously in this case the load should not be done (I guess it could result in a broker not returning NOT_COORDINATOR when it should, based on incorrect state of `ownedPartitions`). We could track a high watermark coordinator epoch and guard the load with a check (as well as the unload). 
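
   A rough sketch of that idea, for a single partition, might look like the following. This is only one possible reading of "track a high watermark coordinator epoch": `HighWatermarkSketch`, `runLoad` and `runUnload` are hypothetical names, and the extra `resignedAtHighWatermark` flag is an assumption added so that a load and an unload in the same epoch can be told apart. The checks run when the background task executes, not when it is submitted.

   ```scala
   object HighWatermarkSketch {
     private var highWatermark = -1
     private var resignedAtHighWatermark = false
     private var isOwned = false

     def owned: Boolean = synchronized(isOwned)

     // Body of the background load task; skipped if an unload for the same or
     // a later epoch has already run.
     def runLoad(epoch: Int): Boolean = synchronized {
       val stale = epoch < highWatermark ||
         (epoch == highWatermark && resignedAtHighWatermark)
       if (!stale) {
         highWatermark = epoch
         resignedAtHighWatermark = false
         isOwned = true
       }
       !stale
     }

     // Body of the background unload task; skipped only if a later epoch has
     // already been seen.
     def runUnload(epoch: Int): Boolean = synchronized {
       val stale = epoch < highWatermark
       if (!stale) {
         highWatermark = epoch
         resignedAtHighWatermark = true
         isOwned = false
       }
       !stale
     }
   }
   ```

   In this sketch, if the unload for epoch 7 runs first, the late load for epoch 7 is skipped and the partition stays unowned, while a later load for epoch 8 still goes through.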





[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-843148293


   @hachikuji I fixed the tests and made your suggested changes, but what do you think about @guozhangwang's point [above](https://github.com/apache/kafka/pull/9441#discussion_r624219744)?





[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-738579702


   Hey @tombentley Sorry for the late reply! I also checked the source code of `ScheduledThreadPoolExecutor` and I think what you've inferred is correct: although it does not keep a single thread alive all the time, but instead dynamically creates and stops the thread when no tasks have been scheduled for a long time (note we set num.thread == 1), the blocking queue should still guarantee FIFO ordering.
   
   So what's possibly happening is that the thread handling a LeaderAndIsr request with the old controller epoch grabs the lock and proceeds first. In that case, maybe we can simplify your proposed solution as follows:
   
   * note that, like @hachikuji mentioned, `onLeadershipChange` is actually protected by the `replicaStateChangeLock` inside the `becomeLeaderOrFollower`, and hence we only need to pass in the controller epoch in `groupCoordinator.onElection/onResignation` just like what we did in `txnCoordinator`.
   
   * Inside the `groupCoordinator.onElection/onResignation`, which is lock-protected, we just remember the latest controller epoch at the `GroupCoordinator`, and then do one check right before `scheduleLoadGroupAndOffsets/removeGroupsForPartition` against that controller epoch; if the check does not pass, we can log and skip the loading / unloading function call.
   
   WDYT?
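
   For reference, the FIFO behaviour described above can be checked in isolation with a small sketch. It uses a plain single-threaded `ScheduledThreadPoolExecutor` directly, not Kafka's scheduler wrapper, and `FifoSchedulerSketch` and `executionOrder` are hypothetical names.

   ```scala
   import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
   import scala.collection.mutable.ArrayBuffer

   object FifoSchedulerSketch {
     // Submits n zero-delay tasks to a single-threaded scheduler and returns
     // the order in which they actually ran.
     def executionOrder(n: Int): List[Int] = {
       val executor = new ScheduledThreadPoolExecutor(1)
       val seen = ArrayBuffer[Int]()
       (0 until n).foreach { i =>
         val task = new Runnable {
           override def run(): Unit = seen.synchronized { seen += i; () }
         }
         executor.schedule(task, 0L, TimeUnit.MILLISECONDS)
       }
       executor.shutdown() // tasks that are already queued still run by default
       executor.awaitTermination(10L, TimeUnit.SECONDS)
       seen.synchronized(seen.toList)
     }
   }
   ```

   With one worker thread the tasks run in submission order, so any reordering would have to happen before the tasks reach the scheduler, for example between request handler threads competing for a lock.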





[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-769618984


   Yes! I will review it again. Thanks for hanging on there and my apologies... Review has always been a bit overwhelming for me :) 





[GitHub] [kafka] hachikuji commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-852487435


   @tombentley Apologies for the delay. I was out on leave for the past month. I responded to Guozhang's comment. Let me know if it makes sense or not.

