Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/01 17:31:55 UTC

[GitHub] [kafka] hachikuji opened a new pull request #11289: KAFKA-13254 [WIP]: Fix deadlock when AlterIsr response returns

hachikuji opened a new pull request #11289:
URL: https://github.com/apache/kafka/pull/11289


   This patch fixes a deadlock that occurs when incrementing the high watermark after the synchronous ZK ISR modification. The main change is that the callback is no longer executed while the leader and ISR lock is held.
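
The idea of keeping the callback out of the lock can be sketched roughly as follows. This is a simplified illustration, not the code from the patch: `LockScopeSketch`, `handleResponse`, `onHwIncremented` and `lockHeldByCurrentThread` are hypothetical names, and the high watermark is reduced to a plain `Long`. State is updated under the write lock, only a flag is carried out of the critical section, and the follow-up work runs after the lock has been released.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-in for a partition; not Kafka's actual Partition class.
object LockScopeSketch {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  private var highWatermark = 0L

  // Runs `body` while holding the write lock and always releases it afterwards.
  private def inWriteLock[T](body: => T): T = {
    val lock = leaderIsrUpdateLock.writeLock()
    lock.lock()
    try body finally lock.unlock()
  }

  // Updates state under the lock but only records whether the HW moved.
  // The follow-up callback is invoked after the lock has been released.
  def handleResponse(newHw: Long)(onHwIncremented: () => Unit): Boolean = {
    var hwIncremented = false
    inWriteLock {
      if (newHw > highWatermark) {
        highWatermark = newHw
        hwIncremented = true
      }
    }
    if (hwIncremented) onHwIncremented()
    hwIncremented
  }

  // Exposed only so the sketch can show that the callback runs lock-free.
  def lockHeldByCurrentThread: Boolean =
    leaderIsrUpdateLock.isWriteLockedByCurrentThread
}
```

Because the callback runs outside the critical section, it is free to acquire other locks without inverting the lock order against a thread that already holds those locks and is waiting for this one.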
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11289:
URL: https://github.com/apache/kafka/pull/11289


   





[GitHub] [kafka] hachikuji commented on a change in pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#discussion_r711231383



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1304,53 +1323,41 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
-    // This is called from maybeExpandIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
-      // a more constrained state for advancing the HW.
-      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
-    } else {
-      trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
-    }
+  private def prepareIsrExpand(newInSyncReplicaId: Int): PendingExpandIsr = {
+    // When expanding the ISR, we assume that the new replica will make it into the ISR
+    // before we receive confirmation that it has. This ensures that the HW will already
+    // reflect the updated ISR even if there is a delay before we receive the confirmation.
+    // Alternatively, if the update fails, no harm is done since the expanded ISR puts
+    // a stricter requirement for advancement of the HW.
+    val isrToSend = isrState.isr + newInSyncReplicaId
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingExpandIsr(isrState.isr, newInSyncReplicaId, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
-    // This is called from maybeShrinkIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
-      // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
-      // the next LeaderAndIsr
-      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
-    } else {
-      trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
-    }
+  private[cluster] def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
+    // When shrinking the ISR, we cannot assume that the update will succeed as this could
+    // erroneously advance the HW if the `AlterIsr` were to fail. Hence the "maximal ISR"
+    // for `PendingShrinkIsr` is the current ISR.
+    val isrToSend = isrState.isr -- outOfSyncReplicaIds
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingShrinkIsr(isrState.isr, outOfSyncReplicaIds, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
-    val isrToSend: Set[Int] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
-      case state =>
-        isrChangeListener.markFailed()
-        throw new IllegalStateException(s"Invalid state $state for ISR change for partition $topicPartition")
-    }
+  private def alterIsr(proposedIsrState: PendingIsrChange): CompletableFuture[LeaderAndIsr] = {
+    debug(s"Submitting ISR state change $proposedIsrState")
+    val future = alterIsrManager.submit(topicPartition, proposedIsrState.sentLeaderAndIsr, controllerEpoch)
 
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
-    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState), controllerEpoch)
-
-    val oldState = isrState
-    isrState = proposedIsrState
-
-    if (!alterIsrManager.submit(alterIsrItem)) {
-      // If the ISR manager did not accept our update, we need to revert the proposed state.
-      // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
-      // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
-      isrState = oldState
-      isrChangeListener.markFailed()
-      warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
-    } else {
-      debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
+    val callback = handleAlterIsrResponse(proposedIsrState) _

Review comment:
       Yeah, thanks for the nudge. I ended up refactoring the handling logic. It seems cleaner now.







[GitHub] [kafka] hachikuji commented on a change in pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#discussion_r711228534



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1360,41 +1367,46 @@ class Partition(val topicPartition: TopicPartition,
    * Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata
    * or LeaderAndIsr
    */
-  private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Unit = {
-    val hwIncremented = inWriteLock(leaderIsrUpdateLock) {
+  private def handleAlterIsrResponse(proposedIsrState: PendingIsrChange)(result: Either[Errors, LeaderAndIsr]): Unit = {
+    var hwIncremented = false
+    var shouldRetry = false
+
+    inWriteLock(leaderIsrUpdateLock) {
       if (isrState != proposedIsrState) {
         // This means isrState was updated through leader election or some other mechanism before we got the AlterIsr
         // response. We don't know what happened on the controller exactly, but we do know this response is out of date
         // so we ignore it.
         debug(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState")
-        false
       } else {
         result match {
           case Left(error: Errors) =>
             isrChangeListener.markFailed()
             error match {
+              case Errors.OPERATION_NOT_ATTEMPTED =>
+                // Since the operation was not attempted, it is safe to reset back to the committed state.
+                isrState = CommittedIsr(proposedIsrState.isr)

Review comment:
       Right. The pending state changes have an inflight flag that we check for before attempting another change.
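
A minimal sketch of that reasoning, under simplifying assumptions: the state classes below borrow the names seen in the diff (`CommittedIsr`, `PendingExpandIsr`, `isInflight`) but drop the `LeaderAndIsr` payload, and `PartitionSketch`, `maybeExpandIsr` and `onOperationNotAttempted` are hypothetical. Since a new change is only prepared when nothing is in flight, a pending state always starts from a committed ISR, so resetting to that ISR is safe when the operation was never attempted.

```scala
object IsrStateSketch {
  sealed trait IsrState {
    def isr: Set[Int]
    def isInflight: Boolean
  }

  final case class CommittedIsr(isr: Set[Int]) extends IsrState {
    val isInflight: Boolean = false
  }

  // `isr` is the last committed ISR, i.e. the state before the proposed change.
  final case class PendingExpandIsr(isr: Set[Int], newInSyncReplicaId: Int) extends IsrState {
    val isInflight: Boolean = true
  }

  final class PartitionSketch(initialIsr: Set[Int]) {
    var isrState: IsrState = CommittedIsr(initialIsr)

    // A new change is prepared only when no other change is in flight.
    def maybeExpandIsr(replicaId: Int): Option[PendingExpandIsr] =
      if (isrState.isInflight || isrState.isr.contains(replicaId)) None
      else {
        val pending = PendingExpandIsr(isrState.isr, replicaId)
        isrState = pending
        Some(pending)
      }

    // The request was never attempted, so the committed ISR is still accurate.
    def onOperationNotAttempted(proposed: PendingExpandIsr): Unit =
      if (isrState == proposed) isrState = CommittedIsr(proposed.isr)
  }
}
```

In the sketch, a second `maybeExpandIsr` call returns `None` while the first change is pending, and succeeds again once the state has been reset.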







[GitHub] [kafka] ijuma commented on pull request #11289: KAFKA-13254 [WIP]: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#issuecomment-913838350


   @hachikuji Thanks for the PR. Can we please also mention the change that introduced this?





[GitHub] [kafka] hachikuji commented on a change in pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#discussion_r711204252



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1304,53 +1323,41 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
-    // This is called from maybeExpandIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
-      // a more constrained state for advancing the HW.
-      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
-    } else {
-      trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
-    }
+  private def prepareIsrExpand(newInSyncReplicaId: Int): PendingExpandIsr = {
+    // When expanding the ISR, we assume that the new replica will make it into the ISR
+    // before we receive confirmation that it has. This ensures that the HW will already
+    // reflect the updated ISR even if there is a delay before we receive the confirmation.
+    // Alternatively, if the update fails, no harm is done since the expanded ISR puts
+    // a stricter requirement for advancement of the HW.
+    val isrToSend = isrState.isr + newInSyncReplicaId
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingExpandIsr(isrState.isr, newInSyncReplicaId, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
-    // This is called from maybeShrinkIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
-      // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
-      // the next LeaderAndIsr
-      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
-    } else {
-      trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
-    }
+  private[cluster] def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
+    // When shrinking the ISR, we cannot assume that the update will succeed as this could
+    // erroneously advance the HW if the `AlterIsr` were to fail. Hence the "maximal ISR"
+    // for `PendingShrinkIsr` is the current ISR.
+    val isrToSend = isrState.isr -- outOfSyncReplicaIds
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingShrinkIsr(isrState.isr, outOfSyncReplicaIds, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
-    val isrToSend: Set[Int] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
-      case state =>
-        isrChangeListener.markFailed()
-        throw new IllegalStateException(s"Invalid state $state for ISR change for partition $topicPartition")
-    }
+  private def alterIsr(proposedIsrState: PendingIsrChange): CompletableFuture[LeaderAndIsr] = {

Review comment:
       Yeah, the name could be improved. How about `submitAlterIsr` since that is a bit closer to what the function really does?







[GitHub] [kafka] hachikuji commented on pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#issuecomment-921102941


   For reference, I'm able to reproduce the deadlock with the concurrency test in about 1 out of every 10 runs. With the fix here, I have not seen it again.





[GitHub] [kafka] dajac commented on a change in pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#discussion_r711225003



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1304,53 +1323,41 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
-    // This is called from maybeExpandIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
-      // a more constrained state for advancing the HW.
-      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
-    } else {
-      trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
-    }
+  private def prepareIsrExpand(newInSyncReplicaId: Int): PendingExpandIsr = {
+    // When expanding the ISR, we assume that the new replica will make it into the ISR
+    // before we receive confirmation that it has. This ensures that the HW will already
+    // reflect the updated ISR even if there is a delay before we receive the confirmation.
+    // Alternatively, if the update fails, no harm is done since the expanded ISR puts
+    // a stricter requirement for advancement of the HW.
+    val isrToSend = isrState.isr + newInSyncReplicaId
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingExpandIsr(isrState.isr, newInSyncReplicaId, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
-    // This is called from maybeShrinkIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
-      // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
-      // the next LeaderAndIsr
-      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
-    } else {
-      trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
-    }
+  private[cluster] def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
+    // When shrinking the ISR, we cannot assume that the update will succeed as this could
+    // erroneously advance the HW if the `AlterIsr` were to fail. Hence the "maximal ISR"
+    // for `PendingShrinkIsr` is the current ISR.
+    val isrToSend = isrState.isr -- outOfSyncReplicaIds
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingShrinkIsr(isrState.isr, outOfSyncReplicaIds, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
-    val isrToSend: Set[Int] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
-      case state =>
-        isrChangeListener.markFailed()
-        throw new IllegalStateException(s"Invalid state $state for ISR change for partition $topicPartition")
-    }
+  private def alterIsr(proposedIsrState: PendingIsrChange): CompletableFuture[LeaderAndIsr] = {

Review comment:
       `submitAlterIsr` sounds good to me.







[GitHub] [kafka] hachikuji commented on pull request #11289: KAFKA-13254 [WIP]: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#issuecomment-911102986


   I'm still working on some additional tests here, but it looks like this has fixed the upgrade/downgrade tests that were regularly failing before this patch: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-09-01--001.system-test-kafka-branch-builder--1630549487--hachikuji--KAFKA-13254--f720b713ad/report.html.





[GitHub] [kafka] dajac commented on a change in pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#discussion_r711148821



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1304,53 +1323,41 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
-    // This is called from maybeExpandIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
-      // a more constrained state for advancing the HW.
-      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
-    } else {
-      trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
-    }
+  private def prepareIsrExpand(newInSyncReplicaId: Int): PendingExpandIsr = {
+    // When expanding the ISR, we assume that the new replica will make it into the ISR
+    // before we receive confirmation that it has. This ensures that the HW will already
+    // reflect the updated ISR even if there is a delay before we receive the confirmation.
+    // Alternatively, if the update fails, no harm is done since the expanded ISR puts
+    // a stricter requirement for advancement of the HW.
+    val isrToSend = isrState.isr + newInSyncReplicaId
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingExpandIsr(isrState.isr, newInSyncReplicaId, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
-    // This is called from maybeShrinkIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
-      // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
-      // the next LeaderAndIsr
-      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
-    } else {
-      trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
-    }
+  private[cluster] def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
+    // When shrinking the ISR, we cannot assume that the update will succeed as this could
+    // erroneously advance the HW if the `AlterIsr` were to fail. Hence the "maximal ISR"
+    // for `PendingShrinkIsr` is the current ISR.
+    val isrToSend = isrState.isr -- outOfSyncReplicaIds
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingShrinkIsr(isrState.isr, outOfSyncReplicaIds, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
-    val isrToSend: Set[Int] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
-      case state =>
-        isrChangeListener.markFailed()
-        throw new IllegalStateException(s"Invalid state $state for ISR change for partition $topicPartition")
-    }
+  private def alterIsr(proposedIsrState: PendingIsrChange): CompletableFuture[LeaderAndIsr] = {
+    debug(s"Submitting ISR state change $proposedIsrState")
+    val future = alterIsrManager.submit(topicPartition, proposedIsrState.sentLeaderAndIsr, controllerEpoch)
 
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
-    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState), controllerEpoch)
-
-    val oldState = isrState
-    isrState = proposedIsrState
-
-    if (!alterIsrManager.submit(alterIsrItem)) {
-      // If the ISR manager did not accept our update, we need to revert the proposed state.
-      // This can happen if the ISR state was updated by the controller (via LeaderAndIsr in ZK-mode or
-      // ChangePartitionRecord in KRaft mode) but we have an AlterIsr request still in-flight.
-      isrState = oldState
-      isrChangeListener.markFailed()
-      warn(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
-    } else {
-      debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
+    val callback = handleAlterIsrResponse(proposedIsrState) _

Review comment:
       nit: Now that we don't pass the callback via the `AlterIsrItem`, we don't really need the partially applied function anymore. Should we remove it to simplify the code?
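
To illustrate the simplification being suggested, here is a small sketch with simplified types (plain `String`s instead of `PendingIsrChange` and `LeaderAndIsr`). `CallbackSketch`, `handleResponseFlat` and `attach` are hypothetical names, and the choice of `CompletableFuture.handle` is an assumption made for the example, not necessarily what the patch uses. Once the response arrives through a future, the handler can be invoked directly with both arguments, so no intermediate function value is needed.

```scala
import java.util.concurrent.CompletableFuture

object CallbackSketch {
  // Curried handler in the style of the quoted diff; types are simplified.
  def handleResponse(proposed: String)(result: Either[String, String]): String =
    result match {
      case Left(error) => s"$proposed failed: $error"
      case Right(isr)  => s"$proposed committed as $isr"
    }

  // Partially applied form (what `handleResponse(proposed) _` produces):
  // a function value that could be stored in a request item and invoked later.
  def asCallback(proposed: String): Either[String, String] => String =
    handleResponse(proposed)

  // Uncurried alternative: called directly, no function value required.
  def handleResponseFlat(proposed: String, result: Either[String, String]): String =
    handleResponse(proposed)(result)

  // With a future, the handler is called in place when the result is known.
  def attach(future: CompletableFuture[String], proposed: String): CompletableFuture[String] =
    future.handle[String] { (isr: String, error: Throwable) =>
      val result: Either[String, String] =
        if (error != null) Left(error.getMessage) else Right(isr)
      handleResponseFlat(proposed, result)
    }
}
```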

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.net.InetAddress
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue, TimeUnit}
+import java.util.{Collections, Optional, Properties}
+
+import kafka.api.LeaderAndIsr
+import kafka.log.{AppendOrigin, LogConfig}
+import kafka.server.metadata.MockConfigRepository
+import kafka.utils.TestUtils.waitUntilTrue
+import kafka.utils.{MockTime, ShutdownableThread, TestUtils}
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, TopicRecord}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
+import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, Test}
+import org.mockito.Mockito
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+import scala.util.Random
+
+class ReplicaManagerConcurrencyTest {
+
+  private val time = new MockTime()
+  private val metrics = new Metrics()
+  private val executor = Executors.newScheduledThreadPool(8)
+  private val tasks = mutable.Buffer.empty[ShutdownableThread]
+
+  private def submit(task: ShutdownableThread): Unit = {
+    tasks += task
+    executor.submit(task)
+  }
+
+  @AfterEach
+  def cleanup(): Unit = {
+    tasks.foreach(_.shutdown())
+    executor.shutdownNow()
+    executor.awaitTermination(5, TimeUnit.SECONDS)
+    metrics.close()
+  }
+
+  @Test
+  def testIsrExpandAndShrinkWithConcurrentProduce(): Unit = {

Review comment:
       This is really cool! 👍🏻 

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -309,8 +309,11 @@ class PartitionLockTest extends Logging {
     }
     when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
       .thenReturn(None)
-    when(alterIsrManager.submit(ArgumentMatchers.any[AlterIsrItem]))
-      .thenReturn(true)
+    when(alterIsrManager.submit(
+      ArgumentMatchers.eq(topicPartition),
+      ArgumentMatchers.any[LeaderAndIsr],
+      ArgumentMatchers.anyInt()))
+      .thenReturn(new CompletableFuture[LeaderAndIsr]())

Review comment:
       nit: These two lines read a bit weird. I would move the closing parenthesis of `when` and `submit` to the next line, before `.thenReturn` to make it a bit more readable.
   
   ```
   when(alterIsrManager.submit(
     ArgumentMatchers.eq(topicPartition),
     ArgumentMatchers.any[LeaderAndIsr],
     ArgumentMatchers.anyInt()
   )).thenReturn(new CompletableFuture[LeaderAndIsr]())
   ```

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1304,53 +1323,41 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
-    // This is called from maybeExpandIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
-      // a more constrained state for advancing the HW.
-      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
-    } else {
-      trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
-    }
+  private def prepareIsrExpand(newInSyncReplicaId: Int): PendingExpandIsr = {
+    // When expanding the ISR, we assume that the new replica will make it into the ISR
+    // before we receive confirmation that it has. This ensures that the HW will already
+    // reflect the updated ISR even if there is a delay before we receive the confirmation.
+    // Alternatively, if the update fails, no harm is done since the expanded ISR puts
+    // a stricter requirement for advancement of the HW.
+    val isrToSend = isrState.isr + newInSyncReplicaId
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingExpandIsr(isrState.isr, newInSyncReplicaId, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
-    // This is called from maybeShrinkIsr which holds the ISR write lock
-    if (!isrState.isInflight) {
-      // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
-      // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
-      // the next LeaderAndIsr
-      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
-    } else {
-      trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
-    }
+  private[cluster] def prepareIsrShrink(outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
+    // When shrinking the ISR, we cannot assume that the update will succeed as this could
+    // erroneously advance the HW if the `AlterIsr` were to fail. Hence the "maximal ISR"
+    // for `PendingShrinkIsr` is the current ISR.
+    val isrToSend = isrState.isr -- outOfSyncReplicaIds
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+    val updatedState = PendingShrinkIsr(isrState.isr, outOfSyncReplicaIds, newLeaderAndIsr)
+    isrState = updatedState
+    updatedState
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
-    val isrToSend: Set[Int] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
-      case state =>
-        isrChangeListener.markFailed()
-        throw new IllegalStateException(s"Invalid state $state for ISR change for partition $topicPartition")
-    }
+  private def alterIsr(proposedIsrState: PendingIsrChange): CompletableFuture[LeaderAndIsr] = {

Review comment:
       Would it make sense to call the method `commitIsr`? That would emphasize the relationship with the two `prepare*` methods above. What do you think?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1360,41 +1367,46 @@ class Partition(val topicPartition: TopicPartition,
    * Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata
    * or LeaderAndIsr
    */
-  private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Unit = {
-    val hwIncremented = inWriteLock(leaderIsrUpdateLock) {
+  private def handleAlterIsrResponse(proposedIsrState: PendingIsrChange)(result: Either[Errors, LeaderAndIsr]): Unit = {
+    var hwIncremented = false
+    var shouldRetry = false
+
+    inWriteLock(leaderIsrUpdateLock) {
       if (isrState != proposedIsrState) {
         // This means isrState was updated through leader election or some other mechanism before we got the AlterIsr
         // response. We don't know what happened on the controller exactly, but we do know this response is out of date
         // so we ignore it.
         debug(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState")
-        false
       } else {
         result match {
           case Left(error: Errors) =>
             isrChangeListener.markFailed()
             error match {
+              case Errors.OPERATION_NOT_ATTEMPTED =>
+                // Since the operation was not attempted, it is safe to reset back to the committed state.
+                isrState = CommittedIsr(proposedIsrState.isr)

Review comment:
       My understanding is that it is safe to revert back to `CommittedIsr` because the two `prepare*` methods will actually never update `isrState` if it is not in `CommittedIsr`. Am I getting this right?







[GitHub] [kafka] ccding commented on pull request #11289: KAFKA-13254: Fix deadlock when AlterIsr response returns

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #11289:
URL: https://github.com/apache/kafka/pull/11289#issuecomment-923206682


   It appears the trunk branch fails to build after merging this PR.

