You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/07/26 23:09:13 UTC
[kafka] branch trunk updated: KAFKA-13166 Fix missing ControllerApis error handling (#12403)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14d2269471 KAFKA-13166 Fix missing ControllerApis error handling (#12403)
14d2269471 is described below
commit 14d2269471141067dc3c45300187f20a0a051777
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue Jul 26 19:08:59 2022 -0400
KAFKA-13166 Fix missing ControllerApis error handling (#12403)
Makes all ControllerApis request handlers return a `CompletableFuture[Unit]`. Also adds an additional completion stage which ensures we capture errors thrown during response building.
Reviewed-by: Colin P. McCabe <cm...@apache.org>
---
core/src/main/scala/kafka/server/AclApis.scala | 32 ++++--
.../main/scala/kafka/server/ControllerApis.scala | 121 ++++++++++++---------
.../unit/kafka/server/ControllerApisTest.scala | 30 +++++
3 files changed, 124 insertions(+), 59 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AclApis.scala b/core/src/main/scala/kafka/server/AclApis.scala
index 97b685bc0a..485cafeca2 100644
--- a/core/src/main/scala/kafka/server/AclApis.scala
+++ b/core/src/main/scala/kafka/server/AclApis.scala
@@ -24,14 +24,16 @@ import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclBinding
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult
import org.apache.kafka.common.message._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.server.authorizer._
-import java.util
+import java.util
+import java.util.concurrent.CompletableFuture
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import scala.compat.java8.OptionConverters._
@@ -53,7 +55,7 @@ class AclApis(authHelper: AuthHelper,
def close(): Unit = alterAclsPurgatory.shutdown()
- def handleDescribeAcls(request: RequestChannel.Request): Unit = {
+ def handleDescribeAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, DESCRIBE)
val describeAclsRequest = request.body[DescribeAclsRequest]
authorizer match {
@@ -74,9 +76,10 @@ class AclApis(authHelper: AuthHelper,
.setResources(DescribeAclsResponse.aclsResources(returnedAcls)),
describeAclsRequest.version))
}
+ CompletableFuture.completedFuture[Unit](())
}
- def handleCreateAcls(request: RequestChannel.Request): Unit = {
+ def handleCreateAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, ALTER)
val createAclsRequest = request.body[CreateAclsRequest]
@@ -84,6 +87,7 @@ class AclApis(authHelper: AuthHelper,
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
createAclsRequest.getErrorResponse(requestThrottleMs,
new SecurityDisabledException("No Authorizer is configured.")))
+ CompletableFuture.completedFuture[Unit](())
case Some(auth) =>
val allBindings = createAclsRequest.aclCreations.asScala.map(CreateAclsRequest.aclBinding)
val errorResults = mutable.Map[AclBinding, AclCreateResult]()
@@ -103,6 +107,7 @@ class AclApis(authHelper: AuthHelper,
validBindings += acl
}
+ val future = new CompletableFuture[util.List[AclCreationResult]]()
val createResults = auth.createAcls(request.context, validBindings.asJava).asScala.map(_.toCompletableFuture)
def sendResponseCallback(): Unit = {
@@ -117,17 +122,20 @@ class AclApis(authHelper: AuthHelper,
}
creationResult
}
+ future.complete(aclCreationResults.asJava)
+ }
+ alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, sendResponseCallback)
+
+ future.thenApply[Unit] { aclCreationResults =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new CreateAclsResponse(new CreateAclsResponseData()
.setThrottleTimeMs(requestThrottleMs)
- .setResults(aclCreationResults.asJava)))
+ .setResults(aclCreationResults)))
}
-
- alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, sendResponseCallback)
}
}
- def handleDeleteAcls(request: RequestChannel.Request): Unit = {
+ def handleDeleteAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, ALTER)
val deleteAclsRequest = request.body[DeleteAclsRequest]
authorizer match {
@@ -135,13 +143,20 @@ class AclApis(authHelper: AuthHelper,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
deleteAclsRequest.getErrorResponse(requestThrottleMs,
new SecurityDisabledException("No Authorizer is configured.")))
+ CompletableFuture.completedFuture[Unit](())
case Some(auth) =>
+ val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
.asScala.map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {
val filterResults = deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
+ future.complete(filterResults)
+ }
+
+ alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, sendResponseCallback)
+ future.thenApply[Unit] { filterResults =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DeleteAclsResponse(
new DeleteAclsResponseData()
@@ -149,7 +164,6 @@ class AclApis(authHelper: AuthHelper,
.setFilterResults(filterResults),
deleteAclsRequest.version))
}
- alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, sendResponseCallback)
}
}
-}
+ }
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 74bc4dd406..efb6a36c3d 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -20,7 +20,7 @@ package kafka.server
import java.util
import java.util.{Collections, OptionalLong}
import java.util.Map.Entry
-import java.util.concurrent.{CompletableFuture, ExecutionException}
+import java.util.concurrent.{CompletableFuture, CompletionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@@ -78,7 +78,7 @@ class ControllerApis(val requestChannel: RequestChannel,
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
- request.header.apiKey match {
+ val handlerFuture: CompletableFuture[Unit] = request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
@@ -109,10 +109,24 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
+
+ // This catches exceptions in the future and subsequent completion stages returned by the request handlers.
+ handlerFuture.whenComplete { (_, exception) =>
+ if (exception != null) {
+ // CompletionException does not include the stack frames in its "cause" exception, so we need to
+ // log the original exception here
+ error(s"Unexpected error handling request ${request.requestDesc(true)} " +
+ s"with context ${request.context}", exception)
+
+ // For building the correct error request, we do need send the "cause" exception
+ val actualException = if (exception.isInstanceOf[CompletionException]) exception.getCause else exception
+ requestHelper.handleError(request, actualException)
+ }
+ }
} catch {
case e: FatalExitError => throw e
- case e: Throwable => {
- val t = if (e.isInstanceOf[ExecutionException]) e.getCause else e
+ case t: Throwable => {
+ // This catches exceptions in the blocking parts of the request handlers
error(s"Unexpected error handling request ${request.requestDesc(true)} " +
s"with context ${request.context}", t)
requestHelper.handleError(request, t)
@@ -125,38 +139,41 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleEnvelopeRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+ def handleEnvelopeRequest(request: RequestChannel.Request, requestLocal: RequestLocal): CompletableFuture[Unit] = {
if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
} else {
EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
}
+ CompletableFuture.completedFuture[Unit](())
}
- def handleSaslHandshakeRequest(request: RequestChannel.Request): Unit = {
+ def handleSaslHandshakeRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val responseData = new SaslHandshakeResponseData().setErrorCode(ILLEGAL_SASL_STATE.code)
requestHelper.sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
+ CompletableFuture.completedFuture[Unit](())
}
- def handleSaslAuthenticateRequest(request: RequestChannel.Request): Unit = {
+ def handleSaslAuthenticateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val responseData = new SaslAuthenticateResponseData()
.setErrorCode(ILLEGAL_SASL_STATE.code)
.setErrorMessage("SaslAuthenticate request received after successful authentication")
requestHelper.sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(responseData))
+ CompletableFuture.completedFuture[Unit](())
}
- def handleFetch(request: RequestChannel.Request): Unit = {
+ def handleFetch(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new FetchResponse(response.asInstanceOf[FetchResponseData]))
}
- def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
+ def handleFetchSnapshot(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
}
- def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+ def handleDeleteTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
val deleteTopicsRequest = request.body[DeleteTopicsRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs))
@@ -166,7 +183,7 @@ class ControllerApis(val requestChannel: RequestChannel,
authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
- future.whenComplete { (results, exception) =>
+ future.handle[Unit] { (results, exception) =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
if (exception != null) {
deleteTopicsRequest.getErrorResponse(throttleTimeMs, exception)
@@ -320,7 +337,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleCreateTopics(request: RequestChannel.Request): Unit = {
+ def handleCreateTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
val createTopicsRequest = request.body[CreateTopicsRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs))
@@ -330,7 +347,7 @@ class ControllerApis(val requestChannel: RequestChannel,
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity),
names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
names, logIfDenied = false)(identity))
- future.whenComplete { (result, exception) =>
+ future.handle[Unit] { (result, exception) =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
if (exception != null) {
createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
@@ -392,7 +409,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleApiVersionsRequest(request: RequestChannel.Request): Unit = {
+ def handleApiVersionsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
// Note that broker returns its full list of supported ApiKeys and versions regardless of current
// authentication state (e.g., before SASL authentication on an SASL listener, do note that no
// Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished).
@@ -410,6 +427,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
+ CompletableFuture.completedFuture[Unit](())
}
def authorizeAlterResource(requestContext: RequestContext,
@@ -431,7 +449,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleLegacyAlterConfigs(request: RequestChannel.Request): Unit = {
+ def handleLegacyAlterConfigs(request: RequestChannel.Request): CompletableFuture[Unit] = {
val response = new AlterConfigsResponseData()
val alterConfigsRequest = request.body[AlterConfigsRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
@@ -474,7 +492,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
controller.legacyAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
- .whenComplete { (controllerResults, exception) =>
+ .handle[Unit] { (controllerResults, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
@@ -490,33 +508,33 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleVote(request: RequestChannel.Request): Unit = {
+ def handleVote(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData]))
}
- def handleBeginQuorumEpoch(request: RequestChannel.Request): Unit = {
+ def handleBeginQuorumEpoch(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new BeginQuorumEpochResponse(response.asInstanceOf[BeginQuorumEpochResponseData]))
}
- def handleEndQuorumEpoch(request: RequestChannel.Request): Unit = {
+ def handleEndQuorumEpoch(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new EndQuorumEpochResponse(response.asInstanceOf[EndQuorumEpochResponseData]))
}
- def handleDescribeQuorum(request: RequestChannel.Request): Unit = {
+ def handleDescribeQuorum(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, DESCRIBE)
handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
}
- def handleElectLeaders(request: RequestChannel.Request): Unit = {
+ def handleElectLeaders(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, ALTER)
val electLeadersRequest = request.body[ElectLeadersRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
requestTimeoutMsToDeadlineNs(time, electLeadersRequest.data.timeoutMs))
val future = controller.electLeaders(context, electLeadersRequest.data)
- future.whenComplete { (responseData, exception) =>
+ future.handle[Unit] { (responseData, exception) =>
if (exception != null) {
requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
electLeadersRequest.getErrorResponse(throttleMs, exception)
@@ -529,13 +547,13 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
+ def handleAlterPartitionRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val alterPartitionRequest = request.body[AlterPartitionRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val future = controller.alterPartition(context, alterPartitionRequest.data)
- future.whenComplete { (result, exception) =>
+ future.handle[Unit] { (result, exception) =>
val response = if (exception != null) {
alterPartitionRequest.getErrorResponse(exception)
} else {
@@ -545,7 +563,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
+ def handleBrokerHeartBeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val heartbeatRequest = request.body[BrokerHeartbeatRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -572,7 +590,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleUnregisterBroker(request: RequestChannel.Request): Unit = {
+ def handleUnregisterBroker(request: RequestChannel.Request): CompletableFuture[Unit] = {
val decommissionRequest = request.body[UnregisterBrokerRequest]
authHelper.authorizeClusterOperation(request, ALTER)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -595,7 +613,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
+ def handleBrokerRegistration(request: RequestChannel.Request): CompletableFuture[Unit] = {
val registrationRequest = request.body[BrokerRegistrationRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -622,11 +640,10 @@ class ControllerApis(val requestChannel: RequestChannel,
}
private def handleRaftRequest(request: RequestChannel.Request,
- buildResponse: ApiMessage => AbstractResponse): Unit = {
+ buildResponse: ApiMessage => AbstractResponse): CompletableFuture[Unit] = {
val requestBody = request.body[AbstractRequest]
val future = raftManager.handleRequest(request.header, requestBody.data, time.milliseconds())
-
- future.whenComplete { (responseData, exception) =>
+ future.handle[Unit] { (responseData, exception) =>
val response = if (exception != null) {
requestBody.getErrorResponse(exception)
} else {
@@ -636,13 +653,13 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
+ def handleAlterClientQuotas(request: RequestChannel.Request): CompletableFuture[Unit] = {
val quotaRequest = request.body[AlterClientQuotasRequest]
authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.alterClientQuotas(context, quotaRequest.entries, quotaRequest.validateOnly)
- .whenComplete { (results, exception) =>
+ .handle[Unit] { (results, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
@@ -652,7 +669,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
+ def handleIncrementalAlterConfigs(request: RequestChannel.Request): CompletableFuture[Unit] = {
val response = new IncrementalAlterConfigsResponseData()
val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
@@ -700,7 +717,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
controller.incrementalAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
- .whenComplete { (controllerResults, exception) =>
+ .handle[Unit] { (controllerResults, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
@@ -716,7 +733,7 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleCreatePartitions(request: RequestChannel.Request): Unit = {
+ def handleCreatePartitions(request: RequestChannel.Request): CompletableFuture[Unit] = {
def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
}
@@ -726,7 +743,7 @@ class ControllerApis(val requestChannel: RequestChannel,
val future = createPartitions(context,
createPartitionsRequest.data(),
filterAlterAuthorizedTopics)
- future.whenComplete { (responses, exception) =>
+ future.handle[Unit] { (responses, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
@@ -778,33 +795,37 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def handleAlterPartitionReassignments(request: RequestChannel.Request): Unit = {
+ def handleAlterPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
val alterRequest = request.body[AlterPartitionReassignmentsRequest]
authHelper.authorizeClusterOperation(request, ALTER)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
requestTimeoutMsToDeadlineNs(time, alterRequest.data.timeoutMs))
- val response = controller.alterPartitionReassignments(context, alterRequest.data).get()
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
- new AlterPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+ controller.alterPartitionReassignments(context, alterRequest.data)
+ .thenApply[Unit] { response =>
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new AlterPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+ }
}
- def handleListPartitionReassignments(request: RequestChannel.Request): Unit = {
+ def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
val listRequest = request.body[ListPartitionReassignmentsRequest]
authHelper.authorizeClusterOperation(request, DESCRIBE)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
- val response = controller.listPartitionReassignments(context, listRequest.data).get()
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
- new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+ controller.listPartitionReassignments(context, listRequest.data)
+ .thenApply[Unit] { response =>
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+ }
}
- def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
+ def handleAllocateProducerIdsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
- .whenComplete((results, exception) => {
+ .handle[Unit] { (results, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
@@ -813,22 +834,22 @@ class ControllerApis(val requestChannel: RequestChannel,
new AllocateProducerIdsResponse(results)
})
}
- })
+ }
}
- def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+ def handleUpdateFeatures(request: RequestChannel.Request): CompletableFuture[Unit] = {
val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
authHelper.authorizeClusterOperation(request, ALTER)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.updateFeatures(context, updateFeaturesRequest.data)
- .whenComplete((response, exception) => {
+ .handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new UpdateFeaturesResponse(response.setThrottleTimeMs(requestThrottleMs)))
}
- })
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 86a0c85470..0fc9611452 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -63,6 +63,7 @@ import java.net.InetAddress
import java.util
import java.util.Collections.singletonList
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -902,6 +903,35 @@ class ControllerApisTest {
}
}
+ @Test
+ def testCompletableFutureExceptions(): Unit = {
+ // This test simulates an error in a completable future as we return from the controller. We need to ensure
+ // that any exception throw in the completion phase is properly captured and translated to an error response.
+ val request = buildRequest(new FetchRequest(new FetchRequestData(), 12))
+ val response = new FetchResponseData()
+ val responseFuture = new CompletableFuture[ApiMessage]()
+ val errorResponseFuture = new AtomicReference[AbstractResponse]()
+ when(raftManager.handleRequest(any(), any(), any())).thenReturn(responseFuture)
+ when(requestChannel.sendResponse(any(), any(), any())).thenAnswer { _ =>
+ // Simulate an encoding failure in the initial fetch response
+ throw new UnsupportedVersionException("Something went wrong")
+ }.thenAnswer { invocation =>
+ val resp = invocation.getArgument(1, classOf[AbstractResponse])
+ errorResponseFuture.set(resp)
+ }
+
+ // Calling handle does not block since we do not call get() in ControllerApis
+ createControllerApis(None,
+ new MockController.Builder().build()).handle(request, null)
+
+ // When we complete this future, the completion stages will fire (including the error handler in ControllerApis#request)
+ responseFuture.complete(response)
+
+ // Now we should get an error response with UNSUPPORTED_VERSION
+ val errorResponse = errorResponseFuture.get()
+ assertEquals(1, errorResponse.errorCounts().getOrDefault(Errors.UNSUPPORTED_VERSION, 0))
+ }
+
@AfterEach
def tearDown(): Unit = {
quotas.shutdown()