Posted to commits@kafka.apache.org by ju...@apache.org on 2016/05/03 19:55:59 UTC

kafka git commit: KAFKA-3128; Add metrics for ZooKeeper events

Repository: kafka
Updated Branches:
  refs/heads/trunk 1f528815d -> c7425be5b


KAFKA-3128; Add metrics for ZooKeeper events

Also:
* Remove redundant `time.milliseconds` call in `Sensor.record`
* Clean up a number of tests and remove a manual test that is no longer required

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Liquan Pei <li...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #1265 from ijuma/kafka-3128-zookeeper-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c7425be5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c7425be5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c7425be5

Branch: refs/heads/trunk
Commit: c7425be5be8d0c2786155fbc697d83f80827d084
Parents: 1f52881
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue May 3 10:55:54 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue May 3 10:55:54 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/metrics/Sensor.java |  2 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   | 57 ++++++++++------
 .../main/scala/kafka/server/KafkaServer.scala   |  5 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  2 +-
 .../integration/kafka/api/AdminClientTest.scala |  2 +-
 .../kafka/api/ProducerBounceTest.scala          | 16 ++---
 .../test/scala/other/kafka/DeleteZKPath.scala   | 44 -------------
 .../kafka/integration/PrimitiveApiTest.scala    |  2 +-
 .../ZookeeperConsumerConnectorTest.scala        |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  2 +-
 .../server/SessionExpireListenerTest.scala      | 68 ++++++++++++++++++++
 .../unit/kafka/utils/CommandLineUtilsTest.scala |  2 +-
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   | 27 ++++----
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    | 13 ++--
 14 files changed, 136 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
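
The main change, shown in the KafkaHealthcheck.scala diff below, adds a SessionExpireListener that registers one meter per ZooKeeper KeeperState (ZooKeeperDisconnectsPerSec, ZooKeeperSyncConnectsPerSec, ZooKeeperAuthFailuresPerSec, ZooKeeperReadOnlyConnectsPerSec, ZooKeeperSaslAuthenticationsPerSec and ZooKeeperExpiresPerSec) and marks the matching meter whenever handleStateChanged fires. A minimal sketch of how such a meter can be looked up in the Yammer default registry, mirroring the helper in the new SessionExpireListenerTest further down (the currentMeterCount name is illustrative):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Meter
    import scala.collection.JavaConverters._

    // Sketch: read the current count of one of the new broker meters from the
    // Yammer default registry, e.g. currentMeterCount("ZooKeeperExpiresPerSec").
    def currentMeterCount(name: String): Long =
      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
        case (metricName, meter: Meter) if metricName.getName == name => meter.count
      }.getOrElse(sys.error(s"Unable to find meter with name $name"))

On a live broker these meters are registered through KafkaMetricsGroup like the other broker metrics, so they surface through the usual reporters (for example JMX); the lookup above is mainly useful in tests.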


http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 0c5bcb7..098bfa8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -96,7 +96,7 @@ public final class Sensor {
      *         bound
      */
     public void record(double value, long timeMs) {
-        this.lastRecordTime = time.milliseconds();
+        this.lastRecordTime = timeMs;
         synchronized (this) {
             // increment all the stats
             for (int i = 0; i < this.stats.size(); i++)
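
The Sensor change above removes a redundant clock read: record(double value, long timeMs) is already handed a timestamp by its callers (the single-argument overload passes time.milliseconds() itself), so reading the clock again just to set lastRecordTime was unnecessary and could differ slightly from the timestamp used to update the stats. A simplified sketch of the overload pattern, not the actual Kafka source:

    // Simplified sketch (hypothetical class): read the clock once in the
    // convenience overload and thread the same timestamp everywhere.
    class SensorSketch(clock: () => Long) {
      private var lastRecordTime: Long = 0L

      def record(value: Double): Unit = record(value, clock())

      def record(value: Double, timeMs: Long): Unit = {
        lastRecordTime = timeMs // reuse the supplied timestamp instead of re-reading the clock
        // ... update the registered stats with (value, timeMs) ...
      }
    }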

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4e3fc29..117899b 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -18,15 +18,17 @@
 package kafka.server
 
 import java.net.InetAddress
+import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 import kafka.api.ApiVersion
 import kafka.cluster.EndPoint
+import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.I0Itec.zkclient.IZkStateListener
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
-
 /**
  * This class registers the broker in zookeeper to allow 
  * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
@@ -35,14 +37,14 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
  * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
  * we are dead.
  */
-class KafkaHealthcheck(private val brokerId: Int,
-                       private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
-                       private val zkUtils: ZkUtils,
-                       private val rack: Option[String],
-                       private val interBrokerProtocolVersion: ApiVersion) extends Logging {
+class KafkaHealthcheck(brokerId: Int,
+                       advertisedEndpoints: Map[SecurityProtocol, EndPoint],
+                       zkUtils: ZkUtils,
+                       rack: Option[String],
+                       interBrokerProtocolVersion: ApiVersion) extends Logging {
 
-  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
-  val sessionExpireListener = new SessionExpireListener
+  private val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
+  private[server] val sessionExpireListener = new SessionExpireListener
 
   def startup() {
     zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
@@ -70,31 +72,44 @@ class KafkaHealthcheck(private val brokerId: Int,
   }
 
   /**
-   *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
-   *  connection for us. We need to re-register this broker in the broker registry.
+   *  When we get a SessionExpired event, it means that we have lost all ephemeral nodes and ZKClient has re-established
+   *  a connection for us. We need to re-register this broker in the broker registry. We rely on `handleStateChanged`
+   *  to record ZooKeeper connection state metrics.
    */
-  class SessionExpireListener() extends IZkStateListener {
+  class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
+
+    private[server] val stateToMeterMap = {
+      import KeeperState._
+      val stateToEventTypeMap = Map(
+        Disconnected -> "Disconnects",
+        SyncConnected -> "SyncConnects",
+        AuthFailed -> "AuthFailures",
+        ConnectedReadOnly -> "ReadOnlyConnects",
+        SaslAuthenticated -> "SaslAuthentications",
+        Expired -> "Expires"
+      )
+      stateToEventTypeMap.map { case (state, eventType) =>
+        state -> newMeter(s"ZooKeeper${eventType}PerSec", eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
+      }
+    }
+
     @throws(classOf[Exception])
-    def handleStateChanged(state: KeeperState) {}
+    override def handleStateChanged(state: KeeperState) {
+      stateToMeterMap.get(state).foreach(_.mark())
+    }
 
-    /**
-     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
-     * any ephemeral nodes here.
-     *
-     * @throws Exception
-     *             On any error.
-     */
     @throws(classOf[Exception])
-    def handleNewSession() {
+    override def handleNewSession() {
       info("re-registering broker info in ZK for broker " + brokerId)
       register()
       info("done re-registering broker")
       info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
     }
 
-    override def handleSessionEstablishmentError(error: Throwable): Unit = {
+    override def handleSessionEstablishmentError(error: Throwable) {
       fatal("Could not establish session with zookeeper", error)
     }
+
   }
 
 }
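
For context on how the listener above is driven: KafkaHealthcheck.startup() subscribes it via zkUtils.zkClient.subscribeStateChanges(sessionExpireListener), so every ZkClient connection-state transition reaches handleStateChanged and bumps the matching meter, while session re-establishment is handled in handleNewSession. A minimal standalone sketch of the same wiring (the object name is illustrative, and it assumes a ZooKeeper server reachable at localhost:2181):

    import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
    import org.apache.zookeeper.Watcher.Event.KeeperState

    // Sketch: subscribe a state listener to a ZkClient so connection-state
    // transitions (Disconnected, SyncConnected, Expired, ...) can be observed.
    object StateListenerSketch extends App {
      val zkClient = new ZkClient("localhost:2181", 6000) // assumed local ZooKeeper

      zkClient.subscribeStateChanges(new IZkStateListener {
        override def handleStateChanged(state: KeeperState): Unit =
          println(s"ZooKeeper state changed to $state") // a real listener would mark a meter here

        override def handleNewSession(): Unit =
          println("New session established; re-create ephemeral nodes here")

        override def handleSessionEstablishmentError(error: Throwable): Unit =
          println(s"Could not establish ZooKeeper session: $error")
      })

      Thread.sleep(10000) // keep the process alive long enough to observe a transition
      zkClient.close()
    }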

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 36b52fd..2832ebc 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -96,7 +96,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   private var shutdownLatch = new CountDownLatch(1)
 
   private val jmxPrefix: String = "kafka.server"
-  private val reporters: java.util.List[MetricsReporter] =  config.metricReporterClasses
+  private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
   reporters.add(new JmxReporter(jmxPrefix))
 
   // This exists because the Metrics package from clients has its own Time implementation.
@@ -239,7 +239,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           else
             (protocol, endpoint)
         }
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion)
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
+          config.interBrokerProtocolVersion)
         kafkaHealthcheck.startup()
 
         // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 83ff517..81eb24a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -924,7 +924,7 @@ object ZkPath {
     isNamespacePresent = true
   }
 
-  def resetNamespaceCheckedState {
+  def resetNamespaceCheckedState() {
     isNamespacePresent = false
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index ade1911..7fae81e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -84,7 +84,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
       !consumers(0).assignment().isEmpty
     }, "Expected non-empty assignment")
 
-    val group= client.describeGroup(groupId)
+    val group = client.describeGroup(groupId)
     assertEquals("consumer", group.protocolType)
     assertEquals("range", group.protocol)
     assertEquals("Stable", group.state)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 369c3b7..5994a1d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -51,16 +51,11 @@ class ProducerBounceTest extends KafkaServerTestHarness {
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
-  private var consumer1: SimpleConsumer = null
-  private var consumer2: SimpleConsumer = null
-
   private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
   private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
   private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
 
   private val topic1 = "topic-1"
-  private val topic2 = "topic-2"
 
   @Before
   override def setUp() {
@@ -76,7 +71,6 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     if (producer1 != null) producer1.close
     if (producer2 != null) producer2.close
     if (producer3 != null) producer3.close
-    if (producer4 != null) producer4.close
 
     super.tearDown()
   }
@@ -102,9 +96,8 @@ class ProducerBounceTest extends KafkaServerTestHarness {
         Thread.sleep(2000)
       }
 
-      // Make sure the producer do not see any exception
-      // in returned metadata due to broker failures
-      assertTrue(scheduler.failed == false)
+      // Make sure the producer do not see any exception in returned metadata due to broker failures
+      assertFalse(scheduler.failed)
 
       // Make sure the leader still exists after bouncing brokers
       (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, partition))
@@ -114,7 +107,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
 
     // Make sure the producer do not see any exception
     // when draining the left messages on shutdown
-    assertTrue(scheduler.failed == false)
+    assertFalse(scheduler.failed)
 
     // double check that the leader info has been propagated after consecutive bounces
     val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
@@ -132,8 +125,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
   }
 
-  private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
-  {
+  private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
     val numRecords = 1000
     var sent = 0
     var failed = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/other/kafka/DeleteZKPath.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala
deleted file mode 100755
index 202bf43..0000000
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka
-
-import consumer.ConsumerConfig
-import utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.common.utils.Utils
-
-object DeleteZKPath {
-  def main(args: Array[String]) {
-    if(args.length < 2) {
-      println("USAGE: " + DeleteZKPath.getClass.getName + " consumer.properties zk_path")
-      System.exit(1)
-    }
-
-    val config = new ConsumerConfig(Utils.loadProps(args(0)))
-    val zkPath = args(1)
-    val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-
-    try {
-      zkUtils.deletePathRecursive(zkPath);
-      System.out.println(zkPath + " is deleted")
-    } catch {
-      case e: Exception => System.err.println("Path not deleted " + e.printStackTrace())
-    }
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index beb5d0e..85e9cad 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -35,7 +35,7 @@ import java.util.Properties
  * End to end tests of the primitive apis against a local server
  */
 @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
+class PrimitiveApiTest extends ProducerConsumerTestHarness {
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index e4c4697..83cce77 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -37,7 +37,7 @@ import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 
 @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c2c670e..2cdf924 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -58,7 +58,7 @@ class ReplicaManagerTest {
   
   @After
   def tearDown() {
-    metrics.close();
+    metrics.close()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
new file mode 100644
index 0000000..4ffb189
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
@@ -0,0 +1,68 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.utils.ZkUtils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.Watcher
+import org.easymock.EasyMock
+import org.junit.{Assert, Before, Test}
+import Assert._
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Meter, Metric, MetricName}
+import org.apache.kafka.common.utils.MockTime
+import scala.collection.JavaConverters._
+
+class SessionExpireListenerTest {
+
+  private var time = new MockTime
+  private val brokerId = 1
+
+  @Test
+  def testSessionExpireListenerMetrics() {
+
+    val metrics = Metrics.defaultRegistry
+
+    def checkMeterCount(name: String, expected: Long) {
+      val meter = metrics.allMetrics.asScala.collectFirst {
+        case (metricName, meter: Meter) if metricName.getName == name => meter
+      }.getOrElse(sys.error(s"Unable to find meter with name $name"))
+      assertEquals("Unexpected meter count", expected, meter.count)
+    }
+
+    val zkClient = EasyMock.mock(classOf[ZkClient])
+    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+    import Watcher._
+    val healthcheck = new KafkaHealthcheck(brokerId, Map.empty, zkUtils, None, ApiVersion.latestVersion)
+
+    val expiresPerSecName = "ZooKeeperExpiresPerSec"
+    val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
+    checkMeterCount(expiresPerSecName, 0)
+    checkMeterCount(disconnectsPerSecName, 0)
+
+    healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Expired)
+    checkMeterCount(expiresPerSecName, 1)
+    checkMeterCount(disconnectsPerSecName, 0)
+
+    healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Disconnected)
+    checkMeterCount(expiresPerSecName, 1)
+    checkMeterCount(disconnectsPerSecName, 1)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
index 6cc868d..50023f8 100644
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
@@ -23,7 +23,7 @@ import org.junit.Test
 class CommandLineUtilsTest {
 
 
-  @Test (expected = classOf[java.lang.IllegalArgumentException])
+  @Test(expected = classOf[java.lang.IllegalArgumentException])
   def testParseEmptyArg() {
     val argArray = Array("my.empty.property=")
     CommandLineUtils.parseKeyValueArgs(argArray, false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 92fae02..7ef4550 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -25,7 +25,7 @@ import org.junit.Test
 
 class ZKPathTest extends ZooKeeperTestHarness {
 
-  val path: String = "/some_dir"
+  val path = "/some_dir"
   val zkSessionTimeoutMs = 1000
   def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
 
@@ -33,7 +33,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreatePersistentPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+    val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
       config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
@@ -49,7 +49,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testCreatePersistentPath {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
       zkUtils.createPersistentPath(path)
@@ -63,10 +63,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
 
   @Test
   def testMakeSurePersistsPathExistsThrowsException {
-    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
-      "test", "1"))
-    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs, false)
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
+    val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
       zkUtils.makeSurePersistentPathExists(path)
@@ -81,7 +79,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testMakeSurePersistsPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
       zkUtils.makeSurePersistentPathExists(path)
@@ -95,10 +93,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
 
   @Test
   def testCreateEphemeralPathThrowsException {
-    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
-      "test", "1"))
-    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs, false)
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
+    val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
       zkUtils.createEphemeralPathExpectConflict(path, "somedata")
@@ -113,7 +109,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testCreateEphemeralPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
       zkUtils.createEphemeralPathExpectConflict(path, "somedata")
@@ -129,8 +125,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreatePersistentSequentialThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs, false)
+    val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
       zkUtils.createSequentialPersistentPath(path)
@@ -145,7 +140,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testCreatePersistentSequentialExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
 
     var actualPath: String = ""
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 95f4e35..0de11cd 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -24,18 +24,19 @@ import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 
 trait ZooKeeperTestHarness extends JUnitSuite with Logging {
-  var zookeeper: EmbeddedZookeeper = null
-  var zkPort: Int = -1
-  var zkUtils: ZkUtils = null
+
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
-  def zkConnect: String = "127.0.0.1:" + zkPort
-  def confFile: String = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "")
+
+  var zkUtils: ZkUtils = null
+  var zookeeper: EmbeddedZookeeper = null
+
+  def zkPort: Int = zookeeper.port
+  def zkConnect: String = s"127.0.0.1:$zkPort"
   
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkPort = zookeeper.port
     zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled())
   }