You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/01 02:52:07 UTC
[kafka] branch 3.2 updated: KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12362)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new f1d4e6c726 KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12362)
f1d4e6c726 is described below
commit f1d4e6c726d019189be89325e0836f9a78add5c5
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Fri Jul 1 10:51:57 2022 +0800
KAFKA-14010: AlterPartition request won't retry when receiving retriable error (#12362)
Reviewers: David Jacot <dj...@confluent.io>
---
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../main/scala/kafka/server/AlterIsrManager.scala | 14 ++-
.../scala/unit/kafka/cluster/PartitionTest.scala | 117 ++++++++++++++++++++-
.../unit/kafka/server/AlterIsrManagerTest.scala | 74 ++++++++++++-
4 files changed, 190 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 429e73b4d8..fc56769922 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -873,7 +873,7 @@ class Partition(val topicPartition: TopicPartition,
*
* With the addition of AlterPartition, we also consider newly added replicas as part of the ISR when advancing
* the HW. These replicas have not yet been committed to the ISR by the controller, so we could revert to the previously
- * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefor safe. We call
+ * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefore safe. We call
* this set the "maximal" ISR. See KIP-497 for more details
*
* Note There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 99ab7378f7..e878dc643e 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -294,14 +294,12 @@ class DefaultAlterIsrManager(
inflightAlterIsrItems.foreach { inflightAlterIsr =>
partitionResponses.get(inflightAlterIsr.topicPartition) match {
case Some(leaderAndIsrOrError) =>
- try {
- leaderAndIsrOrError match {
- case Left(error) => inflightAlterIsr.future.completeExceptionally(error.exception)
- case Right(leaderAndIsr) => inflightAlterIsr.future.complete(leaderAndIsr)
- }
- } finally {
- // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates
- unsentIsrUpdates.remove(inflightAlterIsr.topicPartition)
+ // Clear the partition from the unsent updates map before completing the future, whatever the
+ // outcome, so that further updates are unblocked and the callback can submit a new update if needed.
+ unsentIsrUpdates.remove(inflightAlterIsr.topicPartition)
+ leaderAndIsrOrError match {
+ case Left(error) => inflightAlterIsr.future.completeExceptionally(error.exception)
+ case Right(leaderAndIsr) => inflightAlterIsr.future.complete(leaderAndIsr)
}
case None =>
// Don't remove this partition from the update map so it will get re-sent
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index a5e7918006..2f0bd494f1 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -17,7 +17,7 @@
package kafka.cluster
import com.yammer.metrics.core.Metric
-import kafka.api.{ApiVersion, KAFKA_2_6_IV0}
+import kafka.api.{ApiVersion, KAFKA_2_6_IV0, KAFKA_3_2_IV0}
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
import kafka.metrics.KafkaYammerMetrics
@@ -26,12 +26,12 @@ import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException}
-import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.ListOffsetsRequest
+import org.apache.kafka.common.requests.{AlterPartitionResponse, ListOffsetsRequest, RequestHeader}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import kafka.server.epoch.LeaderEpochFileCache
+import org.apache.kafka.clients.ClientResponse
import scala.jdk.CollectionConverters._
@@ -1641,6 +1642,114 @@ class PartitionTest extends AbstractPartitionTest {
callback(brokerId, remoteBrokerId, partition)
}
+ private def createClientResponseWithAlterPartitionResponse(
+ topicPartition: TopicPartition,
+ partitionErrorCode: Short,
+ isr: List[Int] = List.empty,
+ leaderEpoch: Int = 0,
+ partitionEpoch: Int = 0
+ ): ClientResponse = {
+ val alterPartitionResponseData = new AlterPartitionResponseData()
+ val topicResponse = new AlterPartitionResponseData.TopicData()
+ .setName(topicPartition.topic)
+
+ topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
+ .setPartitionIndex(topicPartition.partition)
+ .setIsr(isr.map(Integer.valueOf).asJava)
+ .setLeaderEpoch(leaderEpoch)
+ .setPartitionEpoch(partitionEpoch)
+ .setErrorCode(partitionErrorCode))
+ alterPartitionResponseData.topics.add(topicResponse)
+
+ val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
+
+ new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
+ null, null, 0, 0, false, null, null, alterPartitionResponse)
+ }
+
+ @Test
+ def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
+ val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
+ val alterPartitionManager = new DefaultAlterIsrManager(
+ controllerChannelManager = mockChannelManager,
+ scheduler = mock(classOf[KafkaScheduler]),
+ time = time,
+ brokerId = brokerId,
+ brokerEpochSupplier = () => 0,
+ ibpVersion = KAFKA_3_2_IV0
+ )
+
+ partition = new Partition(topicPartition,
+ replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+ interBrokerProtocolVersion = interBrokerProtocolVersion,
+ localBrokerId = brokerId,
+ time,
+ isrChangeListener,
+ delayedOperations,
+ metadataCache,
+ logManager,
+ alterPartitionManager)
+
+ val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+ seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+ val controllerEpoch = 0
+ val leaderEpoch = 5
+ val follower1 = brokerId + 1
+ val follower2 = brokerId + 2
+ val follower3 = brokerId + 3
+ val replicas = Seq(brokerId, follower1, follower2, follower3)
+ val isr = Seq(brokerId, follower1, follower2)
+ val partitionEpoch = 1
+
+ doNothing().when(delayedOperations).checkAndCompleteAll()
+
+ // Fail the first alter partition request with a retryable error to trigger a retry from the partition callback
+ val alterPartitionResponseWithUnknownServerError =
+ createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code)
+
+ // Complete the ISR expansion
+ val alterPartitionResponseWithoutError =
+ createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code, List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
+
+ when(mockChannelManager.sendRequest(any(), any()))
+ .thenAnswer { invocation =>
+ val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+ controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
+ }
+ .thenAnswer { invocation =>
+ val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
+ controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
+ }
+
+ assertTrue(makeLeader(
+ topicId = None,
+ controllerEpoch,
+ leaderEpoch,
+ isr,
+ replicas,
+ partitionEpoch,
+ isNew = true
+ ))
+ assertEquals(0L, partition.localLogOrException.highWatermark)
+
+ // Expand ISR
+ partition.updateFollowerFetchState(
+ followerId = follower3,
+ followerFetchOffsetMetadata = LogOffsetMetadata(10),
+ followerStartOffset = 0L,
+ followerFetchTimeMs = time.milliseconds(),
+ leaderEndOffset = 10
+ )
+
+ assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr)
+ assertEquals(partitionEpoch + 1, partition.getZkVersion)
+ // Verify that the AlterPartition request was sent twice
+ verify(mockChannelManager, times(2)).sendRequest(any(), any())
+ // After the retry, the partition state should be committed
+ assertFalse(partition.partitionState.isInflight)
+ }
+
@Test
def testSingleInFlightAlterIsr(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index 483a5347e4..b4c09d92d4 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -43,6 +43,8 @@ import org.junit.jupiter.params.provider.MethodSource
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, reset, times, verify}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
+
+import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.jdk.CollectionConverters._
class AlterIsrManagerTest {
@@ -113,7 +115,7 @@ class AlterIsrManagerTest {
assertFutureThrows(failedSubmitFuture, classOf[OperationNotAttemptedException])
// Simulate response
- val alterPartitionResp = partitionResponse(tp0, Errors.NONE)
+ val alterPartitionResp = partitionResponse()
val resp = new ClientResponse(null, null, "", 0L, 0L,
false, null, null, alterPartitionResp)
verify(brokerToController).sendRequest(capture.capture(), callbackCapture.capture())
@@ -170,6 +172,58 @@ class AlterIsrManagerTest {
assertEquals(request.data().topics().get(0).partitions().size(), 10)
}
+ @Test
+ def testSubmitFromCallback(): Unit = {
+ // Prepare a partition-level retriable error response
+ val alterPartitionRespWithPartitionError = partitionResponse(tp0, Errors.UNKNOWN_SERVER_ERROR)
+ val errorResponse = new ClientResponse(null, null, "", 0L, 0L,
+ false, null, null, alterPartitionRespWithPartitionError)
+
+ val leaderId = 1
+ val leaderEpoch = 1
+ val partitionEpoch = 10
+ val isr = List(1, 2, 3)
+ val leaderAndIsr = new LeaderAndIsr(leaderId, leaderEpoch, isr, LeaderRecoveryState.RECOVERED, partitionEpoch)
+ val callbackCapture : ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+
+ val scheduler = new MockScheduler(time)
+ val alterPartitionManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0)
+ alterPartitionManager.start()
+ val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0)
+ val finalFuture = new CompletableFuture[LeaderAndIsr]()
+ future.whenComplete { (_, e) =>
+ if (e != null) {
+ // Retry on error.
+ alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { (result, e) =>
+ if (e != null) {
+ finalFuture.completeExceptionally(e)
+ } else {
+ finalFuture.complete(result)
+ }
+ }
+ } else {
+ finalFuture.completeExceptionally(new AssertionError("Expected the future to be failed"))
+ }
+ }
+
+ verify(brokerToController).start()
+ verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+ reset(brokerToController)
+ callbackCapture.getValue.onComplete(errorResponse)
+
+ // Complete the retry request
+ val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE, partitionEpoch, leaderId, leaderEpoch, isr)
+ val retryResponse = new ClientResponse(null, null, "", 0L, 0L,
+ false, null, null, retryAlterPartitionResponse)
+
+ verify(brokerToController).sendRequest(any(), callbackCapture.capture())
+ callbackCapture.getValue.onComplete(retryResponse)
+
+ assertEquals(leaderAndIsr, finalFuture.get(200, TimeUnit.MILLISECONDS))
+ // No more items in unsentIsrUpdates
+ assertFalse(alterPartitionManager.unsentIsrUpdates.containsKey(tp0))
+ }
+
@Test
def testAuthorizationFailed(): Unit = {
testRetryOnTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED)
@@ -227,7 +281,7 @@ class AlterIsrManagerTest {
scheduler.tick()
// After a successful response, we can submit another AlterIsrItem
- val retryAlterPartitionResponse = partitionResponse(tp0, Errors.NONE)
+ val retryAlterPartitionResponse = partitionResponse()
val retryResponse = new ClientResponse(null, null, "", 0L, 0L,
false, null, null, retryAlterPartitionResponse)
@@ -409,14 +463,26 @@ class AlterIsrManagerTest {
assertFutureThrows(future2, classOf[InvalidUpdateVersionException])
}
- private def partitionResponse(tp: TopicPartition, error: Errors): AlterPartitionResponse = {
+ private def partitionResponse(
+ tp: TopicPartition = tp0,
+ error: Errors = Errors.NONE,
+ partitionEpoch: Int = 0,
+ leaderId: Int = 0,
+ leaderEpoch: Int = 0,
+ isr: List[Int] = List.empty
+ ): AlterPartitionResponse = {
new AlterPartitionResponse(new AlterPartitionResponseData()
.setTopics(Collections.singletonList(
new AlterPartitionResponseData.TopicData()
- .setName(tp.topic())
+ .setName(tp.topic)
.setPartitions(Collections.singletonList(
new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(tp.partition())
+ .setPartitionIndex(tp.partition)
+ .setPartitionEpoch(partitionEpoch)
+ .setLeaderEpoch(leaderEpoch)
+ .setLeaderId(leaderId)
+ .setIsr(isr.map(Integer.valueOf).asJava)
.setErrorCode(error.code))))))
}
}