You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/05/04 07:23:07 UTC
[kafka] branch 3.4 updated: KAFKA-14644: Process should crash after failure in Raft IO thread (#13140)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new c81795692fd KAFKA-14644: Process should crash after failure in Raft IO thread (#13140)
c81795692fd is described below
commit c81795692fd24ece649ba7c722f70f4dd3a5e978
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Jan 25 09:41:38 2023 -0800
KAFKA-14644: Process should crash after failure in Raft IO thread (#13140)
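Unexpected errors caught in the Raft IO thread should cause the process to stop. To achieve this, the Raft IO thread now passes any Throwable thrown by `client.poll()` to a fatal `FaultHandler` supplied by the caller: a `ProcessExitingFaultHandler` in `KafkaServer` and `TestRaftServer`, and a fatal "raft manager" handler built by `SharedServer`. This is similar to the handling of exceptions in the controller.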
Reviewers: Colin P. McCabe <cm...@apache.org>
---
core/src/main/scala/kafka/raft/RaftManager.scala | 18 ++++++++++++++----
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +++-
core/src/main/scala/kafka/server/SharedServer.scala | 10 +++++++++-
core/src/main/scala/kafka/tools/TestRaftServer.scala | 5 +++--
.../test/scala/unit/kafka/raft/RaftManagerTest.scala | 17 +++++++++++++----
5 files changed, 42 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index bbb31806c3b..33dc1dc1e26 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -47,18 +47,27 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog}
import org.apache.kafka.server.common.serialization.RecordSerde
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.server.fault.FaultHandler
+
import scala.jdk.CollectionConverters._
object KafkaRaftManager {
class RaftIoThread(
client: KafkaRaftClient[_],
- threadNamePrefix: String
+ threadNamePrefix: String,
+ fatalFaultHandler: FaultHandler
) extends ShutdownableThread(
name = threadNamePrefix + "-io-thread",
isInterruptible = false
) {
override def doWork(): Unit = {
- client.poll()
+ try {
+ client.poll()
+ } catch {
+ case t: Throwable =>
+ throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t)
+ }
}
override def initiateShutdown(): Boolean = {
@@ -129,7 +138,8 @@ class KafkaRaftManager[T](
time: Time,
metrics: Metrics,
threadNamePrefixOpt: Option[String],
- val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
+ val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+ fatalFaultHandler: FaultHandler
) extends RaftManager[T] with Logging {
val apiVersions = new ApiVersions()
@@ -164,7 +174,7 @@ class KafkaRaftManager[T](
private val expirationTimer = new SystemTimer("raft-expiration-executor")
private val expirationService = new TimingWheelExpirationService(expirationTimer)
override val client: KafkaRaftClient[T] = buildRaftClient()
- private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
+ private val raftIoThread = new RaftIoThread(client, threadNamePrefix, fatalFaultHandler)
def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5a951f77ada..d602e53fe7d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion._
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.zookeeper.client.ZKClientConfig
@@ -386,7 +387,8 @@ class KafkaServer(
time,
metrics,
threadNamePrefix,
- controllerQuorumVotersFuture
+ controllerQuorumVotersFuture,
+ fatalFaultHandler = new ProcessExitingFaultHandler()
)
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index cb0bf5430e6..97d4a0e470f 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -152,6 +152,12 @@ class SharedServer(
}
}
+ def raftManagerFaultHandler: FaultHandler = faultHandlerFactory.build(
+ name = "raft manager",
+ fatal = true,
+ action = () => {}
+ )
+
/**
* The fault handler to use when metadata loading fails.
*/
@@ -227,7 +233,9 @@ class SharedServer(
time,
metrics,
threadNamePrefix,
- controllerQuorumVotersFuture)
+ controllerQuorumVotersFuture,
+ raftManagerFaultHandler
+ )
raftManager.startup()
if (sharedServerConfig.processRoles.contains(ControllerRole)) {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index b3e95fc9bd8..47ba5a747e1 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,7 +19,6 @@ package kafka.tools
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
-
import joptsimple.OptionException
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
@@ -39,6 +38,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
import org.apache.kafka.server.common.serialization.RecordSerde
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
import org.apache.kafka.snapshot.SnapshotReader
import scala.jdk.CollectionConverters._
@@ -90,7 +90,8 @@ class TestRaftServer(
time,
metrics,
Some(threadNamePrefix),
- CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+ CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
+ new ProcessExitingFaultHandler()
)
workloadGenerator = new RaftWorkloadGenerator(
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 9907c802d2b..5d0dc6b1d53 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.mockito.Mockito._
class RaftManagerTest {
@@ -96,7 +97,8 @@ class RaftManagerTest {
Time.SYSTEM,
new Metrics(Time.SYSTEM),
Option.empty,
- CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+ CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
+ mock(classOf[FaultHandler])
)
}
@@ -198,7 +200,8 @@ class RaftManagerTest {
@Test
def testShutdownIoThread(): Unit = {
val raftClient = mock(classOf[KafkaRaftClient[String]])
- val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft")
+ val faultHandler = new MockFaultHandler("RaftManagerTestFaultHandler")
+ val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft", faultHandler)
when(raftClient.isRunning).thenReturn(true)
assertTrue(ioThread.isRunning)
@@ -216,21 +219,27 @@ class RaftManagerTest {
ioThread.run()
assertFalse(ioThread.isRunning)
assertTrue(ioThread.isShutdownComplete)
+ assertNull(faultHandler.firstException)
}
@Test
def testUncaughtExceptionInIoThread(): Unit = {
val raftClient = mock(classOf[KafkaRaftClient[String]])
- val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft")
+ val faultHandler = new MockFaultHandler("RaftManagerTestFaultHandler")
+ val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft", faultHandler)
when(raftClient.isRunning).thenReturn(true)
assertTrue(ioThread.isRunning)
- when(raftClient.poll()).thenThrow(new RuntimeException)
+ val exception = new RuntimeException()
+ when(raftClient.poll()).thenThrow(exception)
ioThread.run()
assertTrue(ioThread.isShutdownComplete)
assertTrue(ioThread.isThreadFailed)
assertFalse(ioThread.isRunning)
+
+ val caughtException = faultHandler.firstException.getCause
+ assertEquals(exception, caughtException)
}
}