You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/18 01:20:12 UTC

[kafka] branch 3.3 updated: KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 81c4426550 KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
81c4426550 is described below

commit 81c442655079355f859dcdb495df015a1a1f7baa
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Aug 17 18:11:42 2022 -0700

    KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518)
    
    There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped in a `CompletionException`. This can happen in `QuorumController.allocateProducerIds` where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `Update [...]
    
    Interestingly, `DeleteTopics` is not affected by this bug, contrary to what I originally suspected. This is because `ApiError.fromThrowable` already checks for both `CompletionException` and `ExecutionException` and pulls out the underlying cause. This patch moves that logic out of `ApiError.fromThrowable` into a new `Errors.maybeUnwrapException` helper, which `Errors.forException` now applies, to be sure that we handle all cases where exceptions are converted to error codes.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../org/apache/kafka/common/protocol/Errors.java   | 21 ++++-
 .../requests/AllocateProducerIdsResponse.java      |  4 +
 .../org/apache/kafka/common/requests/ApiError.java | 10 +--
 .../apache/kafka/common/requests/ApiErrorTest.java |  6 +-
 .../main/scala/kafka/server/ControllerApis.scala   |  7 +-
 .../test/junit/RaftClusterInvocationContext.java   |  4 +-
 .../server/AllocateProducerIdsRequestTest.scala    | 98 ++++++++++++++++++++++
 .../unit/kafka/server/ControllerApisTest.scala     | 78 ++++++++++++++++-
 8 files changed, 205 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 2ca42bafcf..c220bbcde4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -132,6 +132,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 /**
@@ -469,7 +471,8 @@ public enum Errors {
      * If there are multiple matches in the class hierarchy, the first match starting from the bottom is used.
      */
     public static Errors forException(Throwable t) {
-        Class<?> clazz = t.getClass();
+        Throwable cause = maybeUnwrapException(t);
+        Class<?> clazz = cause.getClass();
         while (clazz != null) {
             Errors error = classToError.get(clazz);
             if (error != null)
@@ -479,6 +482,22 @@ public enum Errors {
         return UNKNOWN_SERVER_ERROR;
     }
 
+    /**
+     * If {@code t} is a {@link CompletionException} or {@link ExecutionException} with a non-null cause,
+     * return that cause; otherwise return {@code t} unchanged. Futures and completion stages (as used for
+     * requests sent to the controller in `ControllerApis`) wrap the real error in these types.
+     *
+     * @param t The Throwable to check
+     * @return The wrapped cause, or {@code t} itself if it is not a wrapper type or carries no cause
+     */
+    public static Throwable maybeUnwrapException(Throwable t) {
+        if ((t instanceof CompletionException || t instanceof ExecutionException) && t.getCause() != null) {
+            return t.getCause();
+        } else {
+            return t;
+        }
+    }
+
     private static String toHtml() {
         final StringBuilder b = new StringBuilder();
         b.append("<table class=\"data-table\"><tbody>\n");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
index 5d48c39e80..41db29158e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
@@ -56,6 +56,10 @@ public class AllocateProducerIdsResponse extends AbstractResponse {
         return data.throttleTimeMs();
     }
 
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
+    }
+
     public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) {
         return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData(
                 new ByteBufferAccessor(buffer), version));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 01966532d6..dd127fc7a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.util.Objects;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 
 /**
  * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only
@@ -38,15 +36,9 @@ public class ApiError {
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
-        Throwable throwableToBeEncoded = t;
-        // Get the underlying cause for common exception types from the concurrent library.
-        // This is useful to handle cases where exceptions may be raised from a future or a
-        // completion stage (as might be the case for requests sent to the controller in `ControllerApis`)
-        if (t instanceof CompletionException || t instanceof ExecutionException) {
-            throwableToBeEncoded = t.getCause();
-        }
         // Avoid populating the error message if it's a generic one. Also don't populate error
         // message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information.
+        Throwable throwableToBeEncoded = Errors.maybeUnwrapException(t);
         Errors error = Errors.forException(throwableToBeEncoded);
         String message = error == Errors.UNKNOWN_SERVER_ERROR ||
             error.message().equals(throwableToBeEncoded.getMessage()) ? null : throwableToBeEncoded.getMessage();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
index 8b0aa470be..bf352dbb4a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
@@ -41,10 +41,10 @@ public class ApiErrorTest {
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedErrors, String expectedMsg) {
+    public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedError, String expectedMsg) {
         ApiError apiError = ApiError.fromThrowable(t);
-        assertEquals(apiError.error(), expectedErrors);
-        assertEquals(apiError.message(), expectedMsg);
+        assertEquals(expectedError, apiError.error());
+        assertEquals(expectedMsg, apiError.message());
     }
 
     private static Collection<Arguments> parameters() {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index efb6a36c3d..511d4b333c 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.util
 import java.util.{Collections, OptionalLong}
 import java.util.Map.Entry
-import java.util.concurrent.{CompletableFuture, CompletionException}
+import java.util.concurrent.CompletableFuture
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
@@ -117,10 +117,7 @@ class ControllerApis(val requestChannel: RequestChannel,
           // log the original exception here
           error(s"Unexpected error handling request ${request.requestDesc(true)} " +
             s"with context ${request.context}", exception)
-
-          // For building the correct error request, we do need send the "cause" exception
-          val actualException = if (exception.isInstanceOf[CompletionException]) exception.getCause else exception
-          requestHelper.handleError(request, actualException)
+          requestHelper.handleError(request, exception)
         }
       }
     } catch {
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 5cd3ec3e24..40669f3068 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -267,11 +267,11 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
                 .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
         }
 
-        private Stream<BrokerServer> brokers() {
+        public Stream<BrokerServer> brokers() {
             return clusterReference.get().brokers().values().stream();
         }
 
-        private Stream<ControllerServer> controllers() {
+        public Stream<ControllerServer> controllers() {
             return clusterReference.get().controllers().values().stream();
         }
 
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
new file mode 100644
index 0000000000..5cb59573d1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.network.SocketServer
+import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests._
+import org.apache.kafka.server.common.ProducerIdsBlock
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+/**
+ * Integration tests for `AllocateProducerIds` requests sent directly to KRaft controllers.
+ */
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
+
+  /** The leader controller grants a block of PRODUCER_ID_BLOCK_SIZE IDs starting at a non-negative ID. */
+  @ClusterTest
+  def testAllocateProducersIdSentToController(): Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val sourceBroker = raftCluster.brokers.findFirst().get()
+
+    val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
+    val controllerServer = raftCluster.controllers()
+      .filter(_.config.nodeId == controllerId).findFirst().get()
+
+    val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer)
+    assertEquals(Errors.NONE, allocateResponse.error)
+    assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, allocateResponse.data.producerIdLen)
+    assertTrue(allocateResponse.data.producerIdStart >= 0)
+  }
+
+  /** A controller other than the broker's view of the Raft leader replies with NOT_CONTROLLER. */
+  @ClusterTest(controllers = 3)
+  def testAllocateProducersIdSentToNonController(): Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val sourceBroker = raftCluster.brokers.findFirst().get()
+
+    val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
+    val controllerServer = raftCluster.controllers()
+      .filter(_.config.nodeId != controllerId).findFirst().get()
+
+    val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer)
+    assertEquals(Errors.NOT_CONTROLLER, allocateResponse.error)
+  }
+
+  /** Builds a request carrying `sourceBroker`'s ID and epoch and sends it to `controllerServer`. */
+  private def sendAndReceiveAllocateProducerIds(
+    sourceBroker: BrokerServer,
+    controllerServer: ControllerServer
+  ): AllocateProducerIdsResponse = {
+    val allocateRequest = new AllocateProducerIdsRequest.Builder(
+      new AllocateProducerIdsRequestData()
+        .setBrokerId(sourceBroker.config.brokerId)
+        .setBrokerEpoch(sourceBroker.lifecycleManager.brokerEpoch)
+    ).build()
+
+    connectAndReceive(controllerServer.socketServer, allocateRequest)
+  }
+
+  /** Sends `request` over the cluster's controller listener and returns the typed response. */
+  private def connectAndReceive(
+    controllerSocketServer: SocketServer,
+    request: AllocateProducerIdsRequest
+  ): AllocateProducerIdsResponse = {
+    IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
+      request,
+      controllerSocketServer,
+      cluster.controllerListenerName.get
+    )
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 0fc9611452..05bd13d795 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -48,9 +48,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{ElectionType, Uuid}
 import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
-import org.apache.kafka.controller.{Controller, ControllerRequestContext}
+import org.apache.kafka.controller.{Controller, ControllerRequestContext, ResultOrError}
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -61,7 +61,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 
 import java.net.InetAddress
 import java.util
-import java.util.Collections.singletonList
+import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Properties}
@@ -876,6 +876,78 @@ class ControllerApisTest {
     assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(response.data.errorCode))
   }
 
+  @Test
+  def testDeleteTopicsReturnsNotController(): Unit = {
+    val topicId = Uuid.randomUuid()
+    val topicName = "foo"
+    val controller = mock(classOf[Controller])
+    val controllerApis = createControllerApis(None, controller)
+    // Stub the ID -> name lookup so the requested topic ID resolves to `topicName`.
+    val findNamesFuture = CompletableFuture.completedFuture(
+      singletonMap(topicId, new ResultOrError(topicName))
+    )
+    when(controller.findTopicNames(
+      any[ControllerRequestContext],
+      ArgumentMatchers.eq(singleton(topicId))
+    )).thenReturn(findNamesFuture)
+    // The request names no topics, so the name -> ID lookup is stubbed for an empty set.
+    val findIdsFuture = CompletableFuture.completedFuture(
+      Collections.emptyMap[String, ResultOrError[Uuid]]()
+    )
+    when(controller.findTopicIds(
+      any[ControllerRequestContext],
+      ArgumentMatchers.eq(Collections.emptySet())
+    )).thenReturn(findIdsFuture)
+    // Fail the deletion itself with NotControllerException, as when controller leadership has moved.
+    val deleteFuture = new CompletableFuture[util.Map[Uuid, ApiError]]()
+    deleteFuture.completeExceptionally(new NotControllerException("Controller has moved"))
+    when(controller.deleteTopics(
+      any[ControllerRequestContext],
+      ArgumentMatchers.eq(singleton(topicId))
+    )).thenReturn(deleteFuture)
+
+    val request = new DeleteTopicsRequest.Builder(
+      new DeleteTopicsRequestData().setTopics(singletonList(
+        new DeleteTopicState().setTopicId(topicId)
+      ))
+    ).build()
+    // The per-topic result must carry NOT_CONTROLLER rather than a generic server error.
+    val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
+    val topicIdResponse = response.data.responses.asScala.find(_.topicId == topicId).get
+    assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(topicIdResponse.errorCode))
+  }
+
+  @Test
+  def testAllocateProducerIdsReturnsNotController(): Unit = {
+    val controller = mock(classOf[Controller])
+    val controllerApis = createControllerApis(None, controller)
+    // KAFKA-14167: a CompletionException-wrapped error must not surface as UNKNOWN_SERVER_ERROR.
+    // We construct the future here to mimic the logic in `QuorumController.allocateProducerIds`.
+    // When an exception is raised on the original future, the `thenApply` future is also completed
+    // exceptionally, but the underlying cause is wrapped in a `CompletionException`.
+    val future = new CompletableFuture[ProducerIdsBlock]
+    val thenApplyFuture = future.thenApply[AllocateProducerIdsResponseData] { result =>
+      new AllocateProducerIdsResponseData()
+        .setProducerIdStart(result.firstProducerId())
+        .setProducerIdLen(result.size())
+    }
+    future.completeExceptionally(new NotControllerException("Controller has moved"))
+
+    val request = new AllocateProducerIdsRequest.Builder(
+      new AllocateProducerIdsRequestData()
+        .setBrokerId(4)
+        .setBrokerEpoch(93234)
+    ).build()
+    // The mocked controller hands back the derived (`thenApply`) future, not the original one.
+    when(controller.allocateProducerIds(
+      any[ControllerRequestContext],
+      ArgumentMatchers.eq(request.data)
+    )).thenReturn(thenApplyFuture)
+
+    val response = handleRequest[AllocateProducerIdsResponse](request, controllerApis)
+    assertEquals(Errors.NOT_CONTROLLER, response.error)
+  }
+
   private def handleRequest[T <: AbstractResponse](
     request: AbstractRequest,
     controllerApis: ControllerApis