You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/15 17:33:40 UTC
[kafka] branch trunk updated: KAFKA-6656; Config tool should return non-zero status code on failure (#4711)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 372b4c6 KAFKA-6656; Config tool should return non-zero status code on failure (#4711)
372b4c6 is described below
commit 372b4c6a775461c6ca54d4c58078718f5f046216
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Mar 15 10:33:29 2018 -0700
KAFKA-6656; Config tool should return non-zero status code on failure (#4711)
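Prior to this patch, the config command caught some exceptions while executing and only printed them, which meant that the tool would still exit with status code zero on failure. This patch removes those inner catch blocks and handles errors in one place in `main`, which exits with a non-zero status, making the expected exit behavior explicit. Test cases have been added to verify the change.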
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 67 +++++++++++-----------
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 59 ++++++++++++++++---
2 files changed, 86 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index ddf6dcd..044be6a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -25,11 +25,11 @@ import kafka.common.Config
import kafka.common.InvalidConfigException
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.CommandLineUtils
+import kafka.utils.{CommandLineUtils, Exit}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AlterConfigsOptions, Config => JConfig, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
@@ -65,35 +65,43 @@ object ConfigCommand extends Config {
DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
def main(args: Array[String]): Unit = {
+ try {
+ val opts = new ConfigCommandOptions(args)
- val opts = new ConfigCommandOptions(args)
-
- if(args.length == 0)
- CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client, user or broker")
-
- opts.checkArgs()
-
- val time = Time.SYSTEM
+ if (args.length == 0)
+ CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client, user or broker")
- if (opts.options.has(opts.zkConnectOpt)) {
- val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
- Int.MaxValue, time)
- val adminZkClient = new AdminZkClient(zkClient)
+ opts.checkArgs()
- try {
- if (opts.options.has(opts.alterOpt))
- alterConfig(zkClient, opts, adminZkClient)
- else if (opts.options.has(opts.describeOpt))
- describeConfig(zkClient, opts, adminZkClient)
- } catch {
- case e: Throwable =>
- println("Error while executing config command " + e.getMessage)
- println(Utils.stackTrace(e))
- } finally {
- zkClient.close()
+ if (opts.options.has(opts.zkConnectOpt)) {
+ processCommandWithZk(opts.options.valueOf(opts.zkConnectOpt), opts)
+ } else {
+ processBrokerConfig(opts)
}
- } else {
- processBrokerConfig(opts)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: InvalidConfigException | _: OptionException) =>
+ logger.debug(s"Failed config command with args $args", e)
+ System.err.println(e.getMessage)
+ Exit.exit(1)
+
+ case t: Throwable =>
+ System.err.println(s"Error while executing config command with args $args")
+ t.printStackTrace(System.err)
+ Exit.exit(1)
+ }
+ }
+
+ private def processCommandWithZk(zkConnectString: String, opts: ConfigCommandOptions): Unit = {
+ val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSecurityEnabled, 30000, 30000,
+ Int.MaxValue, Time.SYSTEM)
+ val adminZkClient = new AdminZkClient(zkClient)
+ try {
+ if (opts.options.has(opts.alterOpt))
+ alterConfig(zkClient, opts, adminZkClient)
+ else if (opts.options.has(opts.describeOpt))
+ describeConfig(zkClient, opts, adminZkClient)
+ } finally {
+ zkClient.close()
}
}
@@ -217,14 +225,9 @@ object ConfigCommand extends Config {
alterBrokerConfig(adminClient, opts, entityName)
else if (opts.options.has(opts.describeOpt))
describeBrokerConfig(adminClient, opts, entityName)
- } catch {
- case e: Throwable =>
- println("Error while executing config command " + e.getMessage)
- println(Utils.stackTrace(e))
} finally {
adminClient.close()
}
-
}
private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index b3c46fa..a17f060 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.common.InvalidConfigException
import kafka.server.ConfigEntityName
-import kafka.utils.Logging
+import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.ConfigResource
@@ -38,6 +38,49 @@ import scala.collection.mutable
import scala.collection.JavaConverters._
class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
+
+ @Test
+ def shouldExitWithNonZeroStatusOnArgError(): Unit = {
+ assertNonZeroStatusExit(Array("--blah"))
+ }
+
+ @Test
+ def shouldExitWithNonZeroStatusOnZkCommandError(): Unit = {
+ assertNonZeroStatusExit(Array(
+ "--zookeeper", zkConnect,
+ "--entity-name", "1",
+ "--entity-type", "brokers",
+ "--alter",
+ "--add-config", "message.max.size=100000"))
+ }
+
+ @Test
+ def shouldExitWithNonZeroStatusOnBrokerCommandError(): Unit = {
+ assertNonZeroStatusExit(Array(
+ "--bootstrap-server", "invalid host",
+ "--entity-type", "brokers",
+ "--entity-name", "1",
+ "--describe"))
+ }
+
+ private def assertNonZeroStatusExit(args: Array[String]): Unit = {
+ var exitStatus: Option[Int] = None
+ Exit.setExitProcedure { (status, _) =>
+ exitStatus = Some(status)
+ throw new RuntimeException
+ }
+
+ try {
+ ConfigCommand.main(args)
+ } catch {
+ case e: RuntimeException =>
+ } finally {
+ Exit.resetExitProcedure()
+ }
+
+ assertEquals(Some(1), exitStatus)
+ }
+
@Test
def shouldParseArgumentsForClientsEntityType() {
testArgumentParse("clients")
@@ -111,7 +154,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def changeClientIdConfig(clientId: String, configChange: Properties): Unit = {
assertEquals("my-client-id", clientId)
assertEquals("b", configChange.get("a"))
@@ -130,7 +173,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
assertEquals("my-topic", topic)
assertEquals("b", configChange.get("a"))
@@ -149,7 +192,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "leader.replication.throttled.rate=10,follower.replication.throttled.rate=20"))
- case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals(Seq(1), brokerIds)
assertEquals("10", configChange.get("leader.replication.throttled.rate"))
@@ -225,7 +268,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
- case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals(Seq(1), brokerIds)
assertEquals("b", configChange.get("a"))
@@ -297,7 +340,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--delete-config", "a,c"))
- case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
val properties: Properties = new Properties
properties.put("a", "b")
@@ -332,7 +375,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--delete-config", mechanism))
val credentials = mutable.Map[String, Properties]()
- case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends AdminZkClient(zkClient) {
+ case class CredentialChange(user: String, mechanisms: Set[String], iterations: Int) extends AdminZkClient(zkClient) {
override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
credentials.getOrElse(entityName, new Properties())
}
@@ -536,7 +579,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
}
- case class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties}
override def changeClientIdConfig(clientId: String, configs: Properties): Unit = {}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.