You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/04 17:51:34 UTC

kafka git commit: KAFKA-3052; Broker properties get logged twice if acl enabled

Repository: kafka
Updated Branches:
  refs/heads/trunk 267952460 -> f9642e2a9


KAFKA-3052; Broker properties get logged twice if acl enabled

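Fix this by making it possible to pass the `doLog` parameter through `KafkaConfig` to `AbstractConfig`, so that `SimpleAclAuthorizer` can create its `KafkaConfig` with `doLog = false` and the broker properties are not logged a second time. As explained in the code comments, this means that we can continue to benefit from the ZK default settings as specified in `KafkaConfig` without having to duplicate code.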

Also:
* Removed unused private methods from `KafkaConfig`
* Removed `case` modifier from `KafkaConfig` so that `hashCode`, `equals`
and `toString` from `AbstractConfig` are used.
* Made `props` a `val` and added an `apply` method to the `KafkaConfig`
companion object to remain binary compatible.
* Call authorizer.close even if an exception is thrown during `configure`.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Guozhang Wang

Closes #725 from ijuma/kafka-3052-broker-properties-get-logged-twice-if-acl-enabled


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9642e2a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9642e2a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9642e2a

Branch: refs/heads/trunk
Commit: f9642e2a9878faff81366dbc885a206727bd7c7b
Parents: 2679524
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Jan 4 08:51:30 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jan 4 08:51:30 2016 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AclCommand.scala |  6 ++-
 .../security/auth/SimpleAclAuthorizer.scala     | 15 ++++---
 .../main/scala/kafka/server/KafkaConfig.scala   | 45 ++++++--------------
 .../scala/unit/kafka/admin/AclCommandTest.scala |  2 +-
 4 files changed, 28 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 505be5a..841b278 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -67,8 +67,10 @@ object AclCommand {
 
     val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
     val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
-    authZ.configure(authorizerProperties.asJava)
-    try f(authZ)
+    try {
+      authZ.configure(authorizerProperties.asJava)
+      f(authZ)
+    }
     finally CoreUtils.swallow(authZ.close())
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index d0d226c..780bdf3 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -80,18 +80,21 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   override def configure(javaConfigs: util.Map[String, _]) {
     val configs = javaConfigs.asScala
     val props = new java.util.Properties()
-    configs foreach { case (key, value) => props.put(key, value.toString) }
-    val kafkaConfig = KafkaConfig.fromProps(props)
+    configs.foreach { case (key, value) => props.put(key, value.toString) }
 
     superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
       case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
     }.getOrElse(Set.empty[KafkaPrincipal])
 
-    shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).map(_.toString.toBoolean).getOrElse(false)
+    shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
 
-    val zkUrl = configs.getOrElse(SimpleAclAuthorizer.ZkUrlProp, kafkaConfig.zkConnect).toString
-    val zkConnectionTimeoutMs = configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp, kafkaConfig.zkConnectionTimeoutMs).toString.toInt
-    val zkSessionTimeOutMs = configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp, kafkaConfig.zkSessionTimeoutMs).toString.toInt
+    // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
+    // means that `KafkaConfig.zkConnect` must always be set by the user (even if `SimpleAclAuthorizer.ZkUrlProp` is also
+    // set).
+    val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
+    val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
+    val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
+    val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
 
     zkUtils = ZkUtils(zkUrl,
                       zkConnectionTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 856742f..9556799 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.util
 import java.util.Properties
 
 import kafka.api.ApiVersion
@@ -32,7 +31,6 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.security.auth.PrincipalBuilder
 
 import scala.collection.{Map, immutable}
 
@@ -687,19 +685,27 @@ object KafkaConfig {
       require(names.contains(name), "Unknown configuration \"%s\".".format(name))
   }
 
-  def fromProps(props: Properties): KafkaConfig = {
-    KafkaConfig(props)
-  }
+  def fromProps(props: Properties): KafkaConfig =
+    fromProps(props, true)
+
+  def fromProps(props: Properties, doLog: Boolean): KafkaConfig =
+    new KafkaConfig(props, doLog)
 
-  def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
+  def fromProps(defaults: Properties, overrides: Properties): KafkaConfig =
+    fromProps(defaults, overrides, true)
+
+  def fromProps(defaults: Properties, overrides: Properties, doLog: Boolean): KafkaConfig = {
     val props = new Properties()
     props.putAll(defaults)
     props.putAll(overrides)
-    fromProps(props)
+    fromProps(props, doLog)
   }
+
+  def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
+
 }
 
-case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) {
+class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends AbstractConfig(KafkaConfig.configDef, props, doLog) {
 
   /** ********* Zookeeper Configuration ***********/
   val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
@@ -916,29 +922,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
     }
   }
 
-  private def getMetricClasses(metricClasses: java.util.List[String]): java.util.List[MetricsReporter] = {
-
-    val reporterList = new util.ArrayList[MetricsReporter]()
-    val iterator = metricClasses.iterator()
-
-    while (iterator.hasNext) {
-      val reporterName = iterator.next()
-      if (!reporterName.isEmpty) {
-        val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName)
-        reporter.configure(originals)
-        reporterList.add(reporter)
-      }
-    }
-
-    reporterList
-
-  }
-
-
-  private def getPrincipalBuilderClass(principalBuilderClass: String): PrincipalBuilder = {
-    CoreUtils.createObject[PrincipalBuilder](principalBuilderClass)
-  }
-
   validateValues()
 
   private def validateValues() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9642e2a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 65393d8..9802811 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -131,7 +131,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   def withAuthorizer(props: Properties)(f: Authorizer => Unit) {
-    val kafkaConfig = KafkaConfig.fromProps(props)
+    val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
     val authZ = new SimpleAclAuthorizer
     try {
       authZ.configure(kafkaConfig.originals)