You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/04/14 17:39:16 UTC

[kafka] branch trunk updated: MINOR: fix some flaky KRaft-related tests (#13543) (#13543)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfd05030061 MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
cfd05030061 is described below

commit cfd0503006127b58bb181076bffaccb947fdd2bd
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Apr 14 10:39:08 2023 -0700

    MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
    
    In SharedServer, fix some cases where, during shutdown, a volatile variable could change to null
    while we were still using it. This is mainly a JUnit test issue, although it could also cause ugly
    error messages during shutdown when running the server in a production context.
    
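    Fix a race in KafkaEventQueueTest.testSize: after cancelling the deferred events, wait for the
    queue to become empty instead of asserting that it is empty immediately.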
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../src/main/scala/kafka/server/SharedServer.scala | 33 +++++++++++-----------
 .../apache/kafka/queue/KafkaEventQueueTest.java    |  4 +--
 2 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index ef5f053a8e8..3ba06f36008 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -164,8 +164,8 @@ class SharedServer(
     name = "metadata loading",
     fatal = sharedServerConfig.processRoles.contains(ControllerRole),
     action = () => SharedServer.this.synchronized {
-      if (brokerMetrics != null) brokerMetrics.metadataLoadErrorCount.getAndIncrement()
-      if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+      Option(brokerMetrics).foreach(_.metadataLoadErrorCount.getAndIncrement())
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "metadata loading fault")
     })
 
@@ -176,7 +176,7 @@ class SharedServer(
     name = "controller startup",
     fatal = true,
     action = () => SharedServer.this.synchronized {
-      if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "controller startup fault")
     })
 
@@ -187,8 +187,8 @@ class SharedServer(
     name = "initial broker metadata loading",
     fatal = true,
     action = () => SharedServer.this.synchronized {
-      if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
-      if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+      Option(brokerMetrics).foreach(_.metadataApplyErrorCount.getAndIncrement())
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "initial broker metadata loading fault")
     })
 
@@ -199,7 +199,7 @@ class SharedServer(
     name = "quorum controller",
     fatal = true,
     action = () => SharedServer.this.synchronized {
-      if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       snapshotsDiabledReason.compareAndSet(null, "quorum controller fault")
     })
 
@@ -210,8 +210,8 @@ class SharedServer(
     name = "metadata publishing",
     fatal = false,
     action = () => SharedServer.this.synchronized {
-      if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
-      if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+      Option(brokerMetrics).foreach(_.metadataApplyErrorCount.getAndIncrement())
+      Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
       // Note: snapshot generation does not need to be disabled for a publishing fault.
     })
 
@@ -234,7 +234,7 @@ class SharedServer(
         if (sharedServerConfig.processRoles.contains(ControllerRole)) {
           controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
         }
-        raftManager = new KafkaRaftManager[ApiMessageAndVersion](
+        val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
           metaProps,
           sharedServerConfig,
           new MetadataRecordSerde,
@@ -246,21 +246,22 @@ class SharedServer(
           controllerQuorumVotersFuture,
           raftManagerFaultHandler
         )
-        raftManager.startup()
+        raftManager = _raftManager
+        _raftManager.startup()
 
         val loaderBuilder = new MetadataLoader.Builder().
           setNodeId(metaProps.nodeId).
           setTime(time).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           setFaultHandler(metadataLoaderFaultHandler).
-          setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
+          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
         if (brokerMetrics != null) {
           loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
         }
         loader = loaderBuilder.build()
         snapshotEmitter = new SnapshotEmitter.Builder().
           setNodeId(metaProps.nodeId).
-          setRaftClient(raftManager.client).
+          setRaftClient(_raftManager.client).
           build()
         snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
           setNodeId(metaProps.nodeId).
@@ -271,7 +272,7 @@ class SharedServer(
           setDisabledReason(snapshotsDiabledReason).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           build()
-        raftManager.register(loader)
+        _raftManager.register(loader)
         try {
           loader.installPublishers(Collections.singletonList(snapshotGenerator))
         } catch {
@@ -294,10 +295,10 @@ class SharedServer(
   def ensureNotRaftLeader(): Unit = synchronized {
     // Ideally, this would just resign our leadership, if we had it. But we don't have an API in
     // RaftManager for that yet, so shut down the RaftManager.
-    if (raftManager != null) {
-      CoreUtils.swallow(raftManager.shutdown(), this)
+    Option(raftManager).foreach(_raftManager => {
+      CoreUtils.swallow(_raftManager.shutdown(), this)
       raftManager = null
-    }
+    })
   }
 
   private def stop(): Unit = synchronized {
diff --git a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 7310cff6375..3c0aa63661e 100644
--- a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -267,7 +267,7 @@ public class KafkaEventQueueTest {
         assertFalse(queue.isEmpty());
         queue.cancelDeferred("later");
         queue.cancelDeferred("soon");
-        assertTrue(queue.isEmpty());
+        TestUtils.waitForCondition(() -> queue.isEmpty(), "Failed to see the queue become empty.");
         queue.close();
         assertTrue(queue.isEmpty());
     }
@@ -412,4 +412,4 @@ public class KafkaEventQueueTest {
         assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
         queue.close();
     }
-}
\ No newline at end of file
+}