You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/10 05:49:02 UTC

kafka git commit: KAFKA-3141: Add checks to catch invalid authorizer properties

Repository: kafka
Updated Branches:
  refs/heads/trunk 6d0dca734 -> fe3b7492b


KAFKA-3141: Add checks to catch invalid authorizer properties

Reject malformed properties with an IllegalArgumentException instead of throwing ArrayIndexOutOfBoundsException

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Ismael Juma, Gwen Shapira

Closes #806 from SinghAsDev/KAFKA-3141


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe3b7492
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe3b7492
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe3b7492

Branch: refs/heads/trunk
Commit: fe3b7492b73bf21ecb1a49e730dd30efd1e66693
Parents: 6d0dca7
Author: Ashish Singh <as...@cloudera.com>
Authored: Tue Feb 9 21:48:56 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Feb 9 21:48:56 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AclCommand.scala       | 12 +++++++-----
 core/src/main/scala/kafka/utils/CommandLineUtils.scala |  7 +++++--
 .../test/scala/unit/kafka/admin/AclCommandTest.scala   |  7 +++++++
 .../scala/unit/kafka/utils/CommandLineUtilsTest.scala  | 13 ++++++++++---
 4 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe3b7492/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 3a82e89..134ac38 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -59,11 +59,13 @@ object AclCommand {
   }
 
   def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
-    var authorizerProperties = Map.empty[String, Any]
-    if (opts.options.has(opts.authorizerPropertiesOpt)) {
-      val props = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala.map(_.split("="))
-      props.foreach(pair => authorizerProperties += (pair(0).trim -> pair(1).trim))
-    }
+    val authorizerProperties =
+      if (opts.options.has(opts.authorizerPropertiesOpt)) {
+        val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
+        CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala
+      } else {
+        Map.empty[String, Any]
+      }
 
     val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
     val authZ = CoreUtils.createObject[Authorizer](authorizerClass)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe3b7492/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index c51735d..87a5b00 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -59,12 +59,15 @@ object CommandLineUtils extends Logging {
   /**
    * Parse key-value pairs in the form key=value
    */
-  def parseKeyValueArgs(args: Iterable[String]): Properties = {
+  def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean = true): Properties = {
     val splits = args.map(_ split "=").filterNot(_.length == 0)
 
     val props = new Properties
     for(a <- splits) {
-      if (a.length == 1) props.put(a(0), "")
+      if (a.length == 1) {
+        if (acceptMissingValue) props.put(a(0), "")
+        else throw new IllegalArgumentException(s"Missing value for key ${a(0)}")
+      }
       else if (a.length == 2) props.put(a(0), a(1))
       else {
         System.err.println("Invalid command line properties: " + args.mkString(" "))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe3b7492/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 88c87a7..d43d0d4 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -19,6 +19,7 @@ package kafka.admin
 import java.io.StringReader
 import java.util.Properties
 
+import kafka.admin.AclCommand.AclCommandOptions
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, TestUtils}
@@ -107,6 +108,12 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     }
   }
 
+  @Test (expected = classOf[IllegalArgumentException])
+  def testInvalidAuthorizerProperty() {
+    val args = Array("--authorizer-properties", "zookeeper.connect " + zkConnect)
+    AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
+  }
+
   private def testRemove(resources: Set[Resource], resourceCmd: Array[String], args: Array[String], brokerProps: Properties) {
     for (resource <- resources) {
       Console.withIn(new StringReader(s"y${AclCommand.Newline}" * resources.size)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe3b7492/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
index 068526e..6cc868d 100644
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
@@ -23,11 +23,18 @@ import org.junit.Test
 class CommandLineUtilsTest {
 
 
-  @Test
+  @Test (expected = classOf[java.lang.IllegalArgumentException])
   def testParseEmptyArg() {
     val argArray = Array("my.empty.property=")
+    CommandLineUtils.parseKeyValueArgs(argArray, false)
+  }
+
+
+  @Test
+  def testParseEmptyArgAsValid() {
+    val argArray = Array("my.empty.property=")
     val props = CommandLineUtils.parseKeyValueArgs(argArray)
-    assertEquals("Empty value should be equal to empty string",props.getProperty("my.empty.property"),"")
+    assertEquals("Value of a key with missing value should be an empty string",props.getProperty("my.empty.property"),"")
   }
 
   @Test
@@ -40,7 +47,7 @@ class CommandLineUtilsTest {
   @Test
   def testParseArgs() {
     val argArray = Array("first.property=first","second.property=second")
-    val props = CommandLineUtils.parseKeyValueArgs(argArray)
+    val props = CommandLineUtils.parseKeyValueArgs(argArray, false)
     assertEquals("Value of first property should be 'first'",props.getProperty("first.property"),"first")
     assertEquals("Value of second property should be 'second'",props.getProperty("second.property"),"second")
   }