You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/04/14 17:39:16 UTC
[kafka] branch trunk updated: MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cfd05030061 MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
cfd05030061 is described below
commit cfd0503006127b58bb181076bffaccb947fdd2bd
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Apr 14 10:39:08 2023 -0700
MINOR: fix some flaky KRaft-related tests (#13543) (#13543)
In SharedServer, fix some cases where a volatile variable could be set to null during shutdown while
we were still using it. This is mainly a JUnit test issue, although it could also cause ugly error
messages during shutdown when running the server in a production context.
Fix a race in KafkaEventQueueTest.testSize.
Reviewers: David Arthur <mu...@gmail.com>
---
.../src/main/scala/kafka/server/SharedServer.scala | 33 +++++++++++-----------
.../apache/kafka/queue/KafkaEventQueueTest.java | 4 +--
2 files changed, 19 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index ef5f053a8e8..3ba06f36008 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -164,8 +164,8 @@ class SharedServer(
name = "metadata loading",
fatal = sharedServerConfig.processRoles.contains(ControllerRole),
action = () => SharedServer.this.synchronized {
- if (brokerMetrics != null) brokerMetrics.metadataLoadErrorCount.getAndIncrement()
- if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+ Option(brokerMetrics).foreach(_.metadataLoadErrorCount.getAndIncrement())
+ Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
snapshotsDiabledReason.compareAndSet(null, "metadata loading fault")
})
@@ -176,7 +176,7 @@ class SharedServer(
name = "controller startup",
fatal = true,
action = () => SharedServer.this.synchronized {
- if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+ Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
snapshotsDiabledReason.compareAndSet(null, "controller startup fault")
})
@@ -187,8 +187,8 @@ class SharedServer(
name = "initial broker metadata loading",
fatal = true,
action = () => SharedServer.this.synchronized {
- if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
- if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+ Option(brokerMetrics).foreach(_.metadataApplyErrorCount.getAndIncrement())
+ Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
snapshotsDiabledReason.compareAndSet(null, "initial broker metadata loading fault")
})
@@ -199,7 +199,7 @@ class SharedServer(
name = "quorum controller",
fatal = true,
action = () => SharedServer.this.synchronized {
- if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+ Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
snapshotsDiabledReason.compareAndSet(null, "quorum controller fault")
})
@@ -210,8 +210,8 @@ class SharedServer(
name = "metadata publishing",
fatal = false,
action = () => SharedServer.this.synchronized {
- if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
- if (controllerServerMetrics != null) controllerServerMetrics.incrementMetadataErrorCount()
+ Option(brokerMetrics).foreach(_.metadataApplyErrorCount.getAndIncrement())
+ Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
// Note: snapshot generation does not need to be disabled for a publishing fault.
})
@@ -234,7 +234,7 @@ class SharedServer(
if (sharedServerConfig.processRoles.contains(ControllerRole)) {
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
- raftManager = new KafkaRaftManager[ApiMessageAndVersion](
+ val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaProps,
sharedServerConfig,
new MetadataRecordSerde,
@@ -246,21 +246,22 @@ class SharedServer(
controllerQuorumVotersFuture,
raftManagerFaultHandler
)
- raftManager.startup()
+ raftManager = _raftManager
+ _raftManager.startup()
val loaderBuilder = new MetadataLoader.Builder().
setNodeId(metaProps.nodeId).
setTime(time).
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
- setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
+ setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
if (brokerMetrics != null) {
loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
}
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().
setNodeId(metaProps.nodeId).
- setRaftClient(raftManager.client).
+ setRaftClient(_raftManager.client).
build()
snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
setNodeId(metaProps.nodeId).
@@ -271,7 +272,7 @@ class SharedServer(
setDisabledReason(snapshotsDiabledReason).
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
build()
- raftManager.register(loader)
+ _raftManager.register(loader)
try {
loader.installPublishers(Collections.singletonList(snapshotGenerator))
} catch {
@@ -294,10 +295,10 @@ class SharedServer(
def ensureNotRaftLeader(): Unit = synchronized {
// Ideally, this would just resign our leadership, if we had it. But we don't have an API in
// RaftManager for that yet, so shut down the RaftManager.
- if (raftManager != null) {
- CoreUtils.swallow(raftManager.shutdown(), this)
+ Option(raftManager).foreach(_raftManager => {
+ CoreUtils.swallow(_raftManager.shutdown(), this)
raftManager = null
- }
+ })
}
private def stop(): Unit = synchronized {
diff --git a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 7310cff6375..3c0aa63661e 100644
--- a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -267,7 +267,7 @@ public class KafkaEventQueueTest {
assertFalse(queue.isEmpty());
queue.cancelDeferred("later");
queue.cancelDeferred("soon");
- assertTrue(queue.isEmpty());
+ TestUtils.waitForCondition(() -> queue.isEmpty(), "Failed to see the queue become empty.");
queue.close();
assertTrue(queue.isEmpty());
}
@@ -412,4 +412,4 @@ public class KafkaEventQueueTest {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
queue.close();
}
-}
\ No newline at end of file
+}