You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/18 01:11:51 UTC
[kafka] branch trunk updated: KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bc90c29faf KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
bc90c29faf is described below
commit bc90c29fafc69747daeecada8bb0c347e138edc8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Aug 17 18:11:42 2022 -0700
KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds` where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `Update [...]
Interestingly, `DeleteTopics` is not affected by this bug as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch duplicates this logic from `ApiError.fromThrowable` into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
Reviewers: David Arthur <mu...@gmail.com>
---
.../org/apache/kafka/common/protocol/Errors.java | 21 ++++-
.../requests/AllocateProducerIdsResponse.java | 4 +
.../org/apache/kafka/common/requests/ApiError.java | 10 +--
.../apache/kafka/common/requests/ApiErrorTest.java | 6 +-
.../main/scala/kafka/server/ControllerApis.scala | 7 +-
.../test/junit/RaftClusterInvocationContext.java | 4 +-
.../server/AllocateProducerIdsRequestTest.scala | 98 ++++++++++++++++++++++
.../unit/kafka/server/ControllerApisTest.scala | 78 ++++++++++++++++-
8 files changed, 205 insertions(+), 23 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 2ca42bafcf..c220bbcde4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -132,6 +132,8 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
/**
@@ -469,7 +471,8 @@ public enum Errors {
* If there are multiple matches in the class hierarchy, the first match starting from the bottom is used.
*/
public static Errors forException(Throwable t) {
- Class<?> clazz = t.getClass();
+ Throwable cause = maybeUnwrapException(t);
+ Class<?> clazz = cause.getClass();
while (clazz != null) {
Errors error = classToError.get(clazz);
if (error != null)
@@ -479,6 +482,22 @@ public enum Errors {
return UNKNOWN_SERVER_ERROR;
}
+ /**
+ * Check if a Throwable is a commonly wrapped exception type (e.g. `CompletionException`) and return
+ * the cause if so. This is useful to handle cases where exceptions may be raised from a future or a
+ * completion stage (as might be the case for requests sent to the controller in `ControllerApis`).
+ *
+ * @param t The Throwable to check
+ * @return The throwable itself or its cause if it is an instance of a commonly wrapped exception type
+ */
+ public static Throwable maybeUnwrapException(Throwable t) {
+ if (t instanceof CompletionException || t instanceof ExecutionException) {
+ return t.getCause();
+ } else {
+ return t;
+ }
+ }
+
private static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
index 5d48c39e80..41db29158e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
@@ -56,6 +56,10 @@ public class AllocateProducerIdsResponse extends AbstractResponse {
return data.throttleTimeMs();
}
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
+ }
+
public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) {
return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData(
new ByteBufferAccessor(buffer), version));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 01966532d6..dd127fc7a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.Errors;
import java.util.Objects;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
/**
* Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only
@@ -38,15 +36,9 @@ public class ApiError {
private final String message;
public static ApiError fromThrowable(Throwable t) {
- Throwable throwableToBeEncoded = t;
- // Get the underlying cause for common exception types from the concurrent library.
- // This is useful to handle cases where exceptions may be raised from a future or a
- // completion stage (as might be the case for requests sent to the controller in `ControllerApis`)
- if (t instanceof CompletionException || t instanceof ExecutionException) {
- throwableToBeEncoded = t.getCause();
- }
// Avoid populating the error message if it's a generic one. Also don't populate error
// message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information.
+ Throwable throwableToBeEncoded = Errors.maybeUnwrapException(t);
Errors error = Errors.forException(throwableToBeEncoded);
String message = error == Errors.UNKNOWN_SERVER_ERROR ||
error.message().equals(throwableToBeEncoded.getMessage()) ? null : throwableToBeEncoded.getMessage();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
index 8b0aa470be..bf352dbb4a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
@@ -41,10 +41,10 @@ public class ApiErrorTest {
@ParameterizedTest
@MethodSource("parameters")
- public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedErrors, String expectedMsg) {
+ public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedError, String expectedMsg) {
ApiError apiError = ApiError.fromThrowable(t);
- assertEquals(apiError.error(), expectedErrors);
- assertEquals(apiError.message(), expectedMsg);
+ assertEquals(expectedError, apiError.error());
+ assertEquals(expectedMsg, apiError.message());
}
private static Collection<Arguments> parameters() {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index efb6a36c3d..511d4b333c 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -20,7 +20,7 @@ package kafka.server
import java.util
import java.util.{Collections, OptionalLong}
import java.util.Map.Entry
-import java.util.concurrent.{CompletableFuture, CompletionException}
+import java.util.concurrent.CompletableFuture
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@@ -117,10 +117,7 @@ class ControllerApis(val requestChannel: RequestChannel,
// log the original exception here
error(s"Unexpected error handling request ${request.requestDesc(true)} " +
s"with context ${request.context}", exception)
-
- // For building the correct error request, we do need send the "cause" exception
- val actualException = if (exception.isInstanceOf[CompletionException]) exception.getCause else exception
- requestHelper.handleError(request, actualException)
+ requestHelper.handleError(request, exception)
}
}
} catch {
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 5cd3ec3e24..40669f3068 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -267,11 +267,11 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}
- private Stream<BrokerServer> brokers() {
+ public Stream<BrokerServer> brokers() {
return clusterReference.get().brokers().values().stream();
}
- private Stream<ControllerServer> controllers() {
+ public Stream<ControllerServer> controllers() {
return clusterReference.get().controllers().values().stream();
}
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
new file mode 100644
index 0000000000..5cb59573d1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.network.SocketServer
+import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests._
+import org.apache.kafka.server.common.ProducerIdsBlock
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
+
+ @ClusterTest
+ def testAllocateProducersIdSentToController(): Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val sourceBroker = raftCluster.brokers.findFirst().get()
+
+ val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
+ val controllerServer = raftCluster.controllers()
+ .filter(_.config.nodeId == controllerId)
+ .findFirst()
+ .get()
+
+ val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer)
+ assertEquals(Errors.NONE, allocateResponse.error)
+ assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, allocateResponse.data.producerIdLen)
+ assertTrue(allocateResponse.data.producerIdStart >= 0)
+ }
+
+ @ClusterTest(controllers = 3)
+ def testAllocateProducersIdSentToNonController(): Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val sourceBroker = raftCluster.brokers.findFirst().get()
+
+ val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
+ val controllerServer = raftCluster.controllers()
+ .filter(_.config.nodeId != controllerId)
+ .findFirst()
+ .get()
+
+ val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer)
+ assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(allocateResponse.data.errorCode))
+ }
+
+ private def sendAndReceiveAllocateProducerIds(
+ sourceBroker: BrokerServer,
+ controllerServer: ControllerServer
+ ): AllocateProducerIdsResponse = {
+ val allocateRequest = new AllocateProducerIdsRequest.Builder(
+ new AllocateProducerIdsRequestData()
+ .setBrokerId(sourceBroker.config.brokerId)
+ .setBrokerEpoch(sourceBroker.lifecycleManager.brokerEpoch)
+ ).build()
+
+ connectAndReceive(
+ controllerServer.socketServer,
+ allocateRequest
+ )
+ }
+
+ private def connectAndReceive(
+ controllerSocketServer: SocketServer,
+ request: AllocateProducerIdsRequest
+ ): AllocateProducerIdsResponse = {
+ IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
+ request,
+ controllerSocketServer,
+ cluster.controllerListenerName.get
+ )
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 0fc9611452..05bd13d795 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -48,9 +48,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.{ElectionType, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
-import org.apache.kafka.controller.{Controller, ControllerRequestContext}
+import org.apache.kafka.controller.{Controller, ControllerRequestContext, ResultOrError}
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -61,7 +61,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import java.net.InetAddress
import java.util
-import java.util.Collections.singletonList
+import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
@@ -876,6 +876,78 @@ class ControllerApisTest {
assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(response.data.errorCode))
}
+ @Test
+ def testDeleteTopicsReturnsNotController(): Unit = {
+ val topicId = Uuid.randomUuid()
+ val topicName = "foo"
+ val controller = mock(classOf[Controller])
+ val controllerApis = createControllerApis(None, controller)
+
+ val findNamesFuture = CompletableFuture.completedFuture(
+ singletonMap(topicId, new ResultOrError(topicName))
+ )
+ when(controller.findTopicNames(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(singleton(topicId))
+ )).thenReturn(findNamesFuture)
+
+ val findIdsFuture = CompletableFuture.completedFuture(
+ Collections.emptyMap[String, ResultOrError[Uuid]]()
+ )
+ when(controller.findTopicIds(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(Collections.emptySet())
+ )).thenReturn(findIdsFuture)
+
+ val deleteFuture = new CompletableFuture[util.Map[Uuid, ApiError]]()
+ deleteFuture.completeExceptionally(new NotControllerException("Controller has moved"))
+ when(controller.deleteTopics(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(singleton(topicId))
+ )).thenReturn(deleteFuture)
+
+ val request = new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData().setTopics(singletonList(
+ new DeleteTopicState().setTopicId(topicId)
+ ))
+ ).build()
+
+ val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
+ val topicIdResponse = response.data.responses.asScala.find(_.topicId == topicId).get
+ assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(topicIdResponse.errorCode))
+ }
+
+ @Test
+ def testAllocateProducerIdsReturnsNotController(): Unit = {
+ val controller = mock(classOf[Controller])
+ val controllerApis = createControllerApis(None, controller)
+
+ // We construct the future here to mimic the logic in `QuorumController.allocateProducerIds`.
+ // When an exception is raised on the original future, the `thenApply` future is also completed
+ // exceptionally, but the underlying cause is wrapped in a `CompletionException`.
+ val future = new CompletableFuture[ProducerIdsBlock]
+ val thenApplyFuture = future.thenApply[AllocateProducerIdsResponseData] { result =>
+ new AllocateProducerIdsResponseData()
+ .setProducerIdStart(result.firstProducerId())
+ .setProducerIdLen(result.size())
+ }
+ future.completeExceptionally(new NotControllerException("Controller has moved"))
+
+ val request = new AllocateProducerIdsRequest.Builder(
+ new AllocateProducerIdsRequestData()
+ .setBrokerId(4)
+ .setBrokerEpoch(93234)
+ ).build()
+
+ when(controller.allocateProducerIds(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(request.data)
+ )).thenReturn(thenApplyFuture)
+
+ val response = handleRequest[AllocateProducerIdsResponse](request, controllerApis)
+ assertEquals(Errors.NOT_CONTROLLER, response.error)
+ }
+
private def handleRequest[T <: AbstractResponse](
request: AbstractRequest,
controllerApis: ControllerApis