You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/19 19:56:03 UTC

[kafka] branch 3.3 updated: MINOR: Fix unexpected request error in kraft shutdown (#12538)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 26ffdd7728 MINOR: Fix unexpected request error in kraft shutdown (#12538)
26ffdd7728 is described below

commit 26ffdd772827627af2e3213a395bae56c1b41321
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Aug 19 12:45:05 2022 -0700

    MINOR: Fix unexpected request error in kraft shutdown (#12538)
    
    We have been seeing a few exceptions like the following when running integration tests:
    ```
    [2022-08-18 13:02:59,470] ERROR [ControllerApis nodeId=3000] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7) -- FetchRequestData(clusterId='txpo87ZUSbGSeV2v7H0n_w', replicaId=0, maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='__cluster_metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=1, fetchOf [...]
    java.util.concurrent.CompletionException: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
            at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
            at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
            at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
            at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
            at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
            at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
            at org.apache.kafka.raft.KafkaRaftClient.lambda$handleRequest$19(KafkaRaftClient.java:1666)
            at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
            at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
            at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
            at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
            at kafka.raft.TimingWheelExpirationService$TimerTaskCompletableFuture.run(TimingWheelExpirationService.scala:32)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.base/java.lang.Thread.run(Thread.java:833)
    Caused by: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
    ```
    There are two causes for this error that I found. First, we were not shutting down the timer services in `RaftManager`, which are used in the purgatory implementation. This meant that operations remaining in purgatory could be completed even after `RaftManager` was shut down. Second, the shutdown order in `KafkaClusterTestKit` was problematic. The `RaftManager` instance depends on the `SocketServer` in `ControllerServer`, but it was the latter that was shut down first. Instead, we should [...]
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/raft/RaftManager.scala   | 10 ++++---
 .../main/scala/kafka/server/ControllerApis.scala   |  1 +
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  7 +++++
 .../java/kafka/testkit/KafkaClusterTestKit.java    | 35 +++++++++++++---------
 4 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index a44d9d8fe0..5b8fe1e827 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -122,6 +122,8 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()
   override val replicatedLog: ReplicatedLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
+  private val expirationTimer = new SystemTimer("raft-expiration-executor")
+  private val expirationService = new TimingWheelExpirationService(expirationTimer)
   override val client: KafkaRaftClient[T] = buildRaftClient()
   private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
 
@@ -133,10 +135,10 @@ class KafkaRaftManager[T](
         case spec: InetAddressSpec =>
           netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
         case _: UnknownAddressSpec =>
-          logger.info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
+          info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
             s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}")
         case invalid: AddressSpec =>
-          logger.warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
+          warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
             s"destination ID: ${voterAddressEntry.getKey}")
       }
     }
@@ -145,6 +147,8 @@ class KafkaRaftManager[T](
   }
 
   def shutdown(): Unit = {
+    expirationService.shutdown()
+    expirationTimer.shutdown()
     raftIoThread.shutdown()
     client.close()
     scheduler.shutdown()
@@ -177,8 +181,6 @@ class KafkaRaftManager[T](
   }
 
   private def buildRaftClient(): KafkaRaftClient[T] = {
-    val expirationTimer = new SystemTimer("raft-expiration-executor")
-    val expirationService = new TimingWheelExpirationService(expirationTimer)
     val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
     val nodeId = OptionalInt.of(config.nodeId)
 
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 511d4b333c..235660cec0 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -68,6 +68,7 @@ class ControllerApis(val requestChannel: RequestChannel,
                      val controllerNodes: Seq[Node],
                      val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
 
+  this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] "
   val authHelper = new AuthHelper(authorizer)
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 2338ef5e7c..8ce0bc1861 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -56,6 +56,7 @@ class KafkaRaftServer(
   threadNamePrefix: Option[String]
 ) extends Server with Logging {
 
+  this.logIdent = s"[KafkaRaftServer nodeId=${config.nodeId}] "
   KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
   KafkaYammerMetrics.INSTANCE.configure(config.originals)
 
@@ -133,6 +134,8 @@ class KafkaRaftServer(
 
   override def startup(): Unit = {
     Mx4jLoader.maybeLoad()
+    // Note that we startup `RaftManager` first so that the controller and broker
+    // can register listeners during initialization.
     raftManager.startup()
     controller.foreach(_.startup())
     broker.foreach(_.startup())
@@ -142,6 +145,10 @@ class KafkaRaftServer(
 
   override def shutdown(): Unit = {
     broker.foreach(_.shutdown())
+    // The order of shutdown for `RaftManager` and `ControllerServer` is backwards
+    // compared to `startup()`. This is because the `SocketServer` implementation that
+    // we rely on to receive requests is owned by `ControllerServer`, so we need it
+    // to stick around until graceful shutdown of `RaftManager` can be completed.
     raftManager.shutdown()
     controller.foreach(_.shutdown())
     CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index ecee13c498..139b05fa54 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -275,15 +275,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     executorService.shutdownNow();
                     executorService.awaitTermination(5, TimeUnit.MINUTES);
                 }
-                for (ControllerServer controller : controllers.values()) {
-                    controller.shutdown();
-                }
                 for (BrokerServer brokerServer : brokers.values()) {
                     brokerServer.shutdown();
                 }
                 for (KafkaRaftManager<ApiMessageAndVersion> raftManager : raftManagers.values()) {
                     raftManager.shutdown();
                 }
+                for (ControllerServer controller : controllers.values()) {
+                    controller.shutdown();
+                }
                 connectFutureManager.close();
                 if (baseDirectory != null) {
                     Utils.delete(baseDirectory);
@@ -408,12 +408,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
     public void startup() throws ExecutionException, InterruptedException {
         List<Future<?>> futures = new ArrayList<>();
         try {
-            for (ControllerServer controller : controllers.values()) {
-                futures.add(executorService.submit(controller::startup));
-            }
+            // Note the startup order here is chosen to be consistent with
+            // `KafkaRaftServer`. See comments in that class for an explanation.
+
             for (KafkaRaftManager<ApiMessageAndVersion> raftManager : raftManagers.values()) {
                 futures.add(controllerQuorumVotersFutureManager.future.thenRunAsync(raftManager::startup));
             }
+            for (ControllerServer controller : controllers.values()) {
+                futures.add(executorService.submit(controller::startup));
+            }
             for (BrokerServer broker : brokers.values()) {
                 futures.add(executorService.submit(broker::startup));
             }
@@ -513,6 +516,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
         List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
         try {
             controllerQuorumVotersFutureManager.close();
+
+            // Note the shutdown order here is chosen to be consistent with
+            // `KafkaRaftServer`. See comments in that class for an explanation.
+
             for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
                 int brokerId = entry.getKey();
                 BrokerServer broker = entry.getValue();
@@ -521,14 +528,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
             }
             waitForAllFutures(futureEntries);
             futureEntries.clear();
-            for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
-                int controllerId = entry.getKey();
-                ControllerServer controller = entry.getValue();
-                futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
-                    executorService.submit(controller::shutdown)));
-            }
-            waitForAllFutures(futureEntries);
-            futureEntries.clear();
             for (Entry<Integer, KafkaRaftManager<ApiMessageAndVersion>> entry : raftManagers.entrySet()) {
                 int raftManagerId = entry.getKey();
                 KafkaRaftManager<ApiMessageAndVersion> raftManager = entry.getValue();
@@ -537,6 +536,14 @@ public class KafkaClusterTestKit implements AutoCloseable {
             }
             waitForAllFutures(futureEntries);
             futureEntries.clear();
+            for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
+                int controllerId = entry.getKey();
+                ControllerServer controller = entry.getValue();
+                futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
+                    executorService.submit(controller::shutdown)));
+            }
+            waitForAllFutures(futureEntries);
+            futureEntries.clear();
             Utils.delete(baseDirectory);
         } catch (Exception e) {
             for (Entry<String, Future<?>> entry : futureEntries) {