Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/01 02:52:07 UTC

[kafka] branch 3.2 updated: KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12362)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new f1d4e6c726 KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12362)
f1d4e6c726 is described below

commit f1d4e6c726d019189be89325e0836f9a78add5c5
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Fri Jul 1 10:51:57 2022 +0800

    KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12362)
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../main/scala/kafka/server/AlterIsrManager.scala  |  14 ++-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 117 ++++++++++++++++++++-
 .../unit/kafka/server/AlterIsrManagerTest.scala    |  74 ++++++++++++-
 4 files changed, 190 insertions(+), 17 deletions(-)
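
For context: KAFKA-14010 covers the case where a leader's AlterPartition request fails with a retriable partition error (the tests below use UNKNOWN_SERVER_ERROR) but never gets retried. In the old code, DefaultAlterIsrManager completed the in-flight future first and only removed the partition from unsentIsrUpdates in a finally block; because CompletableFuture runs already-registered callbacks synchronously on the completing thread, a retry submitted from the completion callback still saw the stale entry and was rejected. The patch clears the entry before completing the future. A minimal sketch of the reordering, with simplified stand-in types (only unsentIsrUpdates and the remove-before-complete ordering come from the patch; everything else is illustrative):

    import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}

    // Stand-ins for the real Kafka types, for illustration only.
    case class LeaderAndIsr(leader: Int, isr: List[Int])
    case class AlterIsrItem(partition: String, future: CompletableFuture[LeaderAndIsr])

    class AlterIsrSketch {
      // Pending updates keyed by partition; submit() would reject a partition
      // that is still present in this map.
      val unsentIsrUpdates = new ConcurrentHashMap[String, AlterIsrItem]()

      def handleResponse(item: AlterIsrItem, result: Either[Throwable, LeaderAndIsr]): Unit = {
        // The fix: clear the unsent entry BEFORE completing the future, so a
        // callback chained on the future can immediately resubmit for this partition.
        unsentIsrUpdates.remove(item.partition)
        result match {
          case Left(error)         => item.future.completeExceptionally(error)
          case Right(leaderAndIsr) => item.future.complete(leaderAndIsr)
        }
      }
    }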

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 429e73b4d8..fc56769922 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -873,7 +873,7 @@ class Partition(val topicPartition: TopicPartition,
    *
    * With the addition of AlterPartition, we also consider newly added replicas as part of the ISR when advancing
    * the HW. These replicas have not yet been committed to the ISR by the controller, so we could revert to the previously
-   * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefor safe. We call
+   * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefore safe. We call
    * this set the "maximal" ISR. See KIP-497 for more details
    *
    * Note There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
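
The comment fixed above describes the "maximal" ISR from KIP-497: the committed ISR plus replicas whose addition is still pending at the controller. Taking the high-watermark minimum over the larger set is safe because extra members can only hold the HW back, never advance it further than the committed ISR alone would. A toy illustration of that min-over-maximal-ISR computation (broker ids and offsets are made up, not the Partition internals):

    object MaximalIsrSketch extends App {
      val logEndOffsets = Map(0 -> 10L, 1 -> 10L, 2 -> 8L, 3 -> 5L)
      val committedIsr  = Set(0, 1, 2)
      val pendingAdds   = Set(3) // sent to the controller, not yet committed
      val maximalIsr    = committedIsr ++ pendingAdds

      // HW advances to the minimum log end offset across the chosen replica set.
      def hw(isr: Set[Int]): Long = isr.map(logEndOffsets).min

      assert(hw(committedIsr) == 8L)
      assert(hw(maximalIsr) == 5L) // more restrictive, therefore safe
    }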
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 99ab7378f7..e878dc643e 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -294,14 +294,12 @@ class DefaultAlterIsrManager(
         inflightAlterIsrItems.foreach { inflightAlterIsr =>
           partitionResponses.get(inflightAlterIsr.topicPartition) match {
             case Some(leaderAndIsrOrError) =>
-              try {
-                leaderAndIsrOrError match {
-                  case Left(error) => inflightAlterIsr.future.completeExceptionally(error.exception)
-                  case Right(leaderAndIsr) => inflightAlterIsr.future.complete(leaderAndIsr)
-                }
-              } finally {
-                // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates
-                unsentIsrUpdates.remove(inflightAlterIsr.topicPartition)
+              // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further
+              // updates. We clear it now to allow the callback to submit a new update if needed.
+              unsentIsrUpdates.remove(inflightAlterIsr.topicPartition)
+              leaderAndIsrOrError match {
+                case Left(error) => inflightAlterIsr.future.completeExceptionally(error.exception)
+                case Right(leaderAndIsr) => inflightAlterIsr.future.complete(leaderAndIsr)
               }
             case None =>
               // Don't remove this partition from the update map so it will get re-sent
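
Why the ordering in this hunk matters: CompletableFuture.completeExceptionally invokes already-registered callbacks synchronously, on the completing thread. Under the old try/finally shape, a retry submitted from inside the callback therefore ran before the finally block had removed the partition from unsentIsrUpdates, and was rejected as a duplicate. A standalone sketch of that execution order (the println calls stand in for the retry submit and the map removal):

    import java.util.concurrent.CompletableFuture

    object CallbackOrdering extends App {
      val future = new CompletableFuture[String]()
      future.whenComplete { (_, error) =>
        // Runs inside completeExceptionally, i.e. before the finally block below,
        // which is where the old code removed the entry from unsentIsrUpdates.
        if (error != null) println("1: a retry here would still see the stale unsent entry")
      }
      try {
        future.completeExceptionally(new RuntimeException("retriable error"))
      } finally {
        println("2: finally runs only after the callback above has fired")
      }
    }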
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index a5e7918006..2f0bd494f1 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -17,7 +17,7 @@
 package kafka.cluster
 
 import com.yammer.metrics.core.Metric
-import kafka.api.{ApiVersion, KAFKA_2_6_IV0}
+import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_3_2_IV0}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.metrics.KafkaYammerMetrics
@@ -26,12 +26,12 @@ import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors.{ApiException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException}
-import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.ListOffsetsRequest
+import org.apache.kafka.common.requests.{AlterPartitionResponse, ListOffsetsRequest, RequestHeader}
 import org.apache.kafka.common.utils.SystemTime
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{CountDownLatch, Semaphore}
 import kafka.server.epoch.LeaderEpochFileCache
+import org.apache.kafka.clients.ClientResponse
 
 import scala.jdk.CollectionConverters._
 
@@ -1641,6 +1642,114 @@ class PartitionTest extends AbstractPartitionTest {
     callback(brokerId, remoteBrokerId, partition)
   }
 
+  private def createClientResponseWithAlterPartitionResponse(
+    topicPartition: TopicPartition,
+    partitionErrorCode: Short,
+    isr: List[Int] = List.empty,
+    leaderEpoch: Int = 0,
+    partitionEpoch: Int = 0
+  ): ClientResponse = {
+    val alterPartitionResponseData = new AlterPartitionResponseData()
+    val topicResponse = new AlterPartitionResponseData.TopicData()
+      .setName(topicPartition.topic)
+
+    topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+      .setPartitionIndex(topicPartition.partition)
+      .setIsr(isr.map(Integer.valueOf).asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setErrorCode(partitionErrorCode))
+    alterPartitionResponseData.topics.add(topicResponse)
+
+    val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+    new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+      null, null, 0, 0, false, null, null, alterPartitionResponse)
+  }
+
+  @Test
+  def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
+    val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+    val alterPartitionManager = new DefaultAlterIsrManager(
+      controllerChannelManager = mockChannelManager,
+      scheduler = mock(classOf[KafkaScheduler]),
+      time = time,
+      brokerId = brokerId,
+      brokerEpochSupplier = () => 0,
+      ibpVersion = KAFKA_3_2_IV0
+    )
+
+    partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      time,
+      isrChangeListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager)
+
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val follower3 = brokerId + 3
+    val replicas = Seq(brokerId, follower1, follower2, follower3)
+    val isr = Seq(brokerId, follower1, follower2)
+    val partitionEpoch = 1
+
+    doNothing().when(delayedOperations).checkAndCompleteAll()
+
+    // Fail the first alter partition request with a retryable error to trigger a retry from the partition callback
+    val alterPartitionResponseWithUnknownServerError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code)
+
+    // Complete the ISR expansion
+    val alterPartitionResponseWithoutError =
+      createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code, List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+    when(mockChannelManager.sendRequest(any(), any()))
+      .thenAnswer { invocation =>
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+      }
+      .thenAnswer { invocation =>
+        val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+        controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
+      }
+
+    assertTrue(makeLeader(
+      topicId = None,
+      controllerEpoch,
+      leaderEpoch,
+      isr,
+      replicas,
+      partitionEpoch,
+      isNew = true
+    ))
+    assertEquals(0L, partition.localLogOrException.highWatermark)
+
+    // Expand ISR
+    partition.updateFollowerFetchState(
+      followerId = follower3,
+      followerFetchOffsetMetadata = LogOffsetMetadata(10),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = 10
+    )
+
+    assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr)
+    assertEquals(partitionEpoch + 1, partition.getZkVersion)
+    // Verify that the AlterPartition request was sent twice
+    verify(mockChannelManager, times(2)).sendRequest(any(), any())
+    // After the retry, the partition state should be committed
+    assertFalse(partition.partitionState.isInflight)
+  }
+
   @Test
   def testSingleInFlightAlterIsr(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
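
The new test above drives the retry through Mockito's consecutive stubbing: chained thenAnswer calls make the first sendRequest complete with UNKNOWN_SERVER_ERROR and the second with a successful response, and times(2) then proves the manager actually resent. A standalone sketch of that mechanism (the Controller trait and string responses are illustrative, not Kafka APIs):

    import org.mockito.ArgumentMatchers.anyString
    import org.mockito.Mockito.{mock, times, verify, when}

    object ConsecutiveStubbing extends App {
      trait Controller { def send(request: String): String }

      val controller = mock(classOf[Controller])
      when(controller.send(anyString()))
        .thenAnswer(_ => "UNKNOWN_SERVER_ERROR") // first attempt: retriable error
        .thenAnswer(_ => "NONE")                 // retry: success

      assert(controller.send("alter-partition") == "UNKNOWN_SERVER_ERROR")
      assert(controller.send("alter-partition") == "NONE")
      verify(controller, times(2)).send(anyString())
    }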
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index 483a5347e4..b4c09d92d4 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -43,6 +43,8 @@ import org.junit.jupiter.params.provider.MethodSource
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mock, reset, times, verify}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
+
+import java.util.concurrent.{CompletableFuture, TimeUnit}
 import scala.jdk.CollectionConverters._
 
 class AlterIsrManagerTest {
@@ -113,7 +115,7 @@ class AlterIsrManagerTest {
     assertFutureThrows(failedSubmitFuture, classOf[OperationNotAttemptedException])
 
     // Simulate response
-    val alterPartitionResp = partitionResponse(tp0, Errors.NONE)
+    val alterPartitionResp = partitionResponse()
     val resp = new ClientResponse(null, null, "", 0L, 0L,
       false, null, null, alterPartitionResp)
     verify(brokerToController).sendRequest(capture.capture(), callbackCapture.capture())
@@ -170,6 +172,58 @@ class AlterIsrManagerTest {
     assertEquals(request.data().topics().get(0).partitions().size(), 10)
   }
 
+  @Test
+  def testSubmitFromCallback(): Unit = {
+    // prepare a partition level retriable error response
+    val alterPartitionRespWithPartitionError = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR)
+    val errorResponse = new ClientResponse(null, null, "", 0L, 0L,
+      false, null, null, alterPartitionRespWithPartitionError)
+
+    val leaderId = 1
+    val leaderEpoch = 1
+    val partitionEpoch = 10
+    val isr = List(1, 2, 3)
+    val leaderAndIsr = new LeaderAndIsr(leaderId, leaderEpoch, isr, LeaderRecoveryState.RECOVERED, partitionEpoch)
+    val callbackCapture : ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+
+    val scheduler = new MockScheduler(time)
+    val alterPartitionManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0)
+    alterPartitionManager.start()
+    val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0)
+    val finalFuture = new CompletableFuture[LeaderAndIsr]()
+    future.whenComplete { (_, e) =>
+      if (e != null) {
+        // Retry when error.
+        alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { (result, e) =>
+          if (e != null) {
+            finalFuture.completeExceptionally(e)
+          } else {
+            finalFuture.complete(result)
+          }
+        }
+      } else {
+        finalFuture.completeExceptionally(new AssertionError("Expected the future to be failed"))
+      }
+    }
+
+    verify(brokerToController).start()
+    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+    reset(brokerToController)
+    callbackCapture.getValue.onComplete(errorResponse)
+
+    // Complete the retry request
+    val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE, partitionEpoch, leaderId, leaderEpoch, isr)
+    val retryResponse = new ClientResponse(null, null, "", 0L, 0L,
+      false, null, null, retryAlterPartitionResponse)
+
+    verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+    callbackCapture.getValue.onComplete(retryResponse)
+
+    assertEquals(leaderAndIsr, finalFuture.get(200, TimeUnit.MILLISECONDS))
+    // No more items in unsentIsrUpdates
+    assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
+  }
+
   @Test
   def testAuthorizationFailed(): Unit = {
     testRetryOnTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED)
@@ -227,7 +281,7 @@ class AlterIsrManagerTest {
     scheduler.tick()
 
     // After a successful response, we can submit another AlterIsrItem
-    val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE)
+    val retryAlterPartitionResponse = partitionResponse()
     val retryResponse = new ClientResponse(null, null, "", 0L, 0L,
       false, null, null, retryAlterPartitionResponse)
 
@@ -409,14 +463,26 @@ class AlterIsrManagerTest {
     assertFutureThrows(future2, classOf[InvalidUpdateVersionException])
   }
 
-  private def partitionResponse(tp: TopicPartition, error: Errors): AlterPartitionResponse = {
+  private def partitionResponse(
+    tp: TopicPartition = tp0,
+    error: Errors = Errors.NONE,
+    partitionEpoch: Int = 0,
+    leaderId: Int = 0,
+    leaderEpoch: Int = 0,
+    isr: List[Int] = List.empty
+  ): AlterPartitionResponse = {
     new AlterPartitionResponse(new AlterPartitionResponseData()
       .setTopics(Collections.singletonList(
         new AlterPartitionResponseData.TopicData()
-          .setName(tp.topic())
+          .setName(tp.topic)
           .setPartitions(Collections.singletonList(
             new AlterPartitionResponseData.PartitionData()
               .setPartitionIndex(tp.partition())
+              .setPartitionIndex(tp.partition)
+              .setPartitionEpoch(partitionEpoch)
+              .setLeaderEpoch(leaderEpoch)
+              .setLeaderId(leaderId)
+              .setIsr(isr.map(Integer.valueOf).asJava)
               .setErrorCode(error.code))))))
   }
 }
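
testSubmitFromCallback above also demonstrates a reusable test shape: retry from inside whenComplete and funnel the eventual outcome into a separate future that the test blocks on with a timeout. A self-contained sketch of that pattern (submit here is a toy that fails once, not the Kafka manager):

    import java.util.concurrent.{CompletableFuture, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    object RetryFromCallback extends App {
      val attempts = new AtomicInteger()

      // Toy submit: the first attempt fails with a retriable error, the retry succeeds.
      def submit(): CompletableFuture[String] = {
        val f = new CompletableFuture[String]()
        if (attempts.incrementAndGet() == 1)
          f.completeExceptionally(new RuntimeException("retriable"))
        else
          f.complete("leaderAndIsr")
        f
      }

      val finalFuture = new CompletableFuture[String]()
      submit().whenComplete { (_, error) =>
        if (error != null) {
          // Retry on error, mirroring the second alterPartitionManager.submit call.
          submit().whenComplete { (result, retryError) =>
            if (retryError != null) finalFuture.completeExceptionally(retryError)
            else finalFuture.complete(result)
          }
        } else {
          finalFuture.completeExceptionally(new AssertionError("expected the first attempt to fail"))
        }
      }

      assert(finalFuture.get(200, TimeUnit.MILLISECONDS) == "leaderAndIsr")
    }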