You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "OmniaGM (via GitHub)" <gi...@apache.org> on 2023/02/06 15:28:08 UTC

[GitHub] [kafka] OmniaGM opened a new pull request, #13204: KAFKA-14593: Move LeaderElectionCommand to tools

OmniaGM opened a new pull request, #13204:
URL: https://github.com/apache/kafka/pull/13204

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1294461075


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );

Review Comment:
   updated with the suggested comment



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));

Review Comment:
   updated with the suggested comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288877172


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));

Review Comment:
   I kept it as we had it in the original scala code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1294460465


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--topic", topic,
+                "--partition", Integer.toString(partition)
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPathToJsonFile() throws Exception {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        Path topicPartitionPath = tempTopicPartitionFile(Collections.singletonList(topicPartition));
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--path-to-json-file", topicPartitionPath.toString()
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPreferredReplicaElection() throws InterruptedException, ExecutionException {
+        String topic = "preferred-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+    }
+
+    @ClusterTest
+    public void testTopicDoesNotExist() {
+        Throwable e =  assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
+            Duration.ofSeconds(30),
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--topic", "unknown-topic-name",
+                "--partition", "0"
+            }));
+        assertTrue(e.getSuppressed()[0] instanceof UnknownTopicOrPartitionException);
+    }
+
+    @ClusterTest
+    public void testElectionResultOutput() throws Exception {
+        String topic = "non-preferred-topic";
+        int partition0 = 0;
+        int partition1 = 1;
+        List<Integer> assignment0 = Arrays.asList(broker2, broker3);
+        List<Integer> assignment1 = Arrays.asList(broker3, broker2);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<Integer, List<Integer>>();
+        partitionAssignment.put(partition0, assignment0);
+        partitionAssignment.put(partition1, assignment1);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition0 = new TopicPartition(topic, partition0);
+        TopicPartition topicPartition1 = new TopicPartition(topic, partition1);
+
+        TestUtils.assertLeader(client, topicPartition0, broker2);
+        TestUtils.assertLeader(client, topicPartition1, broker3);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition0, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition0,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+        TestUtils.waitForBrokersInIsr(client, topicPartition1,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        Path topicPartitionPath = tempTopicPartitionFile(Arrays.asList(topicPartition0, topicPartition1));
+        String output = ToolsTestUtils.captureStandardOut(() ->
+            LeaderElectionCommand.main(
+                new String[] {
+                    "--bootstrap-server", cluster.bootstrapServers(),
+                    "--election-type", "preferred",
+                    "--path-to-json-file", topicPartitionPath.toString()
+                }
+            )
+        );
+
+        Iterator<String> electionResultOutputIter = Arrays.stream(output.split("\n")).iterator();
+
+        assertTrue(electionResultOutputIter.hasNext());
+        String firstLine = electionResultOutputIter.next();
+        assertTrue(firstLine.contains(String.format(
+            "Successfully completed leader election (PREFERRED) for partitions %s", topicPartition0)),
+            String.format("Unexpected output: %s", firstLine));
+
+        assertTrue(electionResultOutputIter.hasNext());
+        String secondLine = electionResultOutputIter.next();
+        assertTrue(secondLine.contains(String.format("Valid replica already elected for partitions %s", topicPartition1)),
+            String.format("Unexpected output: %s", secondLine));
+    }
+    private static Map<String, Object> createConfig(List<KafkaServer> servers) {

Review Comment:
   You are right we don't need it anymore. I also removed the dependency on `kafka.server`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288887595


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);

Review Comment:
   Switched to standard out. I think it is still wrong to print it as standard out



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.err.println("Error while making request");

Review Comment:
   Switched to standard out. I think it is still wrong to print it as standard out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1294456594


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead for
+ * cluster creation and cleanup because the command is expected to fail immediately.
+ */
+public class LeaderElectionCommandErrorTest {
+    @Test
+    public void testTopicWithoutPartition() {
+        String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
+                "--bootstrap-server", "nohost:9092",
+                "--election-type", "unclean",
+                "--topic", "some-topic"
+            ));
+        assertTrue(out.startsWith("Missing required option(s)"));
+        assertTrue(out.contains(" partition"));
+    }
+
+    @Test
+    public void testPartitionWithoutTopic() {
+        String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
+                "--bootstrap-server", "nohost:9092",
+                "--election-type", "unclean",
+                "--all-topic-partitions",
+                "--partition", "0"
+        ));
+        String[] rows = out.split("\n");

Review Comment:
   Updated to use the suggested assertion. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288887858


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.err.println(message);

Review Comment:
   Switched to standard out. I think it is still wrong to print it as standard out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1294455955


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {

Review Comment:
   Updated this 



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--topic", topic,
+                "--partition", Integer.toString(partition)
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPathToJsonFile() throws Exception {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        Path topicPartitionPath = tempTopicPartitionFile(Collections.singletonList(topicPartition));
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--path-to-json-file", topicPartitionPath.toString()
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPreferredReplicaElection() throws InterruptedException, ExecutionException {
+        String topic = "preferred-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+    }
+
+    @ClusterTest
+    public void testTopicDoesNotExist() {
+        Throwable e =  assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
+            Duration.ofSeconds(30),
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--topic", "unknown-topic-name",
+                "--partition", "0"
+            }));
+        assertTrue(e.getSuppressed()[0] instanceof UnknownTopicOrPartitionException);
+    }
+
+    @ClusterTest
+    public void testElectionResultOutput() throws Exception {
+        String topic = "non-preferred-topic";
+        int partition0 = 0;
+        int partition1 = 1;
+        List<Integer> assignment0 = Arrays.asList(broker2, broker3);
+        List<Integer> assignment1 = Arrays.asList(broker3, broker2);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<Integer, List<Integer>>();

Review Comment:
   removed the type on the right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1103882891


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);

Review Comment:
   Removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] github-actions[bot] commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1585813226

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has  merge conflicts, please update it with the latest from trunk (or appropriate release branch)
   If this PR is no longer valid or desired, please feel free to close it. If no activity occurrs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1283120724


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {

Review Comment:
   We don't raise any TerseException, so I think we can get rid of this block.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));

Review Comment:
   Why we need this extra logging?



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);

Review Comment:
   This was printed on standard out. Maybe it's not correct, but we risk breaking custom tools parsing the output.



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead for
+ * cluster creation and cleanup because the command is expected to fail immediately.
+ */
+public class LeaderElectionCommandErrorTest {
+    @Test
+    public void testTopicWithoutPartition() {
+        String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
+            new String[] {

Review Comment:
   The array is redundant here, you can simply pass the options.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.err.println("Error while making request");

Review Comment:
   This was printed on standard out. Maybe it's not correct, but we risk breaking custom tools parsing the output.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.err.println("Error while making request");
+            throw new RuntimeException(e);
+        }
+
+        Set<TopicPartition> succeeded = new HashSet<>();
+        Set<TopicPartition> noop = new HashSet<>();
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+
+        electionResults.entrySet().stream().forEach(entry -> {
+            Optional<Throwable> error = entry.getValue();
+            if (error.isPresent()) {
+                if (error.get() instanceof ElectionNotNeededException) {
+                    noop.add(entry.getKey());
+                } else {
+                    failed.put(entry.getKey(), error.get());
+                }
+            } else {
+                succeeded.add(entry.getKey());
+            }
+        });
+
+        if (!succeeded.isEmpty()) {
+            String partitionsAsString = succeeded.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Successfully completed leader election (%s) for partitions %s",
+                electionType, partitionsAsString));
+        }
+
+        if (!noop.isEmpty()) {
+            String partitionsAsString = noop.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString));
+        }
+
+        if (!failed.isEmpty()) {
+            AdminCommandFailedException rootException =
+                new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
+            failed.entrySet().forEach(entry -> {
+                System.err.println(String.format("Error completing leader election (%s) for partition: %s: %s",

Review Comment:
   This was printed on standard out. Maybe it's not correct, but we risk breaking custom tools parsing the output.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.err.println(message);

Review Comment:
   This was printed on standard out. Maybe it's not correct, but we risk breaking custom tools parsing the output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1294455490


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;

Review Comment:
   Removed it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1687836451

   @mimaison I can't reproduce the problem locally. I ran `./gradlew -PscalaVersion=2.13 test --profile --continue -PkeepAliveMode=session -PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=10` but I get different set of failed tests. Any suggestions regards how to reproduce this locally


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1104395849


##########
build.gradle:
##########
@@ -1756,6 +1756,7 @@ project(':tools') {
 
   dependencies {
     implementation project(':clients')
+    implementation project(':core')

Review Comment:
   These classes may be candidates to land in the module `server-common`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1294460859


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);

Review Comment:
   updated with the suggested comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1717905593

   I found out why the tests were failing. It turns out that we are hitting a similar problem to this gradle [issue#847](https://github.com/gradle/gradle/issues/847) that causing transitive dependency problems where `storage:api` and `connect:api` were causing unintended conflicts. The gradle issue seems to not be 100% solved so I pushed a fix with a workaround that renames the `storage:api` project to `storage:storage-api`. This shouldn't impact the name of the final jar.  
   I pushed the changes and am waiting for the pipelines to pass. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1102366936


##########
build.gradle:
##########
@@ -1756,6 +1756,7 @@ project(':tools') {
 
   dependencies {
     implementation project(':clients')
+    implementation project(':core')

Review Comment:
   The direction from previous migrations is that we don't want to depend on core. I see there are a few kafka.* dependencies in LeaderElectionCommand class. Do you think we can also migrate them as part of this PR?



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofSeconds(30), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeout, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        CommandLineUtils.maybePrintHelpOrVersion(
+            commandOptions,
+            "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
+        );
+
+        validate(commandOptions);
+        ElectionType electionType = commandOptions.options.valueOf(commandOptions.electionType);

Review Comment:
   You can create a LeaderElectionCommandOptions wrapper method for every `commandOptions.` parsing/transformation logic. It would be much more readable.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofSeconds(30), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeout, String... args) throws Exception {

Review Comment:
   I know this is as it was in the original command, but I think we can improve by passing milliseconds rather than seconds and change the name to timeoutMs.  



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofSeconds(30), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeout, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        CommandLineUtils.maybePrintHelpOrVersion(
+            commandOptions,
+            "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
+        );
+
+        validate(commandOptions);

Review Comment:
   We can make this method part of LeaderElectionCommandOptions and simply call commandOptions.validate(), also including the above maybePrintHelpOrVersion.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofSeconds(30), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeout, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        CommandLineUtils.maybePrintHelpOrVersion(
+            commandOptions,
+            "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
+        );
+
+        validate(commandOptions);
+        ElectionType electionType = commandOptions.options.valueOf(commandOptions.electionType);
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.options.valueOf(commandOptions.pathToJsonFile))
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.options.valueOf(commandOptions.topic));
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.options.valueOf(commandOptions.partition));
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.options.has(commandOptions.adminClientConfig)) {
+            props.putAll(Utils.loadProps(commandOptions.options.valueOf(commandOptions.adminClientConfig)));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.options.valueOf(commandOptions.bootstrapServer));
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeout.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeout.toMillis() / 2));
+
+        Admin adminClient = Admin.create(props);
+        try {
+            electLeaders(adminClient, electionType, topicPartitions);
+        } finally {
+            adminClient.close();
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        log.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.err.println("Error while making request");
+            throw new RuntimeException(e);
+        }
+
+        Set<TopicPartition> succeeded = new HashSet<>();
+        Set<TopicPartition> noop = new HashSet<>();
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+
+        electionResults.entrySet().stream().forEach(entry -> {
+            Optional<Throwable> error = entry.getValue();
+            if (error.isPresent()) {
+                if (error.get() instanceof ElectionNotNeededException) {
+                    noop.add(entry.getKey());
+                } else {
+                    failed.put(entry.getKey(), error.get());
+                }
+            } else {
+                succeeded.add(entry.getKey());
+            }
+        });
+
+        if (!succeeded.isEmpty()) {
+            String partitionsAsString = succeeded.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Successfully completed leader election (%s) for partitions %s",
+                electionType, partitionsAsString));
+        }
+
+        if (!noop.isEmpty()) {
+            String partitionsAsString = noop.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString));
+        }
+
+        if (!failed.isEmpty()) {
+            AdminCommandFailedException rootException =
+                new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
+            failed.entrySet().forEach(entry -> {
+                System.err.println(String.format("Error completing leader election (%s) for partition: %s: %s",
+                    electionType, entry.getKey(), entry.getValue()));
+                rootException.addSuppressed(entry.getValue());
+            });
+            throw rootException;
+        }
+    }
+
+    private static Set<TopicPartition> parseReplicaElectionData(String path) {
+        Optional<JsonValue> jsonFile;
+        try {
+            jsonFile = Optional.ofNullable(Json.parseFull(Utils.readFileAsString(path)).getOrElse(null));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return jsonFile.map(js -> topicPartitions(js))
+            .orElseThrow(() -> new AdminOperationException("Replica election data is empty"));
+    }
+
+    private static Set<TopicPartition> topicPartitions(JsonValue js) {
+        Option<Seq<TopicPartition>> topicPartitions = js.asJsonObject().get("partitions").map(partitionsList -> {
+            Iterator<JsonObject> partitionsRaw = partitionsList.asJsonArray().iterator()
+                .map(jsonValue -> jsonValue.asJsonObject());
+
+            Seq<TopicPartition> partitions = partitionsRaw
+                .map(p -> new TopicPartition(
+                    p.get("topic").get().to(DecodeJson.DecodeString$.MODULE$),
+                    (int) p.get("partition").get().to(DecodeJson.DecodeInt$.MODULE$)))
+                .toBuffer();
+
+            Iterable<TopicPartition> duplicatePartitions = CoreUtils.duplicates(partitions);
+            if (duplicatePartitions.nonEmpty()) {
+                throw new AdminOperationException(String.format(
+                    "Replica election data contains duplicate partitions: %s", duplicatePartitions.mkString(","))
+                );
+            }
+            return partitions;
+        });
+        if (topicPartitions.isEmpty()) {
+            throw new AdminOperationException("Replica election data is missing \"partitions\" field");
+        } else {
+            return new HashSet<>(JavaConverters.asJavaCollection(topicPartitions.get()));
+        }
+    }
+    private static void validate(LeaderElectionCommandOptions commandOptions) {
+        // required options: --bootstrap-server and --election-type
+        List<String> missingOptions = new ArrayList<>();
+
+        if (!commandOptions.options.has(commandOptions.bootstrapServer)) {
+            missingOptions.add(commandOptions.bootstrapServer.options().get(0).toString());
+        }
+        if (!commandOptions.options.has(commandOptions.electionType)) {
+            missingOptions.add(commandOptions.electionType.options().get(0));
+        }
+        if (!missingOptions.isEmpty()) {
+            throw new AdminCommandFailedException("Missing required option(s): " + String.join(", ", missingOptions));
+        }
+
+        // One and only one is required: --topic, --all-topic-partitions or --path-to-json-file
+        List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(
+            commandOptions.topic,
+            commandOptions.allTopicPartitions,
+            commandOptions.pathToJsonFile
+        );
+
+        long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
+            .filter(abstractOptionSpec -> commandOptions.options.has(abstractOptionSpec))
+            .count();
+        // 1 is the only correct configuration, don't throw an exception
+        if (mutuallyExclusiveOptionsCount != 1) {
+            throw new AdminCommandFailedException(
+                "One and only one of the following options is required: " +
+                    mutuallyExclusiveOptions.stream().map(opt -> opt.options().get(0)).collect(Collectors.joining(", "))
+            );
+        }
+        // --partition if and only if --topic is used
+        if (commandOptions.options.has(commandOptions.topic) && !commandOptions.options.has(commandOptions.partition)) {
+            throw new AdminCommandFailedException(String.format("Missing required option(s): %s",
+                commandOptions.partition.options().get(0)));
+        }
+
+        if (!commandOptions.options.has(commandOptions.topic) && commandOptions.options.has(commandOptions.partition)) {
+            throw new AdminCommandFailedException(String.format("Option %s is only allowed if %s is used",
+                commandOptions.partition.options().get(0),
+                commandOptions.topic.options().get(0)
+            ));
+        }
+    }
+
+    static class LeaderElectionCommandOptions  extends CommandDefaultOptions {

Review Comment:
   ```suggestion
       static class LeaderElectionCommandOptions extends CommandDefaultOptions {
   ```



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.common.AdminCommandFailedException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+

Review Comment:
   ```suggestion
   ```



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);

Review Comment:
   We can probably get rid of this. There is only one log statement that just prints input parameters, not much useful.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofSeconds(30), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeout, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        CommandLineUtils.maybePrintHelpOrVersion(
+            commandOptions,
+            "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
+        );
+
+        validate(commandOptions);
+        ElectionType electionType = commandOptions.options.valueOf(commandOptions.electionType);
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.options.valueOf(commandOptions.pathToJsonFile))
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.options.valueOf(commandOptions.topic));
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.options.valueOf(commandOptions.partition));
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.options.has(commandOptions.adminClientConfig)) {
+            props.putAll(Utils.loadProps(commandOptions.options.valueOf(commandOptions.adminClientConfig)));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.options.valueOf(commandOptions.bootstrapServer));
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeout.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeout.toMillis() / 2));
+
+        Admin adminClient = Admin.create(props);

Review Comment:
   The Admin interface extends AutoCloseable, so we can use try-with-resources here.



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import kafka.admin.AdminOperationException;
+import kafka.common.AdminCommandFailedException;
+import kafka.utils.CoreUtils;
+import kafka.utils.Json;
+import kafka.utils.json.DecodeJson;
+import kafka.utils.json.JsonObject;
+import kafka.utils.json.JsonValue;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Iterable;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.Seq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger log = LoggerFactory.getLogger(LeaderElectionCommand.class);
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofSeconds(30), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeout, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        CommandLineUtils.maybePrintHelpOrVersion(
+            commandOptions,
+            "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
+        );
+
+        validate(commandOptions);
+        ElectionType electionType = commandOptions.options.valueOf(commandOptions.electionType);
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.options.valueOf(commandOptions.pathToJsonFile))
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.options.valueOf(commandOptions.topic));
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.options.valueOf(commandOptions.partition));
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.

Review Comment:
   ```suggestion
           /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288888079


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.err.println(message);
+                throw new AdminCommandFailedException(message, e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.err.println("Error while making request");
+            throw new RuntimeException(e);
+        }
+
+        Set<TopicPartition> succeeded = new HashSet<>();
+        Set<TopicPartition> noop = new HashSet<>();
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+
+        electionResults.entrySet().stream().forEach(entry -> {
+            Optional<Throwable> error = entry.getValue();
+            if (error.isPresent()) {
+                if (error.get() instanceof ElectionNotNeededException) {
+                    noop.add(entry.getKey());
+                } else {
+                    failed.put(entry.getKey(), error.get());
+                }
+            } else {
+                succeeded.add(entry.getKey());
+            }
+        });
+
+        if (!succeeded.isEmpty()) {
+            String partitionsAsString = succeeded.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Successfully completed leader election (%s) for partitions %s",
+                electionType, partitionsAsString));
+        }
+
+        if (!noop.isEmpty()) {
+            String partitionsAsString = noop.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString));
+        }
+
+        if (!failed.isEmpty()) {
+            AdminCommandFailedException rootException =
+                new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
+            failed.entrySet().forEach(entry -> {
+                System.err.println(String.format("Error completing leader election (%s) for partition: %s: %s",

Review Comment:
   Switched to standard out. I think it is still wrong to print it as standard out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1290277580


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {

Review Comment:
   Since `main()` takes a varargs we don't need to create an array here.
   Same below



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;

Review Comment:
   We don't seem to be using this one



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--topic", topic,
+                "--partition", Integer.toString(partition)
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPathToJsonFile() throws Exception {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        Path topicPartitionPath = tempTopicPartitionFile(Collections.singletonList(topicPartition));
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--path-to-json-file", topicPartitionPath.toString()
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPreferredReplicaElection() throws InterruptedException, ExecutionException {
+        String topic = "preferred-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+    }
+
+    @ClusterTest
+    public void testTopicDoesNotExist() {
+        Throwable e =  assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
+            Duration.ofSeconds(30),
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--topic", "unknown-topic-name",
+                "--partition", "0"
+            }));
+        assertTrue(e.getSuppressed()[0] instanceof UnknownTopicOrPartitionException);
+    }
+
+    @ClusterTest
+    public void testElectionResultOutput() throws Exception {
+        String topic = "non-preferred-topic";
+        int partition0 = 0;
+        int partition1 = 1;
+        List<Integer> assignment0 = Arrays.asList(broker2, broker3);
+        List<Integer> assignment1 = Arrays.asList(broker3, broker2);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<Integer, List<Integer>>();

Review Comment:
   We can remove the types on the right



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );

Review Comment:
   This can be simplified slightly into:
   ```
   TestUtils.waitForBrokersOutOfIsr(client,
                   JavaConverters.asScala(Collections.singleton(topicPartition)).toSet(),
                   JavaConverters.asScala(Collections.singleton(broker3)).toSet()
   );
   ```
   Same below



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--topic", topic,
+                "--partition", Integer.toString(partition)
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPathToJsonFile() throws Exception {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        Admin client = cluster.createAdminClient();
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker3);
+        TestUtils.waitForBrokersOutOfIsr(client,
+            JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(),
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker3)).toSet()
+        );
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertNoLeader(client, topicPartition);
+        cluster.startBroker(broker3);
+        TestUtils.waitForOnlineBroker(client, broker3);
+
+        Path topicPartitionPath = tempTopicPartitionFile(Collections.singletonList(topicPartition));
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "unclean",
+                "--path-to-json-file", topicPartitionPath.toString()
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker3);
+    }
+
+    @ClusterTest
+    public void testPreferredReplicaElection() throws InterruptedException, ExecutionException {
+        String topic = "preferred-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        LeaderElectionCommand.main(
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--all-topic-partitions"
+            }
+        );
+
+        TestUtils.assertLeader(client, topicPartition, broker2);
+    }
+
+    @ClusterTest
+    public void testTopicDoesNotExist() {
+        Throwable e =  assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
+            Duration.ofSeconds(30),
+            new String[] {
+                "--bootstrap-server", cluster.bootstrapServers(),
+                "--election-type", "preferred",
+                "--topic", "unknown-topic-name",
+                "--partition", "0"
+            }));
+        assertTrue(e.getSuppressed()[0] instanceof UnknownTopicOrPartitionException);
+    }
+
+    @ClusterTest
+    public void testElectionResultOutput() throws Exception {
+        String topic = "non-preferred-topic";
+        int partition0 = 0;
+        int partition1 = 1;
+        List<Integer> assignment0 = Arrays.asList(broker2, broker3);
+        List<Integer> assignment1 = Arrays.asList(broker3, broker2);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<Integer, List<Integer>>();
+        partitionAssignment.put(partition0, assignment0);
+        partitionAssignment.put(partition1, assignment1);
+
+        createTopic(client, topic, partitionAssignment);
+
+        TopicPartition topicPartition0 = new TopicPartition(topic, partition0);
+        TopicPartition topicPartition1 = new TopicPartition(topic, partition1);
+
+        TestUtils.assertLeader(client, topicPartition0, broker2);
+        TestUtils.assertLeader(client, topicPartition1, broker3);
+
+        cluster.shutdownBroker(broker2);
+        TestUtils.assertLeader(client, topicPartition0, broker3);
+        cluster.startBroker(broker2);
+        TestUtils.waitForBrokersInIsr(client, topicPartition0,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+        TestUtils.waitForBrokersInIsr(client, topicPartition1,
+            JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet()
+        );
+
+        Path topicPartitionPath = tempTopicPartitionFile(Arrays.asList(topicPartition0, topicPartition1));
+        String output = ToolsTestUtils.captureStandardOut(() ->
+            LeaderElectionCommand.main(
+                new String[] {
+                    "--bootstrap-server", cluster.bootstrapServers(),
+                    "--election-type", "preferred",
+                    "--path-to-json-file", topicPartitionPath.toString()
+                }
+            )
+        );
+
+        Iterator<String> electionResultOutputIter = Arrays.stream(output.split("\n")).iterator();
+
+        assertTrue(electionResultOutputIter.hasNext());
+        String firstLine = electionResultOutputIter.next();
+        assertTrue(firstLine.contains(String.format(
+            "Successfully completed leader election (PREFERRED) for partitions %s", topicPartition0)),
+            String.format("Unexpected output: %s", firstLine));
+
+        assertTrue(electionResultOutputIter.hasNext());
+        String secondLine = electionResultOutputIter.next();
+        assertTrue(secondLine.contains(String.format("Valid replica already elected for partitions %s", topicPartition1)),
+            String.format("Unexpected output: %s", secondLine));
+    }
+    private static Map<String, Object> createConfig(List<KafkaServer> servers) {

Review Comment:
   Do we need this method? If I remove it all tests still pass.
   If it's not needed that would remove the need to import `KafkaServer`. Then we could also replace the `KafkaConfig` usage by their literals to fully remove the dependency onto `kafka.server`



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SuppressWarnings("deprecation")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@Tag("integration")
+public class LeaderElectionCommandTest {
+    private final ClusterInstance cluster;
+    int broker1 = 0;
+    int broker2 = 1;
+    int broker3 = 2;
+
+    public LeaderElectionCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @BeforeEach
+    void setup(ClusterConfig clusterConfig) {
+        TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
+        clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp(), "false");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp(), "true");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "1000");
+        clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "2");
+    }
+
+    @ClusterTest
+    public void testAllTopicPartition() throws InterruptedException, ExecutionException {
+        String topic = "unclean-topic";
+        int partition = 0;
+        List<Integer> assignment = Arrays.asList(broker2, broker3);
+
+        cluster.waitForReadyBrokers();
+        Admin client = cluster.createAdminClient();
+        Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
+        partitionAssignment.put(partition, assignment);
+
+        createTopic(client, topic, partitionAssignment);

Review Comment:
   This can be simplified into:
   ```
   createTopic(client, topic, Collections.singletonMap(partition, assignment));
   ```
   
   Same below



##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead for
+ * cluster creation and cleanup because the command is expected to fail immediately.
+ */
+public class LeaderElectionCommandErrorTest {
+    @Test
+    public void testTopicWithoutPartition() {
+        String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
+                "--bootstrap-server", "nohost:9092",
+                "--election-type", "unclean",
+                "--topic", "some-topic"
+            ));
+        assertTrue(out.startsWith("Missing required option(s)"));
+        assertTrue(out.contains(" partition"));
+    }
+
+    @Test
+    public void testPartitionWithoutTopic() {
+        String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
+                "--bootstrap-server", "nohost:9092",
+                "--election-type", "unclean",
+                "--all-topic-partitions",
+                "--partition", "0"
+        ));
+        String[] rows = out.split("\n");

Review Comment:
   Could we use `assertTrue(out.startsWith("Option partition is only allowed if topic is used"));`?



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.out.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.out.println(message);
+                throw new AdminCommandFailedException(message, e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.out.println("Error while making request");
+            throw new RuntimeException(e);
+        }
+
+        Set<TopicPartition> succeeded = new HashSet<>();
+        Set<TopicPartition> noop = new HashSet<>();
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+
+        electionResults.entrySet().stream().forEach(entry -> {
+            Optional<Throwable> error = entry.getValue();
+            if (error.isPresent()) {
+                if (error.get() instanceof ElectionNotNeededException) {
+                    noop.add(entry.getKey());
+                } else {
+                    failed.put(entry.getKey(), error.get());
+                }
+            } else {
+                succeeded.add(entry.getKey());
+            }
+        });
+
+        if (!succeeded.isEmpty()) {
+            String partitionsAsString = succeeded.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Successfully completed leader election (%s) for partitions %s",
+                electionType, partitionsAsString));
+        }
+
+        if (!noop.isEmpty()) {
+            String partitionsAsString = noop.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString));
+        }
+
+        if (!failed.isEmpty()) {
+            AdminCommandFailedException rootException =
+                new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
+            failed.entrySet().forEach(entry -> {
+                System.out.println(String.format("Error completing leader election (%s) for partition: %s: %s",
+                    electionType, entry.getKey(), entry.getValue()));
+                rootException.addSuppressed(entry.getValue());
+            });
+            throw rootException;
+        }
+    }
+
+    private static Set<TopicPartition> parseReplicaElectionData(String path) {
+        Optional<JsonValue> jsonFile;
+        try {
+            jsonFile = Json.parseFull(Utils.readFileAsString(path));
+            return jsonFile.map(js -> {
+                try {
+                    return topicPartitions(js);
+                } catch (JsonMappingException e) {
+                    throw new RuntimeException(e);
+                }
+            }).orElseThrow(() -> new AdminOperationException("Replica election data is empty"));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Set<TopicPartition> topicPartitions(JsonValue js) throws JsonMappingException {
+        return js.asJsonObject().get("partitions")
+            .map(partitionsList -> {
+                try {
+                    return toTopicPartition(partitionsList);
+                } catch (JsonMappingException e) {
+                    throw new RuntimeException(e);
+                }
+            })
+            .orElseThrow(() -> new AdminOperationException("Replica election data is missing \"partitions\" field"));
+    }
+
+    private static Set<TopicPartition> toTopicPartition(JsonValue partitionsList) throws JsonMappingException {
+        List<TopicPartition> partitions = new ArrayList<>();
+        Iterator<JsonValue> iterator = partitionsList.asJsonArray().iterator();
+
+        while (iterator.hasNext()) {
+            JsonObject partitionJs = iterator.next().asJsonObject();
+            String topic = partitionJs.apply("topic").to(STRING);
+            int partition = partitionJs.apply("partition").to(INT);
+            partitions.add(new TopicPartition(topic, partition));
+        }
+
+        Set<TopicPartition> duplicatePartitions  = partitions.stream()
+            .filter(i -> Collections.frequency(partitions, i) > 1)
+            .collect(Collectors.toSet());
+
+        if (duplicatePartitions.size() > 0) {
+            throw new AdminOperationException(String.format(
+                "Replica election data contains duplicate partitions: %s", String.join(",", duplicatePartitions.toString()))
+            );
+        }
+        return new HashSet<>(partitions);
+    }
+
+    static class LeaderElectionCommandOptions extends CommandDefaultOptions {
+        private final ArgumentAcceptingOptionSpec<String> bootstrapServer;
+        private final ArgumentAcceptingOptionSpec<String> adminClientConfig;
+        private final ArgumentAcceptingOptionSpec<String> pathToJsonFile;
+        private final ArgumentAcceptingOptionSpec<String> topic;
+        private final ArgumentAcceptingOptionSpec<Integer> partition;
+        private final OptionSpecBuilder allTopicPartitions;
+        private final ArgumentAcceptingOptionSpec<ElectionType> electionType;
+        public LeaderElectionCommandOptions(String[] args) {
+            super(args);
+            bootstrapServer = parser
+                .accepts(
+                    "bootstrap-server",
+                    "A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.")
+                .withRequiredArg()
+                .describedAs("host:port")
+                .ofType(String.class);
+            adminClientConfig = parser
+                .accepts(
+                    "admin.config",
+                    "Configuration properties files to pass to the admin client")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+            pathToJsonFile = parser
+                .accepts(
+                    "path-to-json-file",
+                    "The JSON file with the list  of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.")
+                .withRequiredArg()
+                .describedAs("Path to JSON file")
+                .ofType(String.class);
+            topic = parser
+                .accepts(
+                    "topic",
+                    "Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.")
+                .withRequiredArg()
+                .describedAs("topic name")
+                .ofType(String.class);
+
+            partition = parser
+                .accepts(
+                    "partition",
+                    "Partition id for which to perform an election. REQUIRED if --topic is specified.")
+                .withRequiredArg()
+                .describedAs("partition id")
+                .ofType(Integer.class);
+
+            allTopicPartitions = parser
+                .accepts(
+                    "all-topic-partitions",
+                    "Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified.");
+            electionType = parser
+                .accepts(
+                    "election-type",
+                    "Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.")
+                .withRequiredArg()
+                .describedAs("election type")
+                .withValuesConvertedBy(new ElectionTypeConverter());
+
+            options = parser.parse(args);
+        }
+
+        public boolean hasAdminClientConfig() {
+            return options.has(adminClientConfig);
+        }
+
+        public ElectionType getElectionType() {
+            return options.valueOf(electionType);
+        }
+
+        public String getPathToJsonFile() {
+            return options.valueOf(pathToJsonFile);
+        }
+
+        public String getBootstrapServer() {
+            return options.valueOf(bootstrapServer);
+        }
+
+        public String getAdminClientConfig() {
+            return options.valueOf(adminClientConfig);
+        }
+
+        public String getTopic() {
+            return options.valueOf(topic);
+        }
+
+        public Integer getPartition() {
+            return options.valueOf(partition);
+        }
+
+        public void validate() {
+            // required options: --bootstrap-server and --election-type
+            List<String> missingOptions = new ArrayList<>();
+
+            if (!options.has(bootstrapServer)) {
+                missingOptions.add(bootstrapServer.options().get(0).toString());

Review Comment:
   I don't think we need `toString()` here



##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));

Review Comment:
   Can we use string interpolation instead of using `String.format()` here?
   For example something like: `LOG.debug("Calling AdminClient.electLeaders({}, {})", electionType, partitions.orElse(null));`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14593: Move LeaderElectionCommand to tools [kafka]

Posted by "nizhikov (via GitHub)" <gi...@apache.org>.
nizhikov commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1745233460

   Hello @OmniaGM @mimaison @fvaleri
   
   Looks like https://github.com/apache/kafka/commit/7553d3f562f3af6c7f9b062b9220bcad80b00478 overlaps with https://github.com/apache/kafka/commit/8f8dbad564ffd9be409bb85edadbc40659cd0a56 and break trunk compilation.
   
   I prepared #14475 to fix compilation. Please, take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14593: Move LeaderElectionCommand to tools [kafka]

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison merged PR #13204:
URL: https://github.com/apache/kafka/pull/13204


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1103879807


##########
build.gradle:
##########
@@ -1756,6 +1756,7 @@ project(':tools') {
 
   dependencies {
     implementation project(':clients')
+    implementation project(':core')

Review Comment:
   It would be bit harder to move them to tools. For example the command uses `kafka.utils.json` which is used by `kafka.server`, `kafka.security.authorizer`. And `kafka.admin.AdminOperationException` is used by `kafka.admin`, `kafka.controller`, `kafka.zk`, etc. So not sure it's easy to move them out. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1113274635


##########
build.gradle:
##########
@@ -1756,6 +1756,7 @@ project(':tools') {
 
   dependencies {
     implementation project(':clients')
+    implementation project(':core')

Review Comment:
   Let's avoid making tools depend on core if possible. I opened https://issues.apache.org/jira/browse/KAFKA-14737 to move the kafka.utils.json classes to server-common.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1688394680

   To be honest, I'm not sure. Try rebasing on trunk. Maybe there's a conflict with a recent commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1685892269

   I've re-kicked a build, hopefully that will clear the test failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1686299835

   Still with all the `LeaderElectionCommandTest` failures
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13204/15/testReport/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] fvaleri commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1289694241


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));

Review Comment:
   Oh yes, I missed that. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1686311572

   > Still with all the `LeaderElectionCommandTest` failures https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13204/15/testReport/
   
   I will have a look into the `LeaderElectionCommandTest` ones.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1689824516

   I managed to reproduce it locally. Seems like any test that includes shutting down a broker and bringing it back up result in an odd situation where the leader of the topic shrinks the ISR to itself even though there is another replica online. 
   
   So far the only thing I'm noticing is that every time the test restarts a broker it gets this error  
   ```
   org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Error applying topics delta in MetadataDelta up to 12: org/apache/kafka/server/log/remote/storage/RemoteStorageException
   java.lang.NoClassDefFoundError: org/apache/kafka/server/log/remote/storage/RemoteStorageException
   	at kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:40)
   	at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:50)
   	at kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:26)
   	at kafka.server.AbstractFetcherManager.addAndStartFetcherThread$1(AbstractFetcherManager.scala:136)
   	at kafka.server.AbstractFetcherManager.$anonfun$addFetcherForPartitions$3(AbstractFetcherManager.scala:152)
   	at kafka.server.AbstractFetcherManager.$anonfun$addFetcherForPartitions$3$adapted(AbstractFetcherManager.scala:142)
   	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
   	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
   	at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:903)
   	at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:142)
   	at kafka.server.ReplicaManager.applyLocalFollowersDelta(ReplicaManager.scala:2609)
   	at kafka.server.ReplicaManager.applyDelta(ReplicaManager.scala:2482)
   	at kafka.server.metadata.BrokerMetadataPublisher.$anonfun$onMetadataUpdate$7(BrokerMetadataPublisher.scala:176)
   	at scala.Option.foreach(Option.scala:437)
   	at kafka.server.metadata.BrokerMetadataPublisher.onMetadataUpdate(BrokerMetadataPublisher.scala:173)
   	at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:305)
   	at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:262)
   	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
   	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
   	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
   	at java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: java.lang.ClassNotFoundException: org.apache.kafka.server.log.remote.storage.RemoteStorageException
   	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
   	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
   	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
   	... 22 more
   ``` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1294456310


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+    }
+
+    static void run(Duration timeoutMs, String... args) throws Exception {
+        LeaderElectionCommandOptions commandOptions = new LeaderElectionCommandOptions(args);
+
+        commandOptions.maybePrintHelpOrVersion();
+
+        commandOptions.validate();
+        ElectionType electionType = commandOptions.getElectionType();
+        Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+            Optional.ofNullable(commandOptions.getPathToJsonFile())
+                .map(path -> parseReplicaElectionData(path));
+
+        Optional<String> topicOption = Optional.ofNullable(commandOptions.getTopic());
+        Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
+        final Optional<Set<TopicPartition>> singleTopicPartition =
+            (topicOption.isPresent() && partitionOption.isPresent()) ?
+                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.empty();
+
+        /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
+         * The validate function should be checking that this option is required if the --topic and --path-to-json-file
+         * are not specified.
+         */
+        Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+        Properties props = new Properties();
+        if (commandOptions.hasAdminClientConfig()) {
+            props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+        }
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer());
+        props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis()));
+        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2));
+
+        try (Admin adminClient = Admin.create(props)) {
+            electLeaders(adminClient, electionType, topicPartitions);
+        }
+    }
+
+    private static void electLeaders(Admin client, ElectionType electionType, Optional<Set<TopicPartition>> partitions) {
+        LOG.debug(String.format("Calling AdminClient.electLeaders(%s, %s)", electionType, partitions.orElse(null)));
+        Map<TopicPartition, Optional<Throwable>> electionResults;
+        try {
+            electionResults = client.electLeaders(electionType, partitions.orElse(null)).partitions().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TimeoutException) {
+                String message = "Timeout waiting for election results";
+                System.out.println(message);
+                throw new AdminCommandFailedException(message, e.getCause());
+            } else if (e.getCause() instanceof ClusterAuthorizationException) {
+                String message = "Not authorized to perform leader election";
+                System.out.println(message);
+                throw new AdminCommandFailedException(message, e.getCause().getCause());
+            } else {
+                throw new RuntimeException(e);
+            }
+        } catch (InterruptedException e) {
+            System.out.println("Error while making request");
+            throw new RuntimeException(e);
+        }
+
+        Set<TopicPartition> succeeded = new HashSet<>();
+        Set<TopicPartition> noop = new HashSet<>();
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+
+        electionResults.entrySet().stream().forEach(entry -> {
+            Optional<Throwable> error = entry.getValue();
+            if (error.isPresent()) {
+                if (error.get() instanceof ElectionNotNeededException) {
+                    noop.add(entry.getKey());
+                } else {
+                    failed.put(entry.getKey(), error.get());
+                }
+            } else {
+                succeeded.add(entry.getKey());
+            }
+        });
+
+        if (!succeeded.isEmpty()) {
+            String partitionsAsString = succeeded.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Successfully completed leader election (%s) for partitions %s",
+                electionType, partitionsAsString));
+        }
+
+        if (!noop.isEmpty()) {
+            String partitionsAsString = noop.stream()
+                .map(TopicPartition::toString)
+                .collect(Collectors.joining(", "));
+            System.out.println(String.format("Valid replica already elected for partitions %s", partitionsAsString));
+        }
+
+        if (!failed.isEmpty()) {
+            AdminCommandFailedException rootException =
+                new AdminCommandFailedException(String.format("%s replica(s) could not be elected", failed.size()));
+            failed.entrySet().forEach(entry -> {
+                System.out.println(String.format("Error completing leader election (%s) for partition: %s: %s",
+                    electionType, entry.getKey(), entry.getValue()));
+                rootException.addSuppressed(entry.getValue());
+            });
+            throw rootException;
+        }
+    }
+
+    private static Set<TopicPartition> parseReplicaElectionData(String path) {
+        Optional<JsonValue> jsonFile;
+        try {
+            jsonFile = Json.parseFull(Utils.readFileAsString(path));
+            return jsonFile.map(js -> {
+                try {
+                    return topicPartitions(js);
+                } catch (JsonMappingException e) {
+                    throw new RuntimeException(e);
+                }
+            }).orElseThrow(() -> new AdminOperationException("Replica election data is empty"));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Set<TopicPartition> topicPartitions(JsonValue js) throws JsonMappingException {
+        return js.asJsonObject().get("partitions")
+            .map(partitionsList -> {
+                try {
+                    return toTopicPartition(partitionsList);
+                } catch (JsonMappingException e) {
+                    throw new RuntimeException(e);
+                }
+            })
+            .orElseThrow(() -> new AdminOperationException("Replica election data is missing \"partitions\" field"));
+    }
+
+    private static Set<TopicPartition> toTopicPartition(JsonValue partitionsList) throws JsonMappingException {
+        List<TopicPartition> partitions = new ArrayList<>();
+        Iterator<JsonValue> iterator = partitionsList.asJsonArray().iterator();
+
+        while (iterator.hasNext()) {
+            JsonObject partitionJs = iterator.next().asJsonObject();
+            String topic = partitionJs.apply("topic").to(STRING);
+            int partition = partitionJs.apply("partition").to(INT);
+            partitions.add(new TopicPartition(topic, partition));
+        }
+
+        Set<TopicPartition> duplicatePartitions  = partitions.stream()
+            .filter(i -> Collections.frequency(partitions, i) > 1)
+            .collect(Collectors.toSet());
+
+        if (duplicatePartitions.size() > 0) {
+            throw new AdminOperationException(String.format(
+                "Replica election data contains duplicate partitions: %s", String.join(",", duplicatePartitions.toString()))
+            );
+        }
+        return new HashSet<>(partitions);
+    }
+
+    static class LeaderElectionCommandOptions extends CommandDefaultOptions {
+        private final ArgumentAcceptingOptionSpec<String> bootstrapServer;
+        private final ArgumentAcceptingOptionSpec<String> adminClientConfig;
+        private final ArgumentAcceptingOptionSpec<String> pathToJsonFile;
+        private final ArgumentAcceptingOptionSpec<String> topic;
+        private final ArgumentAcceptingOptionSpec<Integer> partition;
+        private final OptionSpecBuilder allTopicPartitions;
+        private final ArgumentAcceptingOptionSpec<ElectionType> electionType;
+        public LeaderElectionCommandOptions(String[] args) {
+            super(args);
+            bootstrapServer = parser
+                .accepts(
+                    "bootstrap-server",
+                    "A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.")
+                .withRequiredArg()
+                .describedAs("host:port")
+                .ofType(String.class);
+            adminClientConfig = parser
+                .accepts(
+                    "admin.config",
+                    "Configuration properties files to pass to the admin client")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+            pathToJsonFile = parser
+                .accepts(
+                    "path-to-json-file",
+                    "The JSON file with the list  of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.")
+                .withRequiredArg()
+                .describedAs("Path to JSON file")
+                .ofType(String.class);
+            topic = parser
+                .accepts(
+                    "topic",
+                    "Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.")
+                .withRequiredArg()
+                .describedAs("topic name")
+                .ofType(String.class);
+
+            partition = parser
+                .accepts(
+                    "partition",
+                    "Partition id for which to perform an election. REQUIRED if --topic is specified.")
+                .withRequiredArg()
+                .describedAs("partition id")
+                .ofType(Integer.class);
+
+            allTopicPartitions = parser
+                .accepts(
+                    "all-topic-partitions",
+                    "Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified.");
+            electionType = parser
+                .accepts(
+                    "election-type",
+                    "Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.")
+                .withRequiredArg()
+                .describedAs("election type")
+                .withValuesConvertedBy(new ElectionTypeConverter());
+
+            options = parser.parse(args);
+        }
+
+        public boolean hasAdminClientConfig() {
+            return options.has(adminClientConfig);
+        }
+
+        public ElectionType getElectionType() {
+            return options.valueOf(electionType);
+        }
+
+        public String getPathToJsonFile() {
+            return options.valueOf(pathToJsonFile);
+        }
+
+        public String getBootstrapServer() {
+            return options.valueOf(bootstrapServer);
+        }
+
+        public String getAdminClientConfig() {
+            return options.valueOf(adminClientConfig);
+        }
+
+        public String getTopic() {
+            return options.valueOf(topic);
+        }
+
+        public Integer getPartition() {
+            return options.valueOf(partition);
+        }
+
+        public void validate() {
+            // required options: --bootstrap-server and --election-type
+            List<String> missingOptions = new ArrayList<>();
+
+            if (!options.has(bootstrapServer)) {
+                missingOptions.add(bootstrapServer.options().get(0).toString());

Review Comment:
   removed `toString`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1682276851

   The test is failing because of an unrelated package `:storage:test` with JDK 11 and Scala 2.13. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1719152037

   The test is failing for different packages https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13204/19/testReport/ and one of the tasks succeeded in building but failed with an error 
   ```
   Publishing failed.
   
   The response from https://ge.apache.org/scans/publish/gradle/3.14.1/token was not from Gradle Enterprise.
   The specified server address may be incorrect, or your network environment may be interfering.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288886617


##########
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionCommand.class);
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+
+    public static void main(String... args) {
+        try {
+            run(Duration.ofMillis(30000), args);
+        } catch (TerseException e) {

Review Comment:
   Removed the `TerseException`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

Posted by "OmniaGM (via GitHub)" <gi...@apache.org>.
OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288886278


##########
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead for
+ * cluster creation and cleanup because the command is expected to fail immediately.
+ */
+public class LeaderElectionCommandErrorTest {
+    @Test
+    public void testTopicWithoutPartition() {
+        String out = ToolsTestUtils.captureStandardErr(() -> LeaderElectionCommand.main(
+            new String[] {

Review Comment:
   Remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org