You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/05/24 21:03:05 UTC
kafka git commit: KAFKA-1737;
Enforce ZKSerializer while creating ZkClient; reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 467736c7a -> 43b92f8b1
KAFKA-1737; Enforce ZKSerializer while creating ZkClient; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43b92f8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43b92f8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43b92f8b
Branch: refs/heads/trunk
Commit: 43b92f8b1ce8140c432edf11b0c842f5fbe04120
Parents: 467736c
Author: Vivek Madani <vi...@gmail.com>
Authored: Sun May 24 12:02:32 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun May 24 12:02:32 2015 -0700
----------------------------------------------------------------------
.../kafka/admin/ConsumerGroupCommand.scala | 2 +-
.../PreferredReplicaLeaderElectionCommand.scala | 2 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 2 +-
.../main/scala/kafka/admin/TopicCommand.scala | 2 +-
.../consumer/ZookeeperConsumerConnector.scala | 2 +-
.../consumer/ZookeeperTopicEventWatcher.scala | 2 +-
.../main/scala/kafka/server/KafkaServer.scala | 4 +--
.../scala/kafka/tools/ConsoleConsumer.scala | 2 +-
.../kafka/tools/ConsumerOffsetChecker.scala | 2 +-
.../scala/kafka/tools/ExportZkOffsets.scala | 4 +--
.../scala/kafka/tools/ImportZkOffsets.scala | 4 +--
.../scala/kafka/tools/UpdateOffsetsInZK.scala | 6 ++--
.../kafka/tools/VerifyConsumerRebalance.scala | 4 +--
core/src/main/scala/kafka/utils/ZkUtils.scala | 9 ++++--
.../test/scala/other/kafka/DeleteZKPath.scala | 5 ++-
.../scala/other/kafka/TestOffsetManager.scala | 4 +--
.../ZookeeperConsumerConnectorTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 8 ++---
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 34 ++++++++------------
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 4 +--
21 files changed, 50 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 1c3b380..6d1c6ab 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -48,7 +48,7 @@ object ConsumerGroupCommand {
opts.checkArgs()
- val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
try {
if (opts.options.has(opts.listOpt))
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 3b3cd67..2aa6e62 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -53,7 +53,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
var zkClient: ZkClient = null
try {
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
val partitionsForPreferredReplicaElection =
if (!options.has(jsonFileOpt))
ZkUtils.getAllPartitions(zkClient)
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index acaa611..912b718 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -38,7 +38,7 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
- var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ var zkClient: ZkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
try {
if(opts.options.has(opts.verifyOpt))
verifyAssignment(zkClient, opts)
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 8e6f186..dacbdd0 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -47,7 +47,7 @@ object TopicCommand {
opts.checkArgs()
- val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
try {
if(opts.options.has(opts.createOpt))
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0b0dca1..a7f2acc 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def connectZk() {
info("Connecting to zookeeper instance at " + config.zkConnect)
- zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
}
// Blocks until the offset manager is located and a channel is established to it.
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index f2fa36f..f74823b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -18,7 +18,7 @@
package kafka.consumer
import scala.collection.JavaConversions._
-import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{ZkUtils, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ea6d165..e66710d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -196,13 +196,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
- val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+ val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
info("Created zookeeper path " + chroot)
zkClientForChrootCreation.close()
}
- val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index bba3990..a3bee58 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging {
def checkZkPathExists(zkUrl: String, path: String): Boolean = {
try {
- val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer)
+ val zk = ZkUtils.createZkClient(zkUrl, 30*1000,30*1000);
zk.exists(path)
} catch {
case _: Throwable => false
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index d2bac85..ad64cee 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -149,7 +149,7 @@ object ConsumerOffsetChecker extends Logging {
var zkClient: ZkClient = null
var channel: BlockingChannel = null
try {
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
val topicList = topics match {
case Some(x) => x.split(",").view.toList
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index ce14bbc..7b52fe4 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -19,7 +19,7 @@ package kafka.tools
import java.io.FileWriter
import joptsimple._
-import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils}
+import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
import org.I0Itec.zkclient.ZkClient
@@ -76,7 +76,7 @@ object ExportZkOffsets extends Logging {
val fileWriter : FileWriter = new FileWriter(outfile)
try {
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
var consumerGroups: Seq[String] = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 598350d..b56f587 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -20,7 +20,7 @@ package kafka.tools
import java.io.BufferedReader
import java.io.FileReader
import joptsimple._
-import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils}
+import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
import org.I0Itec.zkclient.ZkClient
@@ -68,7 +68,7 @@ object ImportZkOffsets extends Logging {
val zkConnect = options.valueOf(zkConnectOpt)
val partitionOffsetFile = options.valueOf(inFileOpt)
- val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
updateZkOffsets(zkClient, partitionOffsets)
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 9235ed9..9942686 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -21,7 +21,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.consumer.{SimpleConsumer, ConsumerConfig}
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CoreUtils}
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils, CoreUtils}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.utils.Utils
@@ -36,8 +36,8 @@ object UpdateOffsetsInZK {
if(args.length < 3)
usage
val config = new ConsumerConfig(Utils.loadProps(args(1)))
- val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs)
args(0) match {
case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 4fb519b..db2721f 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -19,7 +19,7 @@ package kafka.tools
import joptsimple.OptionParser
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils}
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, CommandLineUtils}
object VerifyConsumerRebalance extends Logging {
def main(args: Array[String]) {
@@ -48,7 +48,7 @@ object VerifyConsumerRebalance extends Logging {
var zkClient: ZkClient = null
try {
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
debug("zkConnect = %s; group = %s".format(zkConnect, group))
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 2618dd3..78475e3 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -460,7 +460,7 @@ object ZkUtils extends Logging {
def maybeDeletePath(zkUrl: String, dir: String) {
try {
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+ val zk = createZkClient(zkUrl, 30*1000, 30*1000)
zk.deleteRecursive(dir)
zk.close()
} catch {
@@ -781,9 +781,14 @@ object ZkUtils extends Logging {
}
}
}
+
+ def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
+ val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
+ zkClient
+ }
}
-object ZKStringSerializer extends ZkSerializer {
+private object ZKStringSerializer extends ZkSerializer {
@throws(classOf[ZkMarshallingError])
def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/other/kafka/DeleteZKPath.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala
index 33c3ef8..fb8ab9f 100755
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala
@@ -18,7 +18,7 @@
package kafka
import consumer.ConsumerConfig
-import utils.{ZKStringSerializer, ZkUtils}
+import utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.utils.Utils
@@ -32,8 +32,7 @@ object DeleteZKPath {
val config = new ConsumerConfig(Utils.loadProps(args(0)))
val zkPath = args(1)
- val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkUtils.deletePathRecursive(zkClient, zkPath);
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 9881bd3..4e90534 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -2,7 +2,7 @@ package other.kafka
import org.I0Itec.zkclient.ZkClient
import kafka.api._
-import kafka.utils.{ShutdownableThread, ZKStringSerializer}
+import kafka.utils.{ZkUtils, ShutdownableThread}
import org.apache.kafka.common.protocol.SecurityProtocol
import scala.collection._
import kafka.client.ClientUtils
@@ -238,7 +238,7 @@ object TestOffsetManager {
var fetchThread: FetchThread = null
var statsThread: StatsThread = null
try {
- zkClient = new ZkClient(zookeeper, 6000, 2000, ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000)
commitThreads = (0 to (threadCount-1)).map { threadId =>
new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 7f9fca3..359b0f5 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -316,7 +316,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
}
def testLeaderSelectionForPartition() {
- val zkClient = new ZkClient(zkConnect, 6000, 30000, ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)
// create topic topic1 with 1 partition on broker 0
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index faae0e9..17e9fe4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -448,7 +448,7 @@ object TestUtils extends Logging {
}
def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
- val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+ val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 85eec6f..2be1619 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -19,7 +19,7 @@ package kafka.zk
import kafka.consumer.ConsumerConfig
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, ZKStringSerializer}
+import kafka.utils.ZkUtils
import kafka.utils.TestUtils
import org.junit.Assert
import org.scalatest.junit.JUnit3Suite
@@ -29,8 +29,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
def testEphemeralNodeCleanup = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
@@ -42,8 +41,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1
Assert.assertNotNull(testData)
zkClient.close
- zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
Assert.assertFalse(nodeExists)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index a2d062f..64c3ba2 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -19,7 +19,7 @@ package unit.kafka.zk
import junit.framework.Assert
import kafka.consumer.ConsumerConfig
-import kafka.utils.{ZkPath, TestUtils, ZKStringSerializer, ZkUtils}
+import kafka.utils.{ZkPath, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.config.ConfigException
@@ -34,9 +34,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
- var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createPersistentPath(zkClient, path)
@@ -49,8 +48,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentPath {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createPersistentPath(zkClient, path)
@@ -64,9 +62,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testMakeSurePersistsPathExistsThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
- var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.makeSurePersistentPathExists(zkClient, path)
@@ -79,8 +76,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testMakeSurePersistsPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.makeSurePersistentPathExists(zkClient, path)
@@ -94,9 +90,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreateEphemeralPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
- var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
@@ -109,8 +104,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreateEphemeralPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
@@ -124,9 +118,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentSequentialThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
- var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs)
try {
ZkPath.resetNamespaceCheckedState
ZkUtils.createSequentialPersistentPath(zkClient, path)
@@ -139,8 +132,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCreatePersistentSequentialExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- ZKStringSerializer)
+ var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
var actualPath: String = ""
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/43b92f8b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 86bddea..1f4d10d 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, CoreUtils}
+import kafka.utils.{ZkUtils, CoreUtils}
trait ZooKeeperTestHarness extends JUnit3Suite {
var zkPort: Int = -1
@@ -34,7 +34,7 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
super.setUp
zookeeper = new EmbeddedZookeeper()
zkPort = zookeeper.port
- zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+ zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
}
override def tearDown() {