Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/19 19:56:03 UTC
[kafka] branch 3.3 updated: MINOR: Fix unexpected request error in kraft shutdown (#12538)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 26ffdd7728 MINOR: Fix unexpected request error in kraft shutdown (#12538)
26ffdd7728 is described below
commit 26ffdd772827627af2e3213a395bae56c1b41321
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Aug 19 12:45:05 2022 -0700
MINOR: Fix unexpected request error in kraft shutdown (#12538)
We have been seeing a few exceptions like the following when running integration tests:
```
[2022-08-18 13:02:59,470] ERROR [ControllerApis nodeId=3000] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7) -- FetchRequestData(clusterId='txpo87ZUSbGSeV2v7H0n_w', replicaId=0, maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='__cluster_metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=1, fetchOf [...]
java.util.concurrent.CompletionException: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.kafka.raft.KafkaRaftClient.lambda$handleRequest$19(KafkaRaftClient.java:1666)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at kafka.raft.TimingWheelExpirationService$TimerTaskCompletableFuture.run(TimingWheelExpirationService.scala:32)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
```
There are two causes for this error that I found. First, we were not shutting down the timer services in `RaftManager` which are used in the purgatory implementation. This meant that operations remaining in purgatory could be completed even after `RaftManager` was shut down. Second, the shutdown order in `KafkaClusterTestKit` was problematic. The `RaftManager` instance depends on the `SocketServer` in `ControllerServer`, but it was the latter that was shut down first. Instead, we should shut down `RaftManager` before `ControllerServer`, so that the `SocketServer` remains available until `RaftManager` can complete a graceful shutdown.
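For readers skimming the diff below, here is a minimal, self-contained sketch of the ordering this patch enforces. The classes are hypothetical stand-ins for illustration only (not Kafka's real APIs); the point is that the expiration timer is stopped before the raft client, and the controller's `SocketServer` outlives the raft manager's shutdown:
```scala
// Hypothetical stand-in components; names and structure are assumptions,
// not the real KafkaRaftServer/KafkaClusterTestKit classes.
object ShutdownOrderSketch extends App {

  trait Component {
    def name: String
    def shutdown(): Unit = println(s"shutting down $name")
  }

  case class Broker() extends Component { val name = "BrokerServer" }

  case class RaftManager() extends Component {
    val name = "KafkaRaftManager"
    override def shutdown(): Unit = {
      // Stop the expiration service/timer first so pending purgatory
      // operations cannot fire after the raft client is closed.
      println("shutting down expiration service and timer")
      println("shutting down raft IO thread and client")
    }
  }

  case class Controller() extends Component { val name = "ControllerServer (owns SocketServer)" }

  // Shutdown runs in the reverse of startup (raft manager -> controller -> broker):
  // the controller's SocketServer stays up until the raft manager has drained.
  Seq(Broker(), RaftManager(), Controller()).foreach(_.shutdown())
}
```
The same ordering is mirrored in the `KafkaRaftServer` and `KafkaClusterTestKit` changes in the diff below.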
Reviewers: Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/raft/RaftManager.scala | 10 ++++---
.../main/scala/kafka/server/ControllerApis.scala | 1 +
.../main/scala/kafka/server/KafkaRaftServer.scala | 7 +++++
.../java/kafka/testkit/KafkaClusterTestKit.java | 35 +++++++++++++---------
4 files changed, 35 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index a44d9d8fe0..5b8fe1e827 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -122,6 +122,8 @@ class KafkaRaftManager[T](
private val dataDir = createDataDir()
override val replicatedLog: ReplicatedLog = buildMetadataLog()
private val netChannel = buildNetworkChannel()
+ private val expirationTimer = new SystemTimer("raft-expiration-executor")
+ private val expirationService = new TimingWheelExpirationService(expirationTimer)
override val client: KafkaRaftClient[T] = buildRaftClient()
private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
@@ -133,10 +135,10 @@ class KafkaRaftManager[T](
case spec: InetAddressSpec =>
netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
case _: UnknownAddressSpec =>
- logger.info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
+ info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}")
case invalid: AddressSpec =>
- logger.warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
+ warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
s"destination ID: ${voterAddressEntry.getKey}")
}
}
@@ -145,6 +147,8 @@ class KafkaRaftManager[T](
}
def shutdown(): Unit = {
+ expirationService.shutdown()
+ expirationTimer.shutdown()
raftIoThread.shutdown()
client.close()
scheduler.shutdown()
@@ -177,8 +181,6 @@ class KafkaRaftManager[T](
}
private def buildRaftClient(): KafkaRaftClient[T] = {
- val expirationTimer = new SystemTimer("raft-expiration-executor")
- val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
val nodeId = OptionalInt.of(config.nodeId)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 511d4b333c..235660cec0 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -68,6 +68,7 @@ class ControllerApis(val requestChannel: RequestChannel,
val controllerNodes: Seq[Node],
val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
+ this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] "
val authHelper = new AuthHelper(authorizer)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 2338ef5e7c..8ce0bc1861 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -56,6 +56,7 @@ class KafkaRaftServer(
threadNamePrefix: Option[String]
) extends Server with Logging {
+ this.logIdent = s"[KafkaRaftServer nodeId=${config.nodeId}] "
KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
KafkaYammerMetrics.INSTANCE.configure(config.originals)
@@ -133,6 +134,8 @@ class KafkaRaftServer(
override def startup(): Unit = {
Mx4jLoader.maybeLoad()
+ // Note that we startup `RaftManager` first so that the controller and broker
+ // can register listeners during initialization.
raftManager.startup()
controller.foreach(_.startup())
broker.foreach(_.startup())
@@ -142,6 +145,10 @@ class KafkaRaftServer(
override def shutdown(): Unit = {
broker.foreach(_.shutdown())
+ // The order of shutdown for `RaftManager` and `ControllerServer` is backwards
+ // compared to `startup()`. This is because the `SocketServer` implementation that
+ // we rely on to receive requests is owned by `ControllerServer`, so we need it
+ // to stick around until graceful shutdown of `RaftManager` can be completed.
raftManager.shutdown()
controller.foreach(_.shutdown())
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index ecee13c498..139b05fa54 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -275,15 +275,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.MINUTES);
}
- for (ControllerServer controller : controllers.values()) {
- controller.shutdown();
- }
for (BrokerServer brokerServer : brokers.values()) {
brokerServer.shutdown();
}
for (KafkaRaftManager<ApiMessageAndVersion> raftManager : raftManagers.values()) {
raftManager.shutdown();
}
+ for (ControllerServer controller : controllers.values()) {
+ controller.shutdown();
+ }
connectFutureManager.close();
if (baseDirectory != null) {
Utils.delete(baseDirectory);
@@ -408,12 +408,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
public void startup() throws ExecutionException, InterruptedException {
List<Future<?>> futures = new ArrayList<>();
try {
- for (ControllerServer controller : controllers.values()) {
- futures.add(executorService.submit(controller::startup));
- }
+ // Note the startup order here is chosen to be consistent with
+ // `KafkaRaftServer`. See comments in that class for an explanation.
+
for (KafkaRaftManager<ApiMessageAndVersion> raftManager : raftManagers.values()) {
futures.add(controllerQuorumVotersFutureManager.future.thenRunAsync(raftManager::startup));
}
+ for (ControllerServer controller : controllers.values()) {
+ futures.add(executorService.submit(controller::startup));
+ }
for (BrokerServer broker : brokers.values()) {
futures.add(executorService.submit(broker::startup));
}
@@ -513,6 +516,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
try {
controllerQuorumVotersFutureManager.close();
+
+ // Note the shutdown order here is chosen to be consistent with
+ // `KafkaRaftServer`. See comments in that class for an explanation.
+
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
@@ -521,14 +528,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
waitForAllFutures(futureEntries);
futureEntries.clear();
- for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
- int controllerId = entry.getKey();
- ControllerServer controller = entry.getValue();
- futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
- executorService.submit(controller::shutdown)));
- }
- waitForAllFutures(futureEntries);
- futureEntries.clear();
for (Entry<Integer, KafkaRaftManager<ApiMessageAndVersion>> entry : raftManagers.entrySet()) {
int raftManagerId = entry.getKey();
KafkaRaftManager<ApiMessageAndVersion> raftManager = entry.getValue();
@@ -537,6 +536,14 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
waitForAllFutures(futureEntries);
futureEntries.clear();
+ for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
+ int controllerId = entry.getKey();
+ ControllerServer controller = entry.getValue();
+ futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
+ executorService.submit(controller::shutdown)));
+ }
+ waitForAllFutures(futureEntries);
+ futureEntries.clear();
Utils.delete(baseDirectory);
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {