You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/31 17:02:04 UTC

kafka git commit: KAFKA-2680; Use IBM ConfigFile class to load jaas config if IBM JDK

Repository: kafka
Updated Branches:
  refs/heads/trunk b94435699 -> 9d8dd9f10


KAFKA-2680; Use IBM ConfigFile class to load jaas config if IBM JDK

Use the IBM ConfigFile class when running on the IBM JDK, since the JavaLoginConfig implementation provided by the SUN provider is not included with the IBM JDK.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Flavio Junqueira <fp...@apache.org>, Jun Rao <ju...@gmail.com>

Closes #357 from rajinisivaram/KAFKA-2680


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d8dd9f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d8dd9f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d8dd9f1

Branch: refs/heads/trunk
Commit: 9d8dd9f104aef3a9db9005d85bc55a15f851d258
Parents: b944356
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Sat Oct 31 09:01:59 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sat Oct 31 09:01:59 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/security/JaasUtils.java | 13 +++----------
 core/src/main/scala/kafka/admin/ConfigCommand.scala |  2 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala    |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala     |  2 +-
 .../kafka/admin/ReassignPartitionsCommand.scala     |  2 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala  |  2 +-
 .../main/scala/kafka/admin/ZkSecurityMigrator.scala | 14 ++++----------
 .../kafka/consumer/ZookeeperConsumerConnector.scala |  2 +-
 .../kafka/security/auth/SimpleAclAuthorizer.scala   |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala  |  2 +-
 .../scala/kafka/server/ZookeeperLeaderElector.scala |  2 +-
 .../scala/kafka/tools/ConsumerOffsetChecker.scala   |  2 +-
 .../main/scala/kafka/tools/ExportZkOffsets.scala    |  2 +-
 .../main/scala/kafka/tools/ImportZkOffsets.scala    |  2 +-
 .../main/scala/kafka/tools/UpdateOffsetsInZK.scala  |  2 +-
 .../scala/kafka/tools/VerifyConsumerRebalance.scala |  2 +-
 .../kafka/security/auth/ZkAuthorizationTest.scala   | 16 ++++++++++------
 .../test/scala/unit/kafka/zk/ZKEphemeralTest.scala  | 14 ++++++++------
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala  |  3 ++-
 19 files changed, 41 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index c081a76..0467a09 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -22,8 +22,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.io.IOException;
 import java.io.File;
-import java.net.URI;
-import java.security.URIParameter;
 
 import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
@@ -85,11 +83,12 @@ public class JaasUtils {
         return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
     }
 
-    public static boolean isZkSecurityEnabled(String loginConfigFile) {
+    public static boolean isZkSecurityEnabled() {
         boolean isSecurityEnabled = false;
         boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
         String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client");
 
+        String loginConfigFile = System.getProperty(JAVA_LOGIN_CONFIG_PARAM);
         if (loginConfigFile != null && loginConfigFile.length() > 0) {
             File configFile = new File(loginConfigFile);
             if (!configFile.canRead()) {
@@ -97,8 +96,7 @@ public class JaasUtils {
             }
                 
             try {
-                URI configUri = configFile.toURI();
-                Configuration loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));
+                Configuration loginConf = Configuration.getConfiguration();
                 isSecurityEnabled = loginConf.getAppConfigurationEntry(zkLoginContextName) != null;
             } catch (Exception e) {
                 throw new KafkaException(e);
@@ -110,11 +108,6 @@ public class JaasUtils {
                 throw new KafkaException("Exception while determining if ZooKeeper is secure");
             }
         }
-        /*
-         * Tests fail if we don't reset the login configuration. It is unclear
-         * what is actually triggering this bug.
-         */
-        Configuration.setConfiguration(null);
 
         return isSecurityEnabled;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 3e9293c..82a6612 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -48,7 +48,7 @@ object ConfigCommand {
     val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
                           30000,
                           30000,
-                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                          JaasUtils.isZkSecurityEnabled())
 
     try {
       if (opts.options.has(opts.alterOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 8efbb2a..a30c12d 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -52,7 +52,7 @@ object ConsumerGroupCommand {
     val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 
                           30000,
                           30000,
-                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                          JaasUtils.isZkSecurityEnabled())
 
     try {
       if (opts.options.has(opts.listOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index e74fcb6..d194eca 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -58,7 +58,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       zkUtils = ZkUtils(zkConnect, 
                         30000,
                         30000,
-                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                        JaasUtils.isZkSecurityEnabled())
       val partitionsForPreferredReplicaElection =
         if (!options.has(jsonFileOpt))
           zkUtils.getAllPartitions()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 10182f6..13e423d 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -42,7 +42,7 @@ object ReassignPartitionsCommand extends Logging {
     val zkUtils = ZkUtils(zkConnect, 
                           30000,
                           30000,
-                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                          JaasUtils.isZkSecurityEnabled())
     try {
       if(opts.options.has(opts.verifyOpt))
         verifyAssignment(zkUtils, opts)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index ed54aee..51b4957 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -53,7 +53,7 @@ object TopicCommand extends Logging {
     val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 
                           30000,
                           30000,
-                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                          JaasUtils.isZkSecurityEnabled())
     var exitCode = 0
     try {
       if(opts.options.has(opts.createOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index e3ab7f2..8e2f040 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -70,7 +70,6 @@ object ZkSecurityMigrator extends Logging {
     val parser = new OptionParser()
     val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
         + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
-    val jaasFileOpt = parser.accepts("jaas.file", "JAAS Config file.").withOptionalArg().ofType(classOf[String])
     val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " +
       "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181").
       ofType(classOf[String])
@@ -84,19 +83,14 @@ object ZkSecurityMigrator extends Logging {
     if (options.has(helpOpt))
       CommandLineUtils.printUsageAndDie(parser, usageMessage)
 
-    if ((jaasFile == null) && !options.has(jaasFileOpt)) {
-     val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set either " + 
-                    "the system property %s or the option %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "--jaas.file")) 
+    if ((jaasFile == null)) {
+     val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set " +
+                    "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
      System.out.println("ERROR: %s".format(errorMsg))
      throw new IllegalArgumentException("Incorrect configuration")
     }
 
-    if (jaasFile == null) {
-      jaasFile = options.valueOf(jaasFileOpt)
-      System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
-    }
-
-    if (!JaasUtils.isZkSecurityEnabled(jaasFile)) {
+    if (!JaasUtils.isZkSecurityEnabled()) {
       val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile)
       System.out.println("ERROR: %s".format(errorMsg))
       throw new IllegalArgumentException("Incorrect configuration") 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f39b9a1..2c5432e 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -181,7 +181,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     zkUtils = ZkUtils(config.zkConnect,
                       config.zkSessionTimeoutMs,
                       config.zkConnectionTimeoutMs,
-                      JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                      JaasUtils.isZkSecurityEnabled())
   }
 
   // Blocks until the offset manager is located and a channel is established to it.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 9b4314e..7bfb092 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -96,7 +96,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     zkUtils = ZkUtils(zkUrl,
                       zkConnectionTimeoutMs,
                       zkSessionTimeOutMs,
-                      JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                      JaasUtils.isZkSecurityEnabled())
     zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
 
     loadCache()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c6ea84e..732eb55 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -260,7 +260,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         ""
     }
 
-    val secureAclsEnabled = JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)) && config.zkEnableSecureAcls
+    val secureAclsEnabled = JaasUtils.isZkSecurityEnabled() && config.zkEnableSecureAcls
     
     if(config.zkEnableSecureAcls && !secureAclsEnabled) {
       throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.")    

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index e4e1e9c..5c487bf 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -77,7 +77,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
       val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                       electString,
                                                       controllerContext.zkUtils.zkConnection.getZookeeper,
-                                                      JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                                                      JaasUtils.isZkSecurityEnabled())
       zkCheckedEphemeral.create()
       info(brokerId + " successfully elected as leader")
       leaderId = brokerId

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 87d9fb7..8af7614 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -155,7 +155,7 @@ object ConsumerOffsetChecker extends Logging {
       zkUtils = ZkUtils(zkConnect,
                         30000,
                         30000,
-                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                        JaasUtils.isZkSecurityEnabled())
 
       val topicList = topics match {
         case Some(x) => x.split(",").view.toList

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 75d4fd1..ccccae5 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -80,7 +80,7 @@ object ExportZkOffsets extends Logging {
       zkUtils = ZkUtils(zkConnect,
                         30000,
                         30000,
-                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                        JaasUtils.isZkSecurityEnabled())
       
       var consumerGroups: Seq[String] = null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 38a71ae..60d48fa 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -69,7 +69,7 @@ object ImportZkOffsets extends Logging {
     val zkConnect           = options.valueOf(zkConnectOpt)
     val partitionOffsetFile = options.valueOf(inFileOpt)
 
-    val zkUtils = ZkUtils(zkConnect, 30000, 30000, JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+    val zkUtils = ZkUtils(zkConnect, 30000, 30000, JaasUtils.isZkSecurityEnabled())
     val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
 
     updateZkOffsets(zkUtils, partitionOffsets)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 95dd2a6..96a33b1 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -38,7 +38,7 @@ object UpdateOffsetsInZK {
       usage
     val config = new ConsumerConfig(Utils.loadProps(args(1)))
     val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs,
-        config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+        config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled())
     args(0) match {
       case Earliest => getAndSetOffsets(zkUtils, OffsetRequest.EarliestTime, config, args(2))
       case Latest => getAndSetOffsets(zkUtils, OffsetRequest.LatestTime, config, args(2))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 5a505c6..3077896 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -53,7 +53,7 @@ object VerifyConsumerRebalance extends Logging {
       zkUtils = ZkUtils(zkConnect,
                         30000,
                         30000, 
-                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+                        JaasUtils.isZkSecurityEnabled())
 
       debug("zkConnect = %s; group = %s".format(zkConnect, group))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 7b8ba74..c4e4299 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -20,13 +20,14 @@ package kafka.security.auth
 import kafka.admin.ZkSecurityMigrator
 import kafka.utils.{Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.junit.Assert._
 import org.junit.{After, Before, BeforeClass, Test}
 import scala.collection.JavaConverters._
 import scala.util.{Try, Success, Failure}
+import javax.security.auth.login.Configuration
 
 
 class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
@@ -34,6 +35,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   val authProvider: String = "zookeeper.authProvider.1"
   @Before
   override def setUp() {
+    Configuration.setConfiguration(null)
     System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
     System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
     super.setUp()
@@ -52,10 +54,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
    */
   @Test
   def testIsZkSecurityEnabled() {
-    assertTrue(JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
-    assertFalse(JaasUtils.isZkSecurityEnabled(""))
-    try {
-      JaasUtils.isZkSecurityEnabled("no-such-file-exists.conf")
+    assertTrue(JaasUtils.isZkSecurityEnabled())
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    assertFalse(JaasUtils.isZkSecurityEnabled())
+    try {     
+      System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
+      JaasUtils.isZkSecurityEnabled()
       fail("Should have thrown an exception")
     } catch {
       case e: KafkaException => {
@@ -312,4 +316,4 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
         }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 0486313..fb53d77 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -55,9 +55,10 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     if(secure) {
+      Configuration.setConfiguration(null)
       System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
       System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
-      if(!JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) {
+      if(!JaasUtils.isZkSecurityEnabled()) {
         fail("Secure access not enabled")
      }
     }
@@ -69,12 +70,13 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     super.tearDown
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     System.clearProperty(authProvider)
+    Configuration.setConfiguration(null)
   }
   
   @Test
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(confFile))
+    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled())
 
     try {
       zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
@@ -86,7 +88,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     testData = zkUtils.readData("/tmp/zktest")._1
     Assert.assertNotNull(testData)
     zkUtils.close
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(confFile))
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled())
     val nodeExists = zkUtils.pathExists("/tmp/zktest")
     Assert.assertFalse(nodeExists)
   }
@@ -108,7 +110,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
  
   private def testCreation(path: String) {
     val zk = zkUtils.zkConnection.getZookeeper
-    val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled(confFile))
+    val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
     var created = false
     var counter = 10
 
@@ -137,7 +139,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     //Creates a second session
     val (_, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
     val zk2 = zkConnection2.getZookeeper
-    var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled(confFile))
+    var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled())
 
     // Creates znode for path in the first session
     zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
@@ -164,7 +166,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     // Creates znode for path in the first session
     zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
     
-    var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled(confFile))
+    var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
     //Bootstraps the ZKWatchedEphemeral object
     var gotException = false;
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d8dd9f1/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index f567555..d618ba6 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -64,7 +64,7 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkPort = zookeeper.port
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "")))
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled())
   }
 
   @After
@@ -85,5 +85,6 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
         }
       }
     }
+    Configuration.setConfiguration(null)
   }
 }