Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/24 23:10:23 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r687906243



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -90,8 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  val lifecycleManager: BrokerLifecycleManager =
-    new BrokerLifecycleManager(config, time, threadNamePrefix)
+  private var lifecycleManager: BrokerLifecycleManager = null

Review comment:
       Why did we move this to `startup`? We seem to check for `null` in the `shutdown` method for a few of these fields. Should we do the same for this field?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
##########
@@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         ApiError apiError = ApiError.fromThrowable(e);
         List<ReplicaElectionResult> electionResults = new ArrayList<>();
 
-        for (TopicPartitions topic : data.topicPartitions()) {
-            ReplicaElectionResult electionResult = new ReplicaElectionResult();
+        if (data.topicPartitions() != null) {

Review comment:
       When is `data.topicPartitions()` `null`?

##########
File path: core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
##########
@@ -86,44 +79,22 @@ abstract class ZooKeeperTestHarness extends Logging {
 object ZooKeeperTestHarness {
   val ZkClientEventThreadSuffix = "-EventThread"
 
-  // Threads which may cause transient failures in subsequent tests if not shutdown.
-  // These include threads which make connections to brokers and may cause issues
-  // when broker ports are reused (e.g. auto-create topics) as well as threads
-  // which reset static JAAS configuration.

Review comment:
       I see. Maybe move this comment to the test utility function you created.

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1914,4 +1933,26 @@ object TestUtils extends Logging {
     )
   }
 
+  def verifyNoUnexpectedThreads(context: String): Unit = {
+    val unexpectedThreadNames = Set(
+      ControllerEventManager.ControllerEventThreadName,
+      KafkaProducer.NETWORK_THREAD_PREFIX,
+      AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+      AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+      ZooKeeperTestHarness.ZkClientEventThreadSuffix
+    )

Review comment:
       Is there some insight into why this specific set of threads?

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1914,4 +1933,26 @@ object TestUtils extends Logging {
     )
   }
 
+  def verifyNoUnexpectedThreads(context: String): Unit = {
+    val unexpectedThreadNames = Set(
+      ControllerEventManager.ControllerEventThreadName,
+      KafkaProducer.NETWORK_THREAD_PREFIX,
+      AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+      AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+      ZooKeeperTestHarness.ZkClientEventThreadSuffix
+    )
+
+    def unexpectedThreads: Set[String] = {
+      val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
+      allThreads.filter(t => unexpectedThreadNames.exists(s => t.contains(s))).toSet
+    }
+
+    def printUnexpectedThreads: String = {
+      val unexpected = unexpectedThreads
+      s"Found ${unexpected.size} unexpected threads during $context: ${unexpected.mkString("`", ",", "`")}"
+    }
+
+    TestUtils.waitUntilTrue(() => unexpectedThreads.isEmpty, printUnexpectedThreads)

Review comment:
       Probably unlikely to cause any issues, but the set of threads checked is different from the set of threads printed. Maybe we can use `computeUntilTrue`.
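
One way to read this suggestion, as a minimal Java sketch: compute the thread snapshot once per poll and hand back the very value the predicate was evaluated on, so the failure message reports exactly what was checked. `PollUtil`, `Result`, and the parameter list below are hypothetical stand-ins for illustration and are not the signature of Kafka's `TestUtils.computeUntilTrue`.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka.
final class PollUtil {
    /** The last computed value and whether the predicate accepted it. */
    static final class Result<T> {
        final T value;
        final boolean satisfied;

        Result(T value, boolean satisfied) {
            this.value = value;
            this.satisfied = satisfied;
        }
    }

    /**
     * Re-computes a value until the predicate accepts it or waitMs elapses.
     * The returned value is the same one the predicate last saw.
     */
    static <T> Result<T> computeUntilTrue(Supplier<T> compute, Predicate<T> predicate,
                                          long waitMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + waitMs;
        while (true) {
            T value = compute.get();
            if (predicate.test(value)) {
                return new Result<>(value, true);
            }
            if (System.currentTimeMillis() >= deadline) {
                return new Result<>(value, false);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and report the last snapshot.
                Thread.currentThread().interrupt();
                return new Result<>(value, false);
            }
        }
    }
}
```

With a helper of this shape, the caller would build the "Found N unexpected threads" message from `result.value` rather than taking a second snapshot of the running threads.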

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -123,8 +123,8 @@
 public class ReplicationControlManager {
 
     static class TopicControlInfo {
-        private final String name;
-        private final Uuid id;
+        final String name;
+        final Uuid id;

Review comment:
       How about adding accessor methods instead? The nice thing about accessor methods is that we can easily use them as functions. E.g. `Optional<Uuid> id = maybeTopicControlInfo.map(TopicControlInfo::id);`
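
A minimal sketch of that shape, assuming the fields stay private and final. `TopicInfoSketch` is a hypothetical stand-in for `TopicControlInfo`, and `java.util.UUID` is used in place of Kafka's `Uuid` only to keep the example self-contained.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for TopicControlInfo.
final class TopicInfoSketch {
    private final String name;
    private final UUID id;

    TopicInfoSketch(String name, UUID id) {
        this.name = name;
        this.id = id;
    }

    // Accessors keep the fields private and can be passed as method references.
    String name() {
        return name;
    }

    UUID id() {
        return id;
    }

    /** Maps an optional topic to its id without an explicit presence check. */
    static Optional<UUID> idOf(Optional<TopicInfoSketch> maybeInfo) {
        return maybeInfo.map(TopicInfoSketch::id);
    }
}
```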

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1622,19 +1623,37 @@ object TestUtils extends Logging {
     waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+    waitUntilTrue(() => {
+      val nodes = client.describeCluster().nodes().get()
+      nodes.asScala.exists(_.id == brokerId)
+    }, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+    client: Admin,
+    topicPartition: TopicPartition,
+    expectedLeaderOpt: Option[Int]
+  ): Unit = {
     val topic = topicPartition.topic
-    val partition = topicPartition.partition
+    val partitionId = topicPartition.partition
+
+    def currentLeader: Try[Option[Int]] = Try {
+      val topicDescription = client.describeTopics(List(topic).asJava).all.get.get(topic)
+      topicDescription.partitions.asScala
+        .find(_.partition == partitionId)
+        .flatMap(partitionState => Option(partitionState.leader))

Review comment:
       We use `flatMap` because `partitionState.leader` can be `null`?

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -189,17 +197,39 @@ public void start() {
         @Override
         public void stop() {
             if (stopped.compareAndSet(false, true)) {
-                try {
-                    clusterReference.get().close();
-                } catch (Exception e) {
-                    throw new RuntimeException("Failed to stop Raft server", e);
-                }
+
+                Utils.closeQuietly(clusterReference.get(), "cluster");
+                admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));

Review comment:
       Should we close the admin clients first so that we have cleaner log messages when debugging tests?
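
A sketch of the suggested ordering, assuming both the admin clients and the cluster are `AutoCloseable`. `ShutdownOrderSketch` and its `closeQuietly` are hypothetical and only mimic the spirit of `Utils.closeQuietly`. Closing the clients first means they are not left retrying against brokers that are already gone.

```java
import java.util.List;

// Hypothetical illustration of the close ordering, not the PR's code.
final class ShutdownOrderSketch {
    /** Closes a resource, logging rather than propagating any failure. */
    static void closeQuietly(AutoCloseable resource, String name) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    /** Closes every admin client before the cluster they are connected to. */
    static void stop(List<? extends AutoCloseable> admins, AutoCloseable cluster) {
        admins.forEach(admin -> closeQuietly(admin, "admin"));
        closeQuietly(cluster, "cluster");
    }
}
```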

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -178,6 +179,8 @@ class BrokerServer(
     try {
       info("Starting broker")
 
+      lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix)
+

Review comment:
       This comment applies to the methods `currentState()` and `boundPort()`.
   
   Neither method grabs a lock. I think we need to change them to grab a lock or mark the fields they use (`socketServer` and `lifecycleManager`) as `@volatile`.
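
A minimal Java sketch of the volatile option, which also shows the `null` guard that a field assigned in `startup` would need. `BrokerSketch`, `LifecycleSketch`, and the state strings are hypothetical. In Scala the field would carry the `@volatile` annotation instead of the `volatile` keyword.

```java
// Hypothetical stand-in for a server whose manager is created during startup.
final class BrokerSketch {
    static final class LifecycleSketch {
        String state() {
            return "RUNNING";
        }

        void close() {
            // Nothing to release in this sketch.
        }
    }

    // volatile: a write in startup() is visible to lock-free readers on other threads.
    private volatile LifecycleSketch lifecycleManager;

    void startup() {
        lifecycleManager = new LifecycleSketch();
    }

    /** Reads the field once into a local so the null check and the use agree. */
    String currentState() {
        LifecycleSketch manager = lifecycleManager;
        return manager == null ? "NOT_STARTED" : manager.state();
    }

    /** Safe to call even if startup() never ran. */
    void shutdown() {
        LifecycleSketch manager = lifecycleManager;
        if (manager != null) {
            manager.close();
            lifecycleManager = null;
        }
    }
}
```

Note that `volatile` only guarantees visibility of the reference. If `startup` and `shutdown` can race with each other, a lock would still be needed around them.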



