Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/19 00:24:07 UTC
[2/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils
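The change applied across every file below follows one pattern: static ZkUtils helpers that took a ZkClient become instance methods on a ZkUtils object that owns the ZooKeeper connection, so callers stop threading zkClient through every call. A minimal sketch of the before/after shape, assuming the fourth constructor argument is the ZooKeeper-security flag and using placeholder names (zkConnect, topic, partition):

    import kafka.utils.ZkUtils

    object ZkUtilsRefactorSketch {
      // old style, removed by this patch:
      //   val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 2000)
      //   ZkUtils.getLeaderForPartition(zkClient, topic, partition)
      def leaderOf(zkConnect: String, topic: String, partition: Int): Option[Int] = {
        val zkUtils = ZkUtils(zkConnect, 6000, 2000, false) // session timeout, connection timeout, security flag (assumed)
        try zkUtils.getLeaderForPartition(topic, partition)
        finally zkUtils.close()
      }
    }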
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index e0e46c8..549a96b 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -63,13 +63,13 @@ object TestOffsetManager {
}
- class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkClient: ZkClient)
+ class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkUtils: ZkUtils)
extends ShutdownableThread("commit-thread")
with KafkaMetricsGroup {
private val groupId = "group-" + id
private val metadata = "Metadata from commit thread " + id
- private var offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkClient, SocketTimeoutMs)
+ private var offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkUtils, SocketTimeoutMs)
private var offset = 0L
val numErrors = new AtomicInteger(0)
val numCommits = new AtomicInteger(0)
@@ -79,7 +79,7 @@ object TestOffsetManager {
private def ensureConnected() {
if (!offsetsChannel.isConnected)
- offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkClient, SocketTimeoutMs)
+ offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkUtils, SocketTimeoutMs)
}
override def doWork() {
@@ -119,7 +119,7 @@ object TestOffsetManager {
}
}
- class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkClient: ZkClient)
+ class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkUtils: ZkUtils)
extends ShutdownableThread("fetch-thread")
with KafkaMetricsGroup {
@@ -127,7 +127,7 @@ object TestOffsetManager {
private val fetchTimer = new KafkaTimer(timer)
private val channels = mutable.Map[Int, BlockingChannel]()
- private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+ private var metadataChannel = ClientUtils.channelToAnyBroker(zkUtils, SocketTimeoutMs)
private val numErrors = new AtomicInteger(0)
@@ -141,7 +141,7 @@ object TestOffsetManager {
val channel = if (channels.contains(coordinatorId))
channels(coordinatorId)
else {
- val newChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+ val newChannel = ClientUtils.channelToOffsetManager(group, zkUtils, SocketTimeoutMs)
channels.put(coordinatorId, newChannel)
newChannel
}
@@ -173,7 +173,7 @@ object TestOffsetManager {
println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
metadataChannel.disconnect()
println("Creating new query channel.")
- metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+ metadataChannel = ClientUtils.channelToAnyBroker(zkUtils, SocketTimeoutMs)
}
finally {
Thread.sleep(fetchIntervalMs)
@@ -250,17 +250,17 @@ object TestOffsetManager {
println("Commit thread count: %d; Partition count: %d, Commit interval: %d ms; Fetch interval: %d ms; Reporting interval: %d ms"
.format(threadCount, partitionCount, commitIntervalMs, fetchIntervalMs, reportingIntervalMs))
- var zkClient: ZkClient = null
+ var zkUtils: ZkUtils = null
var commitThreads: Seq[CommitThread] = Seq()
var fetchThread: FetchThread = null
var statsThread: StatsThread = null
try {
- zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000)
+ zkUtils = ZkUtils(zookeeper, 6000, 2000, false)
commitThreads = (0 to (threadCount-1)).map { threadId =>
- new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient)
+ new CommitThread(threadId, partitionCount, commitIntervalMs, zkUtils)
}
- fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkClient)
+ fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkUtils)
val statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread)
@@ -300,7 +300,7 @@ object TestOffsetManager {
statsThread.shutdown()
statsThread.join()
}
- zkClient.close()
+ zkUtils.close()
}
}
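The main() hunk above shows the usual lifecycle for the new object: build one ZkUtils up front, share it with the worker threads, and close it in the finally block where zkClient.close() used to be. A condensed sketch using only the calls this file makes (SocketTimeoutMs here is a placeholder value, and the boolean is assumed to disable ZK security):

    import kafka.client.ClientUtils
    import kafka.utils.ZkUtils

    object OffsetChannelSketch {
      val SocketTimeoutMs = 10000

      def withOffsetChannel(zookeeper: String, groupId: String): Unit = {
        var zkUtils: ZkUtils = null
        try {
          zkUtils = ZkUtils(zookeeper, 6000, 2000, false)
          // channel helpers now take the wrapper instead of a raw ZkClient
          val channel = ClientUtils.channelToOffsetManager(groupId, zkUtils, SocketTimeoutMs)
          channel.disconnect()
        } finally {
          if (zkUtils != null) zkUtils.close()
        }
      }
    }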
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ed94039..0fce611 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -49,10 +49,10 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort()))
// create topics first
- createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
- createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
- createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers)
- createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
+ createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+ createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
+ createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers)
+ createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
}
@After
@@ -65,7 +65,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testTopicDoesNotExist {
try {
- AdminUtils.addPartitions(zkClient, "Blah", 1)
+ AdminUtils.addPartitions(zkUtils, "Blah", 1)
fail("Topic should not exist")
} catch {
case e: AdminOperationException => //this is good
@@ -76,7 +76,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testWrongReplicaCount {
try {
- AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
+ AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
fail("Add partitions should fail")
} catch {
case e: AdminOperationException => //this is good
@@ -86,12 +86,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testIncrementPartitions {
- AdminUtils.addPartitions(zkClient, topic1, 3)
+ AdminUtils.addPartitions(zkUtils, topic1, 3)
// wait until leader is elected
- var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1)
- var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2)
- val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get
- val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get
+ var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
+ var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
+ val leader1FromZk = zkUtils.getLeaderForPartition(topic1, 1).get
+ val leader2FromZk = zkUtils.getLeaderForPartition(topic1, 2).get
assertEquals(leader1.get, leader1FromZk)
assertEquals(leader2.get, leader2FromZk)
@@ -112,12 +112,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testManualAssignmentOfReplicas {
- AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
+ AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3")
// wait until leader is elected
- var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1)
- var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2)
- val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get
- val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get
+ var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
+ var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2)
+ val leader1FromZk = zkUtils.getLeaderForPartition(topic2, 1).get
+ val leader2FromZk = zkUtils.getLeaderForPartition(topic2, 2).get
assertEquals(leader1.get, leader1FromZk)
assertEquals(leader2.get, leader2FromZk)
@@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testReplicaPlacement {
- AdminUtils.addPartitions(zkClient, topic3, 7)
+ AdminUtils.addPartitions(zkUtils, topic3, 7)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
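AddPartitionsTest exercises both shapes of the AdminUtils.addPartitions call with the new zkUtils parameter: automatic replica placement, or an explicit assignment string with one colon-separated replica list per partition. A small sketch of the two call shapes, reusing the assignment string from testManualAssignmentOfReplicas:

    import kafka.admin.AdminUtils
    import kafka.utils.ZkUtils

    object AddPartitionsSketch {
      // let Kafka place replicas for the new partitions
      def growAutomatically(zkUtils: ZkUtils, topic: String): Unit =
        AdminUtils.addPartitions(zkUtils, topic, 3)

      // spell out broker ids explicitly ("r1:r2" per partition, partitions comma-separated)
      def growWithManualPlacement(zkUtils: ZkUtils, topic: String): Unit =
        AdminUtils.addPartitions(zkUtils, topic, 3, "1:2,0:1,2:3")
    }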
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 2d18069..52ea580 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -66,23 +66,23 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
@Test
def testManualReplicaAssignment() {
val brokers = List(0, 1, 2, 3, 4)
- TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
+ TestUtils.createBrokersInZk(zkUtils, brokers)
// duplicate brokers
intercept[IllegalArgumentException] {
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,0)))
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,0)))
}
// inconsistent replication factor
intercept[IllegalArgumentException] {
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,1), 1->Seq(0)))
}
// good assignment
val assignment = Map(0 -> List(0, 1, 2),
1 -> List(1, 2, 3))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", assignment)
- val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test"))
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", assignment)
+ val found = zkUtils.getPartitionAssignmentForTopics(Seq("test"))
assertEquals(assignment, found("test"))
}
@@ -117,19 +117,19 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
11 -> 1
)
val topic = "test"
- TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4))
+ TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// create leaders for all partitions
- TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
- val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
+ TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+ val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> zkUtils.getReplicasForPartition(topic, p))).toMap
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
for(i <- 0 until actualReplicaList.size)
assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
intercept[TopicExistsException] {
// shouldn't be able to create a topic that already exists
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
}
}
@@ -137,13 +137,13 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
def testTopicCreationWithCollision() {
val topic = "test.topic"
val collidingTopic = "test_topic"
- TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4))
+ TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
// create the topic
- AdminUtils.createTopic(zkClient, topic, 3, 1)
+ AdminUtils.createTopic(zkUtils, topic, 3, 1)
intercept[InvalidTopicException] {
// shouldn't be able to create a topic that collides
- AdminUtils.createTopic(zkClient, collidingTopic, 3, 1)
+ AdminUtils.createTopic(zkUtils, collidingTopic, 3, 1)
}
}
@@ -160,25 +160,25 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(0, 2, 3)
val partitionToBeReassigned = 0
val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
- ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
},
"Partition reassignment should complete")
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+ val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
// in sync replicas should not have any replica that is not in the new assigned replicas
- checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+ checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
- ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+ ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
servers.foreach(_.shutdown())
@@ -191,24 +191,24 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(1, 2, 3)
val partitionToBeReassigned = 0
val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
- ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
},
"Partition reassignment should complete")
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+ val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
- ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+ checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+ ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
@@ -222,24 +222,24 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
// create brokers
val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
- ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
},
"Partition reassignment should complete")
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+ val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
- ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+ checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+ ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
servers.foreach(_.shutdown())
@@ -254,9 +254,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
- val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
+ val reassignedPartitions = zkUtils.getPartitionsBeingReassigned()
assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
servers.foreach(_.shutdown())
}
@@ -266,25 +266,25 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// put the partition in the reassigned path as well
// reassign partition 0
val newReplicas = Seq(0, 1)
val partitionToBeReassigned = 0
val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
reassignPartitionsCommand.reassignPartitions
// create brokers
val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// wait until reassignment completes
- TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
+ TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkUtils),
"Partition reassignment should complete")
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+ val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+ checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
// ensure that there are no under replicated partitions
- ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+ ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
servers.foreach(_.shutdown())
@@ -294,10 +294,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
def testPreferredReplicaJsonData() {
// write preferred replica json data to zk path
val partitionsForPreferredReplicaElection = Set(TopicAndPartition("test", 1), TopicAndPartition("test2", 1))
- PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
+ PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, partitionsForPreferredReplicaElection)
// try to read it back and compare with what was written
- val preferredReplicaElectionZkData = ZkUtils.readData(zkClient,
- ZkUtils.PreferredReplicaLeaderElectionPath)._1
+ val preferredReplicaElectionZkData = zkUtils.readData(ZkUtils.PreferredReplicaLeaderElectionPath)._1
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
@@ -313,14 +312,14 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
- val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get
+ val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get
// trigger preferred replica election
- val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(TopicAndPartition(topic, partition)))
+ val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkUtils, Set(TopicAndPartition(topic, partition)))
preferredReplicaElection.moveLeaderToPreferredReplica()
- val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get
+ val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get
assertEquals("Preferred replica election failed", preferredReplica, newLeader)
servers.foreach(_.shutdown())
}
@@ -334,9 +333,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
// create the topic
- TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
+ TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
- val controllerId = ZkUtils.getController(zkClient)
+ val controllerId = zkUtils.getController()
val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
var partitionsRemaining = controller.shutdownBroker(2)
var activeServers = servers.filter(s => s.config.brokerId != 2)
@@ -402,16 +401,16 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
// create a topic with a few config overrides and check that they are applied
val maxMessageSize = 1024
val retentionMs = 1000*1000
- AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
+ AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
checkConfig(maxMessageSize, retentionMs)
// now double the config values for the topic and check that it is applied
val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs)
- AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
+ AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
checkConfig(2*maxMessageSize, 2 * retentionMs)
// Verify that the same config can be read from ZK
- val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topic, topic)
+ val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
assertEquals(newConfig, configInZk)
} finally {
server.shutdown()
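The reassignment tests above all share the same wait loop, now built entirely on ZkUtils instance methods. A condensed sketch of that loop, assuming this commit's kafka.admin API:

    import kafka.admin.{ReassignPartitionsCommand, ReassignmentCompleted}
    import kafka.common.TopicAndPartition
    import kafka.utils.{TestUtils, ZkUtils}

    object ReassignAndWaitSketch {
      def reassign(zkUtils: ZkUtils, topic: String, newReplicas: Seq[Int]): Unit = {
        val tp = TopicAndPartition(topic, 0)
        val cmd = new ReassignPartitionsCommand(zkUtils, Map(tp -> newReplicas))
        require(cmd.reassignPartitions(), "reassignment was not started")
        TestUtils.waitUntilTrue(() => {
          val inFlight = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
          ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(
            zkUtils, tp, newReplicas, Map(tp -> newReplicas), inFlight) == ReassignmentCompleted
        }, "partition reassignment should complete")
        // verify the final assignment through the instance, as the tests do
        val assigned = zkUtils.getReplicasForPartition(topic, 0)
        assert(assigned == newReplicas)
      }
    }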
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index d3abf08..c19127e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -33,11 +33,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
- AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+ AdminUtils.deleteConsumerGroupInZK(zkUtils, groupToDelete)
TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
"DeleteConsumerGroupInZK should delete the provided consumer group's directory")
@@ -51,11 +51,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
- AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+ AdminUtils.deleteConsumerGroupInZK(zkUtils, groupToDelete)
TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)),
"DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active")
@@ -68,11 +68,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val topic = "test"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
- AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topic)
+ AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, groupToDelete, topic)
TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
"DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic")
@@ -86,14 +86,14 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val otherTopic = "otherTopic"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
- TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers)
fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topicToDelete, otherGroup, "consumer", 0, 10, false)
- AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topicToDelete)
+ AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, groupToDelete, topicToDelete)
TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, topicToDelete)),
"DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic")
@@ -108,13 +108,13 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val topicToDelete = "topicToDelete"
val otherTopic = "otherTopic"
val group = "group"
- TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
- TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers)
fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true)
fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true)
- AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete)
+ AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topicToDelete)
TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)),
"DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active")
@@ -128,14 +128,14 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val otherTopic = "otherTopic"
val groups = Seq("group1", "group2")
- TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
- TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers)
val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete))
val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic))
groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false))
groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false))
- AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete)
+ AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete)
TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
"Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
@@ -148,13 +148,13 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val topic = "topic"
val group = "group"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
val dir = new ZKGroupTopicDirs(group, topic)
fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)
- AdminUtils.deleteTopic(zkClient, topic)
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
- AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
+ AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
TestUtils.waitUntilTrue(() => !groupDirExists(dir),
"Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
@@ -185,19 +185,19 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val consumerConfig = new ConsumerConfig(consumerProps)
val dir = new ZKGroupTopicDirs(group, topic)
TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset)
- ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "")
- ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir)
+ zkUtils.createEphemeralPathExpectConflict(zkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "")
+ zkUtils.makeSurePersistentPathExists(dir.consumerRegistryDir)
if (registerConsumer) {
- ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "")
+ zkUtils.createEphemeralPathExpectConflict(dir.consumerRegistryDir + "/" + consumerId, "")
}
}
private def groupDirExists(dir: ZKGroupDirs) = {
- ZkUtils.pathExists(zkClient, dir.consumerGroupDir)
+ zkUtils.pathExists(dir.consumerGroupDir)
}
private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = {
- ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir)
+ zkUtils.pathExists(dir.consumerOffsetDir) && zkUtils.pathExists(dir.consumerOwnerDir)
}
private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index fbae398..383cb44 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -20,6 +20,7 @@ import kafka.log.Log
import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._
import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.ZkUtils._
import kafka.server.{KafkaServer, KafkaConfig}
import org.junit.Test
import java.util.Properties
@@ -33,8 +34,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = topicAndPartition.topic
val servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ AdminUtils.deleteTopic(zkUtils, topic)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
servers.foreach(_.shutdown())
}
@@ -44,22 +45,22 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = topicAndPartition.topic
val servers = createTestTopicAndCluster(topic)
// shut down one follower replica
- val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
// check if all replicas but the one that is shut down has deleted the log
TestUtils.waitUntilTrue(() =>
servers.filter(s => s.config.brokerId != follower.config.brokerId)
.forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
// ensure topic deletion is halted
- TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+ TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)),
"Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
// restart follower replica
follower.startup()
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
servers.foreach(_.shutdown())
}
@@ -68,25 +69,25 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topicAndPartition = TopicAndPartition("test", 0)
val topic = topicAndPartition.topic
val servers = createTestTopicAndCluster(topic)
- val controllerId = ZkUtils.getController(zkClient)
+ val controllerId = zkUtils.getController()
val controller = servers.filter(s => s.config.brokerId == controllerId).head
- val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && s.config.brokerId != controllerId).last
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
// shut down the controller to trigger controller failover during delete topic
controller.shutdown()
// ensure topic deletion is halted
- TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+ TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)),
"Admin path /admin/delete_topic/test path deleted even when a replica is down")
controller.startup()
follower.startup()
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
servers.foreach(_.shutdown())
}
@@ -101,37 +102,37 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
"Replicas for topic test not created.")
- val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
// start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
// the topic is being deleted
// reassign partition 0
- val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+ val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
val newReplicas = Seq(1, 2, 3)
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
- ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
}, "Partition reassignment shouldn't complete.")
- val controllerId = ZkUtils.getController(zkClient)
+ val controllerId = zkUtils.getController()
val controller = servers.filter(s => s.config.brokerId == controllerId).head
assertFalse("Partition reassignment should fail",
controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+ val assignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
follower.startup()
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
allServers.foreach(_.shutdown())
}
@@ -139,18 +140,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
def testDeleteTopicDuringAddPartition() {
val topic = "test"
val servers = createTestTopicAndCluster(topic)
- val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+ val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
val newPartition = TopicAndPartition(topic, 1)
follower.shutdown()
// add partitions to topic
- AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2", false)
+ AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2", false)
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
follower.startup()
// test if topic deletion is resumed
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
TestUtils.waitUntilTrue(() =>
servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
@@ -164,11 +165,11 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topicAndPartition = TopicAndPartition(topic, 0)
val servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
// add partitions to topic
val newPartition = TopicAndPartition(topic, 1)
- AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2")
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
assertTrue("Replica logs not deleted after delete topic is complete",
servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
@@ -182,12 +183,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topicAndPartition = TopicAndPartition(topic, 0)
val servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ AdminUtils.deleteTopic(zkUtils, topic)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// re-create topic on same replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// wait until leader is elected
- val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+ val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
// check if all replica logs are created
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
@@ -201,16 +202,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = topicAndPartition.topic
val servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkClient, "test2")
+ AdminUtils.deleteTopic(zkUtils, "test2")
// verify delete topic path for test2 is removed from zookeeper
- TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
+ TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
// verify that topic test is untouched
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
"Replicas for topic test not created")
// test the topic path exists
- assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+ assertTrue("Topic test mistakenly deleted", zkUtils.pathExists(getTopicPath(topic)))
// topic test should have a leader
- val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+ val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
servers.foreach(_.shutdown())
}
@@ -242,8 +243,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
server.logManager.cleaner.awaitCleaned(topicName,0,0)
// delete topic
- AdminUtils.deleteTopic(zkClient, "test")
- TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+ AdminUtils.deleteTopic(zkUtils, "test")
+ TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
servers.foreach(_.shutdown())
}
@@ -256,16 +257,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
try {
// start topic deletion
- AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
// try to delete topic marked as deleted
- AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.deleteTopic(zkUtils, topic)
fail("Expected TopicAlreadyMarkedForDeletionException")
}
catch {
case e: TopicAlreadyMarkedForDeletionException => // expected exception
}
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
servers.foreach(_.shutdown())
}
@@ -283,7 +284,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// create brokers
val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
"Replicas for topic test not created")
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index d4fa0d5..cab4813 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.ConfigType
import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils
+import kafka.utils.ZkUtils._
import kafka.coordinator.ConsumerCoordinator
class TopicCommandTest extends ZooKeeperTestHarness with Logging {
@@ -36,25 +36,25 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
val cleanupVal = "compact"
// create brokers
val brokers = List(0, 1, 2)
- TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
+ TestUtils.createBrokersInZk(zkUtils, brokers)
// create the topic
val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
- TopicCommand.createTopic(zkClient, createOpts)
- val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+ TopicCommand.createTopic(zkUtils, createOpts)
+ val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
// pre-create the topic config changes path to avoid a NoNodeException
- ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath)
+ zkUtils.createPersistentPath(EntityConfigChangesPath)
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
- TopicCommand.alterTopic(zkClient, alterOpts)
- val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+ TopicCommand.alterTopic(zkUtils, alterOpts)
+ val newProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
}
@@ -67,34 +67,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
// create brokers
val brokers = List(0, 1, 2)
- TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
+ TestUtils.createBrokersInZk(zkUtils, brokers)
// create the NormalTopic
val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", normalTopic))
- TopicCommand.createTopic(zkClient, createOpts)
+ TopicCommand.createTopic(zkUtils, createOpts)
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
- val deletePath = ZkUtils.getDeleteTopicPath(normalTopic)
- assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deletePath))
- TopicCommand.deleteTopic(zkClient, deleteOpts)
- assertTrue("Delete path for topic should exist after deletion.", zkClient.exists(deletePath))
+ val deletePath = getDeleteTopicPath(normalTopic)
+ assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deletePath))
+ TopicCommand.deleteTopic(zkUtils, deleteOpts)
+ assertTrue("Delete path for topic should exist after deletion.", zkUtils.zkClient.exists(deletePath))
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", ConsumerCoordinator.OffsetsTopicName))
- TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
+ TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
// try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName))
- val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
- assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath))
+ val deleteOffsetTopicPath = getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
+ assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
- TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
+ TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
}
- assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath))
+ assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
}
}
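Where a test still wants the raw client, e.g. a bare exists() check on a znode, it now reaches it through the wrapper's zkClient field instead of holding its own ZkClient, as the deletePath assertions above do. A small sketch of that escape hatch:

    import kafka.utils.ZkUtils
    import kafka.utils.ZkUtils.getDeleteTopicPath

    object DeleteMarkerSketch {
      def deleteMarkerExists(zkUtils: ZkUtils, topic: String): Boolean =
        zkUtils.zkClient.exists(getDeleteTopicPath(topic))
    }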
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index d43778e..50496f0 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -43,10 +43,10 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
val notificationMessage2 = "message2"
val changeExpirationMs = 100
- val notificationListener = new ZkNodeChangeNotificationListener(zkClient, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs)
+ val notificationListener = new ZkNodeChangeNotificationListener(zkUtils, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs)
notificationListener.init()
- ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage1)
+ zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1)
TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 && notificationHandler.notification == notificationMessage1, "failed to send/process notification message in the timeout period.")
@@ -55,7 +55,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size) however even after that the assertion can fail as the second node it self can be deleted
depending on how threads get scheduled.*/
- ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage2)
+ zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2)
TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 && notificationHandler.notification == notificationMessage2, "failed to send/process notification message in the timeout period.")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ca63c80..b8ad15b 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -57,7 +57,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
new AtomicLong(0),
new AtomicInteger(0),
""))
- createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+ createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 6c22e8b..818229c 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -48,9 +48,9 @@ class PartitionAssignorTest extends Logging {
("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
}).toSeq:_*)
val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
- val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
- EasyMock.replay(zkClient)
- PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true)
+ val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+ EasyMock.replay(zkUtils.zkClient)
+ PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true)
})
}
@@ -73,10 +73,10 @@ class PartitionAssignorTest extends Logging {
("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
}).toSeq:_*)
val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
- val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
- EasyMock.replay(zkClient)
+ val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+ EasyMock.replay(zkUtils.zkClient)
- PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient)
+ PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
})
}
}
@@ -135,6 +135,7 @@ private object PartitionAssignorTest extends Logging {
val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*)
val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
+ val zkUtils = ZkUtils(zkClient, false)
EasyMock.checkOrder(zkClient, false)
EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers)
@@ -149,21 +150,21 @@ private object PartitionAssignorTest extends Logging {
scenario.topicPartitionCounts.foreach { case(topic, partitionCount) =>
val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*)
EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat()))
- .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment))
+ .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
EasyMock.expectLastCall().anyTimes()
}
- EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn(
+ EasyMock.expect(zkUtils.zkClient.getChildren("/brokers/topics")).andReturn(
java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*))
EasyMock.expectLastCall().anyTimes()
- zkClient
+ zkUtils
}
- private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkClient: ZkClient,
+ private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkUtils: ZkUtils,
verifyAssignmentIsUniform: Boolean = false) {
val assignments = scenario.subscriptions.map{ case(consumer, subscription) =>
- val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient)
+ val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkUtils)
assignor.assign(ctx).get(consumer)
}
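
For tests that previously mocked a bare ZkClient, the pattern above wraps the mock in a ZkUtils via ZkUtils(zkClient, false) and keeps the EasyMock expectations, replay and verify aimed at the underlying client through zkUtils.zkClient. A small sketch of that setup, with a placeholder topic name:

  import org.I0Itec.zkclient.ZkClient
  import org.easymock.EasyMock
  import kafka.utils.ZkUtils

  val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
  val zkUtils  = ZkUtils(zkClient, false)                  // wrap the mock; code under test now takes ZkUtils
  EasyMock.checkOrder(zkClient, false)
  EasyMock.expect(zkClient.getChildren("/brokers/topics"))
          .andReturn(java.util.Arrays.asList("topic1"))
  EasyMock.replay(zkUtils.zkClient)                        // replay/verify the wrapped client, not the wrapper
  // ... exercise code that accepts zkUtils ...
  EasyMock.verify(zkUtils.zkClient)
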
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index cb59542..28b1dd5 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -97,8 +97,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
sendMessages(servers, topic, nMessages, 1)
// wait to make sure the topic and partition have a leader for the successful case
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -130,8 +130,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++
sendMessages(servers, topic, nMessages, 1)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
@@ -151,8 +151,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
sendMessages(servers, topic, nMessages, 1)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
@@ -185,8 +185,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -218,8 +218,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
@@ -239,8 +239,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
@@ -294,8 +294,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
val zkConsumerConnector =
new ZookeeperConsumerConnector(consumerConfig, true)
@@ -322,10 +322,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
@Test
def testLeaderSelectionForPartition() {
- val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)
+ val zkUtils = ZkUtils(zkConnect, 6000, 30000, false)
// create topic topic1 with 1 partition on broker 0
- createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+ createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
// send some messages to each broker
val sentMessages1 = sendMessages(servers, topic, nMessages)
@@ -349,7 +349,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
assertEquals(sentMessages1, receivedMessages1)
zkConsumerConnector1.shutdown()
- zkClient.close()
+ zkUtils.close()
}
@Test
@@ -416,14 +416,14 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
}
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
- val children = zkClient.getChildren(path)
+ val children = zkUtils.zkClient.getChildren(path)
Collections.sort(children)
val childrenAsSeq : Seq[java.lang.String] = {
import scala.collection.JavaConversions._
children.toSeq
}
childrenAsSeq.map(partition =>
- (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
+ (partition, zkUtils.zkClient.readData(path + "/" + partition).asInstanceOf[String]))
}
private class TestConsumerRebalanceListener extends ConsumerRebalanceListener {
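
This file shows both halves of the migration for self-managed connections: testLeaderSelectionForPartition builds its own ZkUtils from the connect string and closes it, while helpers like getZKChildrenValues reach the raw client through zkUtils.zkClient when a plain ZkClient read is still needed. A sketch of that shape, assuming the zkConnect and servers fields provided by the test harness and a placeholder topic:

  import kafka.utils.{TestUtils, ZkUtils}

  val zkUtils = ZkUtils(zkConnect, 6000, 30000, false)
  try {
    // topic creation and leader checks now take the ZkUtils handle directly
    TestUtils.createTopic(zkUtils, "topic1", numPartitions = 1, replicationFactor = 1, servers = servers)
    // raw ZkClient operations stay available through the wrapped client
    val brokerIds = zkUtils.zkClient.getChildren("/brokers/ids")
  } finally {
    zkUtils.close()   // replaces the old zkClient.close()
  }
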
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index c93eca5..91ac1f6 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -75,7 +75,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
}
}
// Create topic with one partition
- kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1)
+ kafka.admin.AdminUtils.createTopic(controller.zkUtils, topic, 1, 1)
val topicPartition = TopicAndPartition("topic1", 0)
var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
while (!partitions.contains(topicPartition)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
index d8a7948..3bc37e5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
@@ -18,7 +18,8 @@
package kafka.coordinator
import kafka.server.KafkaConfig
-import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.{TestUtils, ZkUtils}
+import kafka.utils.ZkUtils._
import org.junit.Assert._
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
@@ -33,14 +34,15 @@ import org.scalatest.junit.JUnitSuite
class CoordinatorMetadataTest extends JUnitSuite {
val DefaultNumPartitions = 8
val DefaultNumReplicas = 2
- var zkClient: ZkClient = null
+ var zkUtils: ZkUtils = null
var coordinatorMetadata: CoordinatorMetadata = null
@Before
def setUp() {
val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
- zkClient = EasyMock.createStrictMock(classOf[ZkClient])
- coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkClient, null)
+ val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
+ zkUtils = ZkUtils(zkClient, false)
+ coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkUtils, null)
}
@Test
@@ -77,8 +79,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
val topics = Set("a")
coordinatorMetadata.addGroup(groupId, "range")
- expectZkClientSubscribeDataChanges(zkClient, topics)
- EasyMock.replay(zkClient)
+ expectZkClientSubscribeDataChanges(zkUtils, topics)
+ EasyMock.replay(zkUtils.zkClient)
coordinatorMetadata.bindGroupToTopics(groupId, topics)
assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
}
@@ -91,8 +93,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
coordinatorMetadata.addGroup(group1, "range")
coordinatorMetadata.addGroup(group2, "range")
- expectZkClientSubscribeDataChanges(zkClient, topics)
- EasyMock.replay(zkClient)
+ expectZkClientSubscribeDataChanges(zkUtils, topics)
+ EasyMock.replay(zkUtils.zkClient)
coordinatorMetadata.bindGroupToTopics(group1, topics)
coordinatorMetadata.bindGroupToTopics(group2, topics)
assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
@@ -111,8 +113,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
val topics = Set("a")
coordinatorMetadata.addGroup(groupId, "range")
- expectZkClientSubscribeDataChanges(zkClient, topics)
- EasyMock.replay(zkClient)
+ expectZkClientSubscribeDataChanges(zkUtils, topics)
+ EasyMock.replay(zkUtils.zkClient)
coordinatorMetadata.bindGroupToTopics(groupId, topics)
coordinatorMetadata.unbindGroupFromTopics(groupId, Set("b"))
assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
@@ -126,8 +128,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
coordinatorMetadata.addGroup(group1, "range")
coordinatorMetadata.addGroup(group2, "range")
- expectZkClientSubscribeDataChanges(zkClient, topics)
- EasyMock.replay(zkClient)
+ expectZkClientSubscribeDataChanges(zkUtils, topics)
+ EasyMock.replay(zkUtils.zkClient)
coordinatorMetadata.bindGroupToTopics(group1, topics)
coordinatorMetadata.bindGroupToTopics(group2, topics)
coordinatorMetadata.unbindGroupFromTopics(group1, topics)
@@ -140,9 +142,9 @@ class CoordinatorMetadataTest extends JUnitSuite {
val topics = Set("a")
coordinatorMetadata.addGroup(groupId, "range")
- expectZkClientSubscribeDataChanges(zkClient, topics)
- expectZkClientUnsubscribeDataChanges(zkClient, topics)
- EasyMock.replay(zkClient)
+ expectZkClientSubscribeDataChanges(zkUtils, topics)
+ expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
+ EasyMock.replay(zkUtils.zkClient)
coordinatorMetadata.bindGroupToTopics(groupId, topics)
coordinatorMetadata.unbindGroupFromTopics(groupId, topics)
assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
@@ -163,8 +165,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
coordinatorMetadata.addGroup(group1, "range")
coordinatorMetadata.addGroup(group2, "range")
- expectZkClientSubscribeDataChanges(zkClient, topics)
- EasyMock.replay(zkClient)
+ expectZkClientSubscribeDataChanges(zkUtils, topics)
+ EasyMock.replay(zkUtils.zkClient)
coordinatorMetadata.bindGroupToTopics(group1, topics)
coordinatorMetadata.bindGroupToTopics(group2, topics)
coordinatorMetadata.removeGroup(group1, topics)
@@ -179,17 +181,17 @@ class CoordinatorMetadataTest extends JUnitSuite {
val topics = Set("a")
coordinatorMetadata.addGroup(groupId, "range")
- expectZkClientSubscribeDataChanges(zkClient, topics)
- expectZkClientUnsubscribeDataChanges(zkClient, topics)
- EasyMock.replay(zkClient)
+ expectZkClientSubscribeDataChanges(zkUtils, topics)
+ expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
+ EasyMock.replay(zkUtils.zkClient)
coordinatorMetadata.bindGroupToTopics(groupId, topics)
coordinatorMetadata.removeGroup(groupId, topics)
assertNull(coordinatorMetadata.getGroup(groupId))
assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
}
- private def expectZkClientSubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) {
- topics.foreach(topic => expectZkClientSubscribeDataChange(zkClient, topic))
+ private def expectZkClientSubscribeDataChanges(zkUtils: ZkUtils, topics: Set[String]) {
+ topics.foreach(topic => expectZkClientSubscribeDataChange(zkUtils.zkClient, topic))
}
private def expectZkClientUnsubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) {
@@ -200,14 +202,14 @@ class CoordinatorMetadataTest extends JUnitSuite {
val replicaAssignment =
(0 until DefaultNumPartitions)
.map(partition => partition.toString -> (0 until DefaultNumReplicas).toSeq).toMap
- val topicPath = ZkUtils.getTopicPath(topic)
+ val topicPath = getTopicPath(topic)
EasyMock.expect(zkClient.readData(topicPath, new Stat()))
- .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment))
+ .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
zkClient.subscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
}
private def expectZkClientUnsubscribeDataChange(zkClient: ZkClient, topic: String) {
- val topicPath = ZkUtils.getTopicPath(topic)
+ val topicPath = getTopicPath(topic)
zkClient.unsubscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
}
}
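
Two smaller consequences of the refactoring are visible in this file: path helpers such as getTopicPath can be brought in with import kafka.utils.ZkUtils._ and used unqualified, while data helpers such as replicaAssignmentZkData move onto the ZkUtils instance. A compact sketch combining both, with a placeholder topic and assignment:

  import org.I0Itec.zkclient.ZkClient
  import org.easymock.EasyMock
  import kafka.utils.ZkUtils
  import kafka.utils.ZkUtils._                    // unqualified path helpers such as getTopicPath

  val zkUtils   = ZkUtils(EasyMock.createStrictMock(classOf[ZkClient]), false)
  val topicPath = getTopicPath("a")               // was ZkUtils.getTopicPath("a")
  val zkData    = zkUtils.replicaAssignmentZkData(Map("0" -> Seq(0, 1)))   // now an instance method
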
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 818673f..a71ddf1 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -76,7 +76,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
* Returns the count of messages received.
*/
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
- TestUtils.createTopic(zkClient, topic, 1, 1, servers)
+ TestUtils.createTopic(zkUtils, topic, 1, 1, servers)
val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
TestUtils.getBrokerListStrFromServers(servers),
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 20a4068..3cf4dae 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -62,7 +62,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
def testTopicMetadataRequest {
// create topic
val topic = "test"
- AdminUtils.createTopic(zkClient, topic, 1, 1)
+ AdminUtils.createTopic(zkUtils, topic, 1, 1)
// create a topic metadata request
val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
@@ -79,7 +79,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
def testBasicTopicMetadata {
// create topic
val topic = "test"
- createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+ createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
2000,0).topicsMetadata
@@ -98,8 +98,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
// create topic
val topic1 = "testGetAllTopicMetadata1"
val topic2 = "testGetAllTopicMetadata2"
- createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
- createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+ createTopic(zkUtils, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+ createTopic(zkUtils, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
// issue metadata request with empty list of topics
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
@@ -130,7 +130,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
// wait for leader to be elected
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
// retry the metadata for the auto created topic
@@ -159,7 +159,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
assertEquals("Expecting InvalidTopicCode for topic2 metadata", ErrorMapping.InvalidTopicCode, topicsMetadata(1).errorCode)
// wait for leader to be elected
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 0)
TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
// retry the metadata for the first auto created topic
@@ -218,7 +218,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
// create topic
val topic: String = "test"
- AdminUtils.createTopic(zkClient, topic, 1, numBrokers)
+ AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
// shutdown a broker
adHocServers.last.shutdown()
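
Across the integration tests the change is mostly a one-argument swap: admin and wait helpers that used to take the harness's zkClient now take its zkUtils. A sketch of the common sequence, assuming the zkUtils and server1 fields supplied by the test harness and a placeholder topic:

  import kafka.admin.AdminUtils
  import kafka.utils.TestUtils

  AdminUtils.createTopic(zkUtils, "test", 1, 1)                       // topic, partitions, replication factor
  TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)     // ZkUtils replaces ZkClient here too
  TestUtils.waitUntilMetadataIsPropagated(Seq(server1), "test", 0)    // unchanged: no ZooKeeper handle needed
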
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index c061597..5af5d1a 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -42,11 +42,11 @@ class FetcherTest extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp
- TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+ TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())))
- fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
+ fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils)
fetcher.stopConnections()
val topicInfos = configs.map(c =>
new PartitionTopicInfo(topic,
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index e6f0c54..df752db 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -113,7 +113,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
private def produceAndMultiFetch(producer: Producer[String, String]) {
for(topic <- List("test1", "test2", "test3", "test4"))
- TestUtils.createTopic(zkClient, topic, servers = servers)
+ TestUtils.createTopic(zkUtils, topic, servers = servers)
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -182,7 +182,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
private def multiProduce(producer: Producer[String, String]) {
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
- topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+ topics.keys.map(topic => TestUtils.createTopic(zkUtils, topic, servers = servers))
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
@@ -210,7 +210,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
@Test
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
- TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
+ TestUtils.createTopic(zkUtils, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
@@ -219,7 +219,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
@Test
def testPipelinedProduceRequests() {
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
- topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+ topics.keys.map(topic => TestUtils.createTopic(zkUtils, topic, servers = servers))
val props = new Properties()
props.put("request.required.acks", "0")
val pipelinedProducer: Producer[String, String] =
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 4d73be1..b931568 100755
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -56,10 +56,10 @@ class RollingBounceTest extends ZooKeeperTestHarness {
val topic4 = "new-topic4"
// create topics with 1 partition, 2 replicas, one on each broker
- createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
- createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
- createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers)
- createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
+ createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+ createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
+ createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers)
+ createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
// Do a rolling bounce and check if leader transitions happen correctly
@@ -86,7 +86,7 @@ class RollingBounceTest extends ZooKeeperTestHarness {
servers((startIndex + 1) % 4).shutdown()
prevLeader = (startIndex + 1) % 4
}
- var newleader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+ var newleader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
// Ensure the new leader is different from the old
assertTrue("Leader transition did not happen for " + topic, newleader.getOrElse(-1) != -1 && (newleader.getOrElse(-1) != prevLeader))
// Start the server back up again