Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/06 19:07:21 UTC

[GitHub] [kafka] hachikuji opened a new pull request #11186: KAFKA-13162: Ensure ElectLeaders is forwarded to controller

hachikuji opened a new pull request #11186:
URL: https://github.com/apache/kafka/pull/11186


   This patch fixes several problems with the `ElectLeaders` API in KRaft:
   
   - `KafkaApis` did not properly forward this request type to the controller.
   - `ControllerApis` did not handle the request type.
   - `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null.
   - Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
   - Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
   
   To test this patch, I have converted `LeaderElectionCommandTest` to use KRaft. These test cases are flaky at the moment because of https://issues.apache.org/jira/browse/KAFKA-13161, so we will need to merge that patch first. Additionally, I've implemented a workaround for https://issues.apache.org/jira/browse/KAFKA-13173, so this patch should probably also follow that one. I have also added test coverage to `KafkaApisTest` and `ControllerApisTest` for the fixed behavior, and I have added some tests to `ReplicationControlManagerTest` for unclean election.
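
   As a usage reference (not part of the patch itself), the path being fixed here is what the admin client exercises through `Admin.electLeaders`. A minimal sketch with a placeholder bootstrap address and topic name; passing `null` instead of a partition set requests election for all partitions:

   ```java
   import java.util.Collections;
   import java.util.Properties;
   import java.util.Set;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.common.ElectionType;
   import org.apache.kafka.common.TopicPartition;

   public class ElectLeadersExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // Request an unclean election for a single partition of a placeholder topic.
               Set<TopicPartition> partitions =
                   Collections.singleton(new TopicPartition("unclean-topic", 0));
               admin.electLeaders(ElectionType.UNCLEAN, partitions).partitions().get();
           }
       }
   }
   ```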
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r700614819



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -856,21 +867,24 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
             return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
+        if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
+            || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
+            return new ApiError(Errors.ELECTION_NOT_NEEDED);
+        }
+
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
-        builder.setAlwaysElectPreferredIfPossible(true);
+            () -> electionType == ElectionType.UNCLEAN);
+
+        builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED);

Review comment:
       To be honest, it wasn't clear to me why we pass the unclean flag through a function while we use the builder to specify whether preferred election is desired. It would be nice if we could pass the `ElectionType` through directly to make the pattern more consistent.




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r700620002



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -804,7 +817,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
                 TopicControlInfo topic = topics.get(topicEntry.getValue());
                 if (topic != null) {
                     for (int partitionId : topic.parts.keySet()) {
-                        ApiError error = electLeader(topicName, partitionId, uncleanOk, records);
+                        ApiError error = electLeader(topicName, partitionId, electionType, records);

Review comment:
       Good point. I've updated the code to handle this case. It is covered in the unit tests, so perhaps that is good enough? It would be difficult to test through `LeaderElectionCommandTest`, so we would need an `ElectLeadersRequestTest` if we want an integration test.




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696116190



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -90,8 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  val lifecycleManager: BrokerLifecycleManager =
-    new BrokerLifecycleManager(config, time, threadNamePrefix)
+  private var lifecycleManager: BrokerLifecycleManager = null

Review comment:
       This was a workaround to allow `BrokerServer` to be restartable in the same way that `KafkaServer` is. It would be better to let the test kit construct a new instance, but I decided to save that for a follow-up.




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696115948



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
##########
@@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         ApiError apiError = ApiError.fromThrowable(e);
         List<ReplicaElectionResult> electionResults = new ArrayList<>();
 
-        for (TopicPartitions topic : data.topicPartitions()) {
-            ReplicaElectionResult electionResult = new ReplicaElectionResult();
+        if (data.topicPartitions() != null) {

Review comment:
       We use null to indicate election for all partitions.




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r700614819



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -856,21 +867,24 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
             return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
+        if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
+            || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
+            return new ApiError(Errors.ELECTION_NOT_NEEDED);
+        }
+
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
-        builder.setAlwaysElectPreferredIfPossible(true);
+            () -> electionType == ElectionType.UNCLEAN);
+
+        builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED);

Review comment:
       To be honest, it wasn't clear to me why we pass the unclean flag through a function while we use the builder to specify whether preferred election is desired. It would be nice if we could pass the `ElectionType` through directly to make the pattern more consistent, and I agree that having an `ANY` option might enable that.




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r708533935



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                        delta: MetadataDelta,
                        newImage: MetadataImage): Unit = {
     try {
+      debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")

Review comment:
       I decided to make this trace level since it could be a bit noisy in some cases.
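
    As a generic illustration (plain SLF4J rather than the `Logging` trait this file actually uses), gating the message construction keeps a potentially large trace statement cheap when tracing is disabled:

    ```java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MetadataPublishTracing {
        private static final Logger log = LoggerFactory.getLogger(MetadataPublishTracing.class);

        static void tracePublish(Object delta, long newHighestMetadataOffset) {
            // Guard so the potentially huge delta string is only built when tracing is on.
            if (log.isTraceEnabled()) {
                log.trace("Publishing delta " + delta + " with highest offset " + newHighestMetadataOffset);
            }
        }
    }
    ```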




[GitHub] [kafka] hachikuji merged pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11186:
URL: https://github.com/apache/kafka/pull/11186


   


[GitHub] [kafka] mumrah commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r707674451



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                        delta: MetadataDelta,
                        newImage: MetadataImage): Unit = {
     try {
+      debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")

Review comment:
       Yeah, configs is what I was thinking of. Logging only the keys sounds like a good idea.




[GitHub] [kafka] mumrah commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r707654639



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                        delta: MetadataDelta,
                        newImage: MetadataImage): Unit = {
     try {
+      debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")

Review comment:
       Are there any concerns with the MetadataDelta having a lot of records in it? I'm wondering if this could potentially dump out a lot of text into the debug log.
   
   Also, from a security perspective, is it okay to write the contents of _any_ metadata record to the debug log? 




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696117735



##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -189,17 +197,39 @@ public void start() {
         @Override
         public void stop() {
             if (stopped.compareAndSet(false, true)) {
-                try {
-                    clusterReference.get().close();
-                } catch (Exception e) {
-                    throw new RuntimeException("Failed to stop Raft server", e);
-                }
+
+                Utils.closeQuietly(clusterReference.get(), "cluster");
+                admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));

Review comment:
       Makes sense




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r707673948



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                        delta: MetadataDelta,
                        newImage: MetadataImage): Unit = {
     try {
+      debug(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")

Review comment:
       I found it useful when debugging the failures here, so I thought it would come in handy when debugging system test failures as well. I guess if there were any concerns from a security perspective, it would be about the `ConfigurationDelta`. Perhaps if we want to err on the safe side, we could print the keys only? That might be good enough since it would probably be clear from the test context what the change was. What do you think?
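
    A minimal sketch of the keys-only idea, assuming the changed configs are available as a plain map (the `changedConfigs` parameter is illustrative, not the actual `ConfigurationDelta` API):

    ```java
    import java.util.Map;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ConfigDeltaLogging {
        private static final Logger log = LoggerFactory.getLogger(ConfigDeltaLogging.class);

        // Log which configuration keys changed without writing their (possibly sensitive) values.
        static void logChangedConfigKeys(Map<String, String> changedConfigs) {
            log.debug("Applying configuration changes for keys: {}", changedConfigs.keySet());
        }
    }
    ```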




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696118985



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1622,19 +1623,37 @@ object TestUtils extends Logging {
     waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+    waitUntilTrue(() => {
+      val nodes = client.describeCluster().nodes().get()
+      nodes.asScala.exists(_.id == brokerId)
+    }, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+    client: Admin,
+    topicPartition: TopicPartition,
+    expectedLeaderOpt: Option[Int]
+  ): Unit = {
     val topic = topicPartition.topic
-    val partition = topicPartition.partition
+    val partitionId = topicPartition.partition
+
+    def currentLeader: Try[Option[Int]] = Try {
+      val topicDescription = client.describeTopics(List(topic).asJava).all.get.get(topic)
+      topicDescription.partitions.asScala
+        .find(_.partition == partitionId)
+        .flatMap(partitionState => Option(partitionState.leader))

Review comment:
       Yes




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r700607209



##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -732,6 +730,79 @@ class ControllerApisTest {
       controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
   }
 
+  @Test
+  def testElectLeadersAuthorization(): Unit = {

Review comment:
       That would skip the exception handler in `ControllerApis.handle`.




[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r701234135



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -804,7 +817,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
                 TopicControlInfo topic = topics.get(topicEntry.getValue());
                 if (topic != null) {
                     for (int partitionId : topic.parts.keySet()) {
-                        ApiError error = electLeader(topicName, partitionId, uncleanOk, records);
+                        ApiError error = electLeader(topicName, partitionId, electionType, records);

Review comment:
       Okay, sounds good to me. We have `PlaintextAdminIntegrationTest` but that suite is tightly coupled to the ZK implementation.




[GitHub] [kafka] hachikuji commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r696118985



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1622,19 +1623,37 @@ object TestUtils extends Logging {
     waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+    waitUntilTrue(() => {
+      val nodes = client.describeCluster().nodes().get()
+      nodes.asScala.exists(_.id == brokerId)
+    }, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+    client: Admin,
+    topicPartition: TopicPartition,
+    expectedLeaderOpt: Option[Int]
+  ): Unit = {
     val topic = topicPartition.topic
-    val partition = topicPartition.partition
+    val partitionId = topicPartition.partition
+
+    def currentLeader: Try[Option[Int]] = Try {
+      val topicDescription = client.describeTopics(List(topic).asJava).all.get.get(topic)
+      topicDescription.partitions.asScala
+        .find(_.partition == partitionId)
+        .flatMap(partitionState => Option(partitionState.leader))

Review comment:
       Yes:
   ```
       /**
        * Return the leader of the partition or null if there is none.
        */
       public Node leader() {
           return leader;
       }
   ```




[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r687906243



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -90,8 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  val lifecycleManager: BrokerLifecycleManager =
-    new BrokerLifecycleManager(config, time, threadNamePrefix)
+  private var lifecycleManager: BrokerLifecycleManager = null

Review comment:
       Why did we move this to `startup`? We seem to check for `null` in the `shutdown` method for a few of these fields; should we do the same for this field?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
##########
@@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         ApiError apiError = ApiError.fromThrowable(e);
         List<ReplicaElectionResult> electionResults = new ArrayList<>();
 
-        for (TopicPartitions topic : data.topicPartitions()) {
-            ReplicaElectionResult electionResult = new ReplicaElectionResult();
+        if (data.topicPartitions() != null) {

Review comment:
       When is `data.topicPartitions()` `null`?

##########
File path: core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
##########
@@ -86,44 +79,22 @@ abstract class ZooKeeperTestHarness extends Logging {
 object ZooKeeperTestHarness {
   val ZkClientEventThreadSuffix = "-EventThread"
 
-  // Threads which may cause transient failures in subsequent tests if not shutdown.
-  // These include threads which make connections to brokers and may cause issues
-  // when broker ports are reused (e.g. auto-create topics) as well as threads
-  // which reset static JAAS configuration.

Review comment:
       I see. Maybe move this comment to the test utility function you created.

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1914,4 +1933,26 @@ object TestUtils extends Logging {
     )
   }
 
+  def verifyNoUnexpectedThreads(context: String): Unit = {
+    val unexpectedThreadNames = Set(
+      ControllerEventManager.ControllerEventThreadName,
+      KafkaProducer.NETWORK_THREAD_PREFIX,
+      AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+      AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+      ZooKeeperTestHarness.ZkClientEventThreadSuffix
+    )

Review comment:
       Is there some insight into why this specific set of threads was chosen?

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1914,4 +1933,26 @@ object TestUtils extends Logging {
     )
   }
 
+  def verifyNoUnexpectedThreads(context: String): Unit = {
+    val unexpectedThreadNames = Set(
+      ControllerEventManager.ControllerEventThreadName,
+      KafkaProducer.NETWORK_THREAD_PREFIX,
+      AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+      AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+      ZooKeeperTestHarness.ZkClientEventThreadSuffix
+    )
+
+    def unexpectedThreads: Set[String] = {
+      val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
+      allThreads.filter(t => unexpectedThreadNames.exists(s => t.contains(s))).toSet
+    }
+
+    def printUnexpectedThreads: String = {
+      val unexpected = unexpectedThreads
+      s"Found ${unexpected.size} unexpected threads during $context: ${unexpected.mkString("`", ",", "`")}"
+    }
+
+    TestUtils.waitUntilTrue(() => unexpectedThreads.isEmpty, printUnexpectedThreads)

Review comment:
       This is probably unlikely to cause any issues, but the set of threads that is checked is different from the set of threads that is printed. Maybe we can use `computeUntilTrue`.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -123,8 +123,8 @@
 public class ReplicationControlManager {
 
     static class TopicControlInfo {
-        private final String name;
-        private final Uuid id;
+        final String name;
+        final Uuid id;

Review comment:
       How about adding accessor methods instead? The nice thing about accessor methods is that we can easily use them as functions, e.g. `Optional<Uuid> id = maybeTopicControlInfo.map(TopicControlInfo::id);`
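
    A sketch of what those accessors could look like, mirroring the two fields shown in the diff above (remaining members elided):

    ```java
    import java.util.Optional;

    import org.apache.kafka.common.Uuid;

    class TopicControlInfo {
        private final String name;
        private final Uuid id;

        TopicControlInfo(String name, Uuid id) {
            this.name = name;
            this.id = id;
        }

        public String name() {
            return name;
        }

        public Uuid id() {
            return id;
        }

        // Accessors compose nicely as method references:
        static Optional<Uuid> topicId(Optional<TopicControlInfo> maybeTopic) {
            return maybeTopic.map(TopicControlInfo::id);
        }
    }
    ```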

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1622,19 +1623,37 @@ object TestUtils extends Logging {
     waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+    waitUntilTrue(() => {
+      val nodes = client.describeCluster().nodes().get()
+      nodes.asScala.exists(_.id == brokerId)
+    }, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+    client: Admin,
+    topicPartition: TopicPartition,
+    expectedLeaderOpt: Option[Int]
+  ): Unit = {
     val topic = topicPartition.topic
-    val partition = topicPartition.partition
+    val partitionId = topicPartition.partition
+
+    def currentLeader: Try[Option[Int]] = Try {
+      val topicDescription = client.describeTopics(List(topic).asJava).all.get.get(topic)
+      topicDescription.partitions.asScala
+        .find(_.partition == partitionId)
+        .flatMap(partitionState => Option(partitionState.leader))

Review comment:
       We use `flatMap` because `partitionState.leader` can be `null`?

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -189,17 +197,39 @@ public void start() {
         @Override
         public void stop() {
             if (stopped.compareAndSet(false, true)) {
-                try {
-                    clusterReference.get().close();
-                } catch (Exception e) {
-                    throw new RuntimeException("Failed to stop Raft server", e);
-                }
+
+                Utils.closeQuietly(clusterReference.get(), "cluster");
+                admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));

Review comment:
       Should we close the admin clients first so that we have cleaner log messages when debugging tests?

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -178,6 +179,8 @@ class BrokerServer(
     try {
       info("Starting broker")
 
+      lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix)
+

Review comment:
       This comment applies to the methods `currentState()` and `boundPort()`.
   
    Neither method grabs a lock. I think we need to change them to grab a lock, or change the fields they use (`socketServer` and `lifecycleManager`) to `@volatile`.
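
    As a generic illustration of the visibility concern (not the actual `BrokerServer` code), a field assigned by the startup thread and read by another thread without a lock needs to be volatile; the Scala equivalent is a `@volatile var`:

    ```java
    public class LazyComponentHolder {
        // Written once during startup, read by other threads without a lock,
        // so it must be volatile for the reads to be safely published.
        private volatile AutoCloseable lifecycleManager;

        public void startup() {
            lifecycleManager = () -> { };
        }

        public boolean isStarted() {
            return lifecycleManager != null;
        }
    }
    ```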




[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r697544035



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -856,21 +867,24 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
             return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
+        if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
+            || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
+            return new ApiError(Errors.ELECTION_NOT_NEEDED);
+        }
+
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
-        builder.setAlwaysElectPreferredIfPossible(true);
+            () -> electionType == ElectionType.UNCLEAN);
+
+        builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED);

Review comment:
       Outside the scope of this change, but do you think it would help if internally we supported three election types: PREFERRED, ANY, UNCLEAN? With this enum we could change the constructor for `PartitionChangeBuilder` to take a function that returns it. Would that help with the API for `PartitionChangeBuilder` and address all of its uses?
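
    A rough sketch of that suggestion; the enum and constructor shape below are illustrative, not the existing `PartitionChangeBuilder` API:

    ```java
    import java.util.function.Supplier;

    // Hypothetical internal election mode covering the three cases.
    enum LeaderElectionMode {
        PREFERRED,
        ANY,
        UNCLEAN
    }

    // Illustrative builder shape: the mode is resolved lazily, e.g. from the request's
    // ElectionType or from the topic's unclean.leader.election.enable configuration.
    class PartitionChangeBuilderSketch {
        private final Supplier<LeaderElectionMode> electionMode;

        PartitionChangeBuilderSketch(Supplier<LeaderElectionMode> electionMode) {
            this.electionMode = electionMode;
        }

        boolean uncleanAllowed() {
            return electionMode.get() == LeaderElectionMode.UNCLEAN;
        }

        boolean alwaysElectPreferred() {
            return electionMode.get() == LeaderElectionMode.PREFERRED;
        }
    }
    ```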




[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r697549554



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -804,7 +817,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
                 TopicControlInfo topic = topics.get(topicEntry.getValue());
                 if (topic != null) {
                     for (int partitionId : topic.parts.keySet()) {
-                        ApiError error = electLeader(topicName, partitionId, uncleanOk, records);
+                        ApiError error = electLeader(topicName, partitionId, electionType, records);

Review comment:
       Maybe we need an integration test for this, but if the ApiError is `new ApiError(Errors.ELECTION_NOT_NEEDED)`, we should not add the topic partition to the result. In the ZK implementation this filtering is done in `KafkaApis`; see `sendResponseCallback` in `handleElectLeaders`.
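
    A minimal sketch of the filtering being described, assuming the per-partition results are available as a map (the `electAllPartitions` flag and helper are illustrative, not the actual controller code):

    ```java
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.ApiError;

    class ElectionResultFiltering {
        // When the request asked for all partitions (topicPartitions == null), drop the
        // ELECTION_NOT_NEEDED entries so only partitions that actually changed are returned.
        static Map<TopicPartition, ApiError> filterResults(
            Map<TopicPartition, ApiError> results,
            boolean electAllPartitions
        ) {
            if (!electAllPartitions) {
                return results;
            }
            Map<TopicPartition, ApiError> filtered = new HashMap<>();
            results.forEach((partition, error) -> {
                if (error.error() != Errors.ELECTION_NOT_NEEDED) {
                    filtered.put(partition, error);
                }
            });
            return filtered;
        }
    }
    ```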

##########
File path: core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
##########
@@ -18,188 +18,249 @@ package kafka.admin
 
 import java.io.File
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
-import java.nio.file.Path
+import java.nio.file.{Files, Path}
 
 import kafka.common.AdminCommandFailedException
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
+import kafka.server.IntegrationTestUtils.createTopic
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.{ClusterConfig, ClusterInstance}
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
+import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TimeoutException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.network.ListenerName
-import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{BeforeEach, Tag, Test}
 
-import scala.jdk.CollectionConverters._
-import scala.collection.Seq
 import scala.concurrent.duration._
 
-final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
+@Tag("integration")
+final class LeaderElectionCommandTest(cluster: ClusterInstance) {
   import LeaderElectionCommandTest._
 
-  var servers = Seq.empty[KafkaServer]
   val broker1 = 0
   val broker2 = 1
   val broker3 = 2
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
-
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
-    servers = brokerConfigs.map { config =>
-      config.setProperty("auto.leader.rebalance.enable", "false")
-      config.setProperty("controlled.shutdown.enable", "true")
-      config.setProperty("controlled.shutdown.max.retries", "1")
-      config.setProperty("controlled.shutdown.retry.backoff.ms", "1000")
-      TestUtils.createServer(KafkaConfig.fromProps(config))
-    }
+  def setup(clusterConfig: ClusterConfig): Unit = {
+    TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
+    clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
+    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true")
+    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
+    clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
+    clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
   }
 
-  @AfterEach
-  override def tearDown(): Unit = {
-    TestUtils.shutdownServers(servers)
-
-    super.tearDown()
-  }
-
-  @Test
+  @ClusterTest
   def testAllTopicPartition(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
-
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      val topicPartition = new TopicPartition(topic, partition)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    TestUtils.assertLeader(client, topicPartition, broker2)
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--all-topic-partitions"
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--all-topic-partitions"
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testTopicPartition(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      val topicPartition = new TopicPartition(topic, partition)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition, broker2)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--topic", topic,
-          "--partition", partition.toString
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--topic", topic,
+        "--partition", partition.toString
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testPathToJsonFile(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      val topicPartition = new TopicPartition(topic, partition)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition, broker2)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
+    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--path-to-json-file", topicPartitionPath.toString
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--path-to-json-file", topicPartitionPath.toString
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testPreferredReplicaElection(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "preferred-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
+
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
+
+    val topicPartition = new TopicPartition(topic, partition)
+
+    TestUtils.assertLeader(client, topicPartition, broker2)
+
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertLeader(client, topicPartition, broker3)
+    cluster.startBroker(broker2)
+    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
+
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "preferred",
+        "--all-topic-partitions"
+      )
+    )
+
+    TestUtils.assertLeader(client, topicPartition, broker2)
+  }
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+  @ClusterTest
+  def testTopicDoesNotExist(): Unit = {
+    val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "preferred",
+        "--topic", "unknown-topic-name",
+        "--partition", "0"
+      )
+    ))
+    assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException])
+  }
 
-      val topicPartition = new TopicPartition(topic, partition)
+  @ClusterTest
+  def testElectionResultOutput(): Unit = {
+    val client = cluster.createAdminClient()
+    val topic = "non-preferred-topic"
+    val partition0 = 0
+    val partition1 = 1
+    val assignment0 = Seq(broker2, broker3)
+    val assignment1 = Seq(broker3, broker2)
+
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(
+      partition0 -> assignment0,
+      partition1 -> assignment1
+    ))
+
+    val topicPartition0 = new TopicPartition(topic, partition0)
+    val topicPartition1 = new TopicPartition(topic, partition1)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition0, broker2)
+    TestUtils.assertLeader(client, topicPartition1, broker3)
 
-      servers(broker2).shutdown()
-      TestUtils.assertLeader(client, topicPartition, broker3)
-      servers(broker2).startup()
-      TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertLeader(client, topicPartition0, broker3)
+    cluster.startBroker(broker2)
+    TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
+    TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
 
+    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
+    val output = TestUtils.grabConsoleOutput(
       LeaderElectionCommand.main(
         Array(
-          "--bootstrap-server", bootstrapServers(servers),
+          "--bootstrap-server", cluster.bootstrapServers(),
           "--election-type", "preferred",
-          "--all-topic-partitions"
+          "--path-to-json-file", topicPartitionPath.toString
         )
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
-    }
+    val electionResultOutputIter = output.split("\n").iterator
+
+    assertTrue(electionResultOutputIter.hasNext)
+    val firstLine = electionResultOutputIter.next()
+    assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"),
+    s"Unexpected output: $firstLine")
+
+    assertTrue(electionResultOutputIter.hasNext)
+    val secondLine = electionResultOutputIter.next()
+    assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"),
+    s"Unexpected output: $secondLine")
   }
+}
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead
+ * for cluster creation and cleanup.
+ */
+class LeaderElectionCommandErrorTest {

Review comment:
       Good idea. I searched for "CommandErrorTest" and it looks like this is the first time we do this. Should we standardize this pattern and move this class to its own file?

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1052,6 +1114,160 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    private void assertLeaderAndIsr(
+        ReplicationControlManager replication,
+        TopicIdPartition topicIdPartition,
+        int leaderId,
+        int[] isr
+    ) {
+        PartitionRegistration registration = replication.getPartition(
+            topicIdPartition.topicId(),
+            topicIdPartition.partitionId()
+        );
+        assertArrayEquals(isr, registration.isr);
+        assertEquals(leaderId, registration.leader);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+        TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
+        TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
+        TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+
+        assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
+        assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
+        assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+        ElectLeadersRequestData request = buildElectLeadersRequest(
+            ElectionType.UNCLEAN,
+            electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
+        );
+
+        // No election can be done yet because no replicas are available for partition 0
+        ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request);
+        assertEquals(Collections.emptyList(), result1.records());
+
+        ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                new ApiError(ELECTION_NOT_NEEDED)
+            )

Review comment:
       I think we are not supposed to return partitions 1 and 2 if `null` is used in the elect leaders request. In other words, it is expected that only the partitions that changed are returned when electing all partitions.

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1052,6 +1114,160 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    private void assertLeaderAndIsr(
+        ReplicationControlManager replication,
+        TopicIdPartition topicIdPartition,
+        int leaderId,
+        int[] isr
+    ) {
+        PartitionRegistration registration = replication.getPartition(
+            topicIdPartition.topicId(),
+            topicIdPartition.partitionId()
+        );
+        assertArrayEquals(isr, registration.isr);
+        assertEquals(leaderId, registration.leader);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+        TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
+        TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
+        TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+
+        assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
+        assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
+        assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+        ElectLeadersRequestData request = buildElectLeadersRequest(
+            ElectionType.UNCLEAN,
+            electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
+        );
+
+        // No election can be done yet because no replicas are available for partition 0
+        ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request);
+        assertEquals(Collections.emptyList(), result1.records());
+
+        ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                new ApiError(ELECTION_NOT_NEEDED)
+            )

Review comment:
       This comment applies to a few places in this file.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -856,21 +867,24 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
             return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
+        if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
+            || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
+            return new ApiError(Errors.ELECTION_NOT_NEEDED);
+        }
+
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
-        builder.setAlwaysElectPreferredIfPossible(true);
+            () -> electionType == ElectionType.UNCLEAN);
+
+        builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED);

Review comment:
       Outside the scope of this change, but do you think it would help if internally we supported three election types: PREFERRED, ANY, UNCLEAN? With this enum we could change the constructor for `PartitionChangeBuilder` to take a function that returns it. Would that help with the API for `PartitionChangeBuilder` and address all of its uses?




[GitHub] [kafka] dengziming commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r697800147



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -488,6 +488,31 @@ class ControllerApis(val requestChannel: RequestChannel,
     handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
   }
 
+  def handleElectLeaders(request: RequestChannel.Request): Unit = {
+    val electLeadersRequest = request.body[ElectLeadersRequest]
+
+    def sendErrorResponse(exception: Throwable): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
+        electLeadersRequest.getErrorResponse(throttleMs, exception)
+      })
+    }
+
+    if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
+      sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)

Review comment:
       Why not call `authHelper.authorizeClusterOperation(request, ALTER)` as the other methods do? It seems unnecessary to define a `sendErrorResponse` method.

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -732,6 +730,79 @@ class ControllerApisTest {
       controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
   }
 
+  @Test
+  def testElectLeadersAuthorization(): Unit = {

Review comment:
       Similar question: why not use `createControllerApis().handleElectLeaders`?



