You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/05/24 21:03:05 UTC

kafka git commit: KAFKA-1737; Enforce ZKSerializer while creating ZkClient; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 467736c7a -> 43b92f8b1


KAFKA-1737; Enforce ZKSerializer while creating ZkClient; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43b92f8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43b92f8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43b92f8b

Branch: refs/heads/trunk
Commit: 43b92f8b1ce8140c432edf11b0c842f5fbe04120
Parents: 467736c
Author: Vivek Madani <vi...@gmail.com>
Authored: Sun May 24 12:02:32 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun May 24 12:02:32 2015 -0700

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala |  2 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |  2 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala   |  2 +-
 .../consumer/ZookeeperTopicEventWatcher.scala   |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  4 +--
 .../scala/kafka/tools/ConsoleConsumer.scala     |  2 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |  2 +-
 .../scala/kafka/tools/ExportZkOffsets.scala     |  4 +--
 .../scala/kafka/tools/ImportZkOffsets.scala     |  4 +--
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   |  6 ++--
 .../kafka/tools/VerifyConsumerRebalance.scala   |  4 +--
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  9 ++++--
 .../test/scala/other/kafka/DeleteZKPath.scala   |  5 ++-
 .../scala/other/kafka/TestOffsetManager.scala   |  4 +--
 .../ZookeeperConsumerConnectorTest.scala        |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  8 ++---
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   | 34 ++++++++------------
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  4 +--
 21 files changed, 50 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 1c3b380..6d1c6ab 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -48,7 +48,7 @@ object ConsumerGroupCommand {
 
     opts.checkArgs()
 
-    val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
 
     try {
       if (opts.options.has(opts.listOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 3b3cd67..2aa6e62 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -53,7 +53,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     var zkClient: ZkClient = null
 
     try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
       val partitionsForPreferredReplicaElection =
         if (!options.has(jsonFileOpt))
           ZkUtils.getAllPartitions(zkClient)

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index acaa611..912b718 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -38,7 +38,7 @@ object ReassignPartitionsCommand extends Logging {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
-    var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+    var zkClient: ZkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
     try {
       if(opts.options.has(opts.verifyOpt))
         verifyAssignment(zkClient, opts)

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 8e6f186..dacbdd0 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -47,7 +47,7 @@ object TopicCommand {
 
     opts.checkArgs()
 
-    val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
 
     try {
       if(opts.options.has(opts.createOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0b0dca1..a7f2acc 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private def connectZk() {
     info("Connecting to zookeeper instance at " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
   }
 
   // Blocks until the offset manager is located and a channel is established to it.

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index f2fa36f..f74823b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -18,7 +18,7 @@
 package kafka.consumer
 
 import scala.collection.JavaConversions._
-import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{ZkUtils, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ea6d165..e66710d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -196,13 +196,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
     if (chroot.length > 1) {
       val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
-      val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+      val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
       ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
       info("Created zookeeper path " + chroot)
       zkClientForChrootCreation.close()
     }
 
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
     ZkUtils.setupCommonPaths(zkClient)
     zkClient
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index bba3990..a3bee58 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging {
 
   def checkZkPathExists(zkUrl: String, path: String): Boolean = {
     try {
-      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer)
+      val zk = ZkUtils.createZkClient(zkUrl, 30*1000,30*1000);
       zk.exists(path)
     } catch {
       case _: Throwable => false

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index d2bac85..ad64cee 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -149,7 +149,7 @@ object ConsumerOffsetChecker extends Logging {
     var zkClient: ZkClient = null
     var channel: BlockingChannel = null
     try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
 
       val topicList = topics match {
         case Some(x) => x.split(",").view.toList

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index ce14bbc..7b52fe4 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.io.FileWriter
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils}
+import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
 
 
@@ -76,7 +76,7 @@ object ExportZkOffsets extends Logging {
     val fileWriter : FileWriter  = new FileWriter(outfile)
     
     try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
       
       var consumerGroups: Seq[String] = null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 598350d..b56f587 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.io.BufferedReader
 import java.io.FileReader
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils}
+import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
 
 
@@ -68,7 +68,7 @@ object ImportZkOffsets extends Logging {
     val zkConnect           = options.valueOf(zkConnectOpt)
     val partitionOffsetFile = options.valueOf(inFileOpt)
 
-    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
     val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
 
     updateZkOffsets(zkClient, partitionOffsets)

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 9235ed9..9942686 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -21,7 +21,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{SimpleConsumer, ConsumerConfig}
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CoreUtils}
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils, CoreUtils}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
@@ -36,8 +36,8 @@ object UpdateOffsetsInZK {
     if(args.length < 3)
       usage
     val config = new ConsumerConfig(Utils.loadProps(args(1)))
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
-        config.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+        config.zkConnectionTimeoutMs)
     args(0) match {
       case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
       case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 4fb519b..db2721f 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils}
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, CommandLineUtils}
 
 object VerifyConsumerRebalance extends Logging {
   def main(args: Array[String]) {
@@ -48,7 +48,7 @@ object VerifyConsumerRebalance extends Logging {
 
     var zkClient: ZkClient = null
     try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
 
       debug("zkConnect = %s; group = %s".format(zkConnect, group))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 2618dd3..78475e3 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -460,7 +460,7 @@ object ZkUtils extends Logging {
 
   def maybeDeletePath(zkUrl: String, dir: String) {
     try {
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      val zk = createZkClient(zkUrl, 30*1000, 30*1000)
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
@@ -781,9 +781,14 @@ object ZkUtils extends Logging {
       }
     }
   }
+
+  def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
+    val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
+    zkClient    
+  }
 }
 
-object ZKStringSerializer extends ZkSerializer {
+private object ZKStringSerializer extends ZkSerializer {
 
   @throws(classOf[ZkMarshallingError])
   def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/other/kafka/DeleteZKPath.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala
index 33c3ef8..fb8ab9f 100755
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala
@@ -18,7 +18,7 @@
 package kafka
 
 import consumer.ConsumerConfig
-import utils.{ZKStringSerializer, ZkUtils}
+import utils.ZkUtils
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.utils.Utils
 
@@ -32,8 +32,7 @@ object DeleteZKPath {
     val config = new ConsumerConfig(Utils.loadProps(args(0)))
     val zkPath = args(1)
 
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
 
     try {
       ZkUtils.deletePathRecursive(zkClient, zkPath);

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 9881bd3..4e90534 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -2,7 +2,7 @@ package other.kafka
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.api._
-import kafka.utils.{ShutdownableThread, ZKStringSerializer}
+import kafka.utils.{ZkUtils, ShutdownableThread}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection._
 import kafka.client.ClientUtils
@@ -238,7 +238,7 @@ object TestOffsetManager {
     var fetchThread: FetchThread = null
     var statsThread: StatsThread = null
     try {
-      zkClient = new ZkClient(zookeeper, 6000, 2000, ZKStringSerializer)
+      zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000)
       commitThreads = (0 to (threadCount-1)).map { threadId =>
         new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 7f9fca3..359b0f5 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -316,7 +316,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }
 
   def testLeaderSelectionForPartition() {
-    val zkClient = new ZkClient(zkConnect, 6000, 30000, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)
 
     // create topic topic1 with 1 partition on broker 0
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index faae0e9..17e9fe4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -448,7 +448,7 @@ object TestUtils extends Logging {
   }
 
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
     ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
 
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 85eec6f..2be1619 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -19,7 +19,7 @@ package kafka.zk
 
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, ZKStringSerializer}
+import kafka.utils.ZkUtils
 import kafka.utils.TestUtils
 import org.junit.Assert
 import org.scalatest.junit.JUnit3Suite
@@ -29,8 +29,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
 
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
@@ -42,8 +41,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
     testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1
     Assert.assertNotNull(testData)
     zkClient.close
-    zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                ZKStringSerializer)
+    zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
     val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
     Assert.assertFalse(nodeExists)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index a2d062f..64c3ba2 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -19,7 +19,7 @@ package unit.kafka.zk
 
 import junit.framework.Assert
 import kafka.consumer.ConsumerConfig
-import kafka.utils.{ZkPath, TestUtils, ZKStringSerializer, ZkUtils}
+import kafka.utils.{ZkPath, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.config.ConfigException
@@ -34,9 +34,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCreatePersistentPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs)
     try {
       ZkPath.resetNamespaceCheckedState
       ZkUtils.createPersistentPath(zkClient, path)
@@ -49,8 +48,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   def testCreatePersistentPath {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
     try {
       ZkPath.resetNamespaceCheckedState
       ZkUtils.createPersistentPath(zkClient, path)
@@ -64,9 +62,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testMakeSurePersistsPathExistsThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs)
     try {
       ZkPath.resetNamespaceCheckedState
       ZkUtils.makeSurePersistentPathExists(zkClient, path)
@@ -79,8 +76,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   def testMakeSurePersistsPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
     try {
       ZkPath.resetNamespaceCheckedState
       ZkUtils.makeSurePersistentPathExists(zkClient, path)
@@ -94,9 +90,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCreateEphemeralPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs)
     try {
       ZkPath.resetNamespaceCheckedState
       ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
@@ -109,8 +104,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   def testCreateEphemeralPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
     try {
       ZkPath.resetNamespaceCheckedState
       ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
@@ -124,9 +118,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCreatePersistentSequentialThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs)
     try {
       ZkPath.resetNamespaceCheckedState
       ZkUtils.createSequentialPersistentPath(zkClient, path)
@@ -139,8 +132,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   def testCreatePersistentSequentialExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      ZKStringSerializer)
+    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
 
     var actualPath: String = ""
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 86bddea..1f4d10d 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, CoreUtils}
+import kafka.utils.{ZkUtils, CoreUtils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   var zkPort: Int = -1
@@ -34,7 +34,7 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
     super.setUp
     zookeeper = new EmbeddedZookeeper()
     zkPort = zookeeper.port
-    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+    zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
   }
 
   override def tearDown() {