You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/05/04 07:23:07 UTC

[kafka] branch 3.4 updated: KAFKA-14644: Process should crash after failure in Raft IO thread (#13140)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new c81795692fd KAFKA-14644: Process should crash after failure in Raft IO thread (#13140)
c81795692fd is described below

commit c81795692fd24ece649ba7c722f70f4dd3a5e978
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Jan 25 09:41:38 2023 -0800

    KAFKA-14644: Process should crash after failure in Raft IO thread (#13140)
    
    Unexpected errors caught in the Raft IO thread should cause the process to stop. This is similar to the handling of exceptions in the controller.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 core/src/main/scala/kafka/raft/RaftManager.scala       | 18 ++++++++++++++----
 core/src/main/scala/kafka/server/KafkaServer.scala     |  4 +++-
 core/src/main/scala/kafka/server/SharedServer.scala    | 10 +++++++++-
 core/src/main/scala/kafka/tools/TestRaftServer.scala   |  5 +++--
 .../test/scala/unit/kafka/raft/RaftManagerTest.scala   | 17 +++++++++++++----
 5 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index bbb31806c3b..33dc1dc1e26 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -47,18 +47,27 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
 import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog}
 import org.apache.kafka.server.common.serialization.RecordSerde
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.server.fault.FaultHandler
+
 import scala.jdk.CollectionConverters._
 
 object KafkaRaftManager {
   class RaftIoThread(
     client: KafkaRaftClient[_],
-    threadNamePrefix: String
+    threadNamePrefix: String,
+    fatalFaultHandler: FaultHandler
   ) extends ShutdownableThread(
     name = threadNamePrefix + "-io-thread",
     isInterruptible = false
   ) {
     override def doWork(): Unit = {
-      client.poll()
+      try {
+        client.poll()
+      } catch {
+        case t: Throwable =>
+          throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t)
+      }
     }
 
     override def initiateShutdown(): Boolean = {
@@ -129,7 +138,8 @@ class KafkaRaftManager[T](
   time: Time,
   metrics: Metrics,
   threadNamePrefixOpt: Option[String],
-  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+  fatalFaultHandler: FaultHandler
 ) extends RaftManager[T] with Logging {
 
   val apiVersions = new ApiVersions()
@@ -164,7 +174,7 @@ class KafkaRaftManager[T](
   private val expirationTimer = new SystemTimer("raft-expiration-executor")
   private val expirationService = new TimingWheelExpirationService(expirationTimer)
   override val client: KafkaRaftClient[T] = buildRaftClient()
-  private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
+  private val raftIoThread = new RaftIoThread(client, threadNamePrefix, fatalFaultHandler)
 
   def startup(): Unit = {
     // Update the voter endpoints (if valid) with what's in RaftConfig
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5a951f77ada..d602e53fe7d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion._
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.zookeeper.client.ZKClientConfig
 
@@ -386,7 +387,8 @@ class KafkaServer(
             time,
             metrics,
             threadNamePrefix,
-            controllerQuorumVotersFuture
+            controllerQuorumVotersFuture,
+            fatalFaultHandler = new ProcessExitingFaultHandler()
           )
           val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
           val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index cb0bf5430e6..97d4a0e470f 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -152,6 +152,12 @@ class SharedServer(
     }
   }
 
+  def raftManagerFaultHandler: FaultHandler = faultHandlerFactory.build(
+    name = "raft manager",
+    fatal = true,
+    action = () => {}
+  )
+
   /**
    * The fault handler to use when metadata loading fails.
    */
@@ -227,7 +233,9 @@ class SharedServer(
           time,
           metrics,
           threadNamePrefix,
-          controllerQuorumVotersFuture)
+          controllerQuorumVotersFuture,
+          raftManagerFaultHandler
+        )
         raftManager.startup()
 
         if (sharedServerConfig.processRoles.contains(ControllerRole)) {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index b3e95fc9bd8..47ba5a747e1 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,7 +19,6 @@ package kafka.tools
 
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
-
 import joptsimple.OptionException
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.{KafkaRaftManager, RaftManager}
@@ -39,6 +38,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
 import org.apache.kafka.raft.errors.NotLeaderException
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
 import org.apache.kafka.server.common.serialization.RecordSerde
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
 
 import scala.jdk.CollectionConverters._
@@ -90,7 +90,8 @@ class TestRaftServer(
       time,
       metrics,
       Some(threadNamePrefix),
-      CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+      CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
+      new ProcessExitingFaultHandler()
     )
 
     workloadGenerator = new RaftWorkloadGenerator(
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 9907c802d2b..5d0dc6b1d53 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
 import org.mockito.Mockito._
 
 class RaftManagerTest {
@@ -96,7 +97,8 @@ class RaftManagerTest {
       Time.SYSTEM,
       new Metrics(Time.SYSTEM),
       Option.empty,
-      CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+      CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
+      mock(classOf[FaultHandler])
     )
   }
 
@@ -198,7 +200,8 @@ class RaftManagerTest {
   @Test
   def testShutdownIoThread(): Unit = {
     val raftClient = mock(classOf[KafkaRaftClient[String]])
-    val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft")
+    val faultHandler = new MockFaultHandler("RaftManagerTestFaultHandler")
+    val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft", faultHandler)
 
     when(raftClient.isRunning).thenReturn(true)
     assertTrue(ioThread.isRunning)
@@ -216,21 +219,27 @@ class RaftManagerTest {
     ioThread.run()
     assertFalse(ioThread.isRunning)
     assertTrue(ioThread.isShutdownComplete)
+    assertNull(faultHandler.firstException)
   }
 
   @Test
   def testUncaughtExceptionInIoThread(): Unit = {
     val raftClient = mock(classOf[KafkaRaftClient[String]])
-    val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft")
+    val faultHandler = new MockFaultHandler("RaftManagerTestFaultHandler")
+    val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft", faultHandler)
 
     when(raftClient.isRunning).thenReturn(true)
     assertTrue(ioThread.isRunning)
 
-    when(raftClient.poll()).thenThrow(new RuntimeException)
+    val exception = new RuntimeException()
+    when(raftClient.poll()).thenThrow(exception)
     ioThread.run()
 
     assertTrue(ioThread.isShutdownComplete)
     assertTrue(ioThread.isThreadFailed)
     assertFalse(ioThread.isRunning)
+
+    val caughtException = faultHandler.firstException.getCause
+    assertEquals(exception, caughtException)
   }
 }