You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/15 16:41:48 UTC
[kafka] branch trunk updated: MINOR: Update test classes to use
KafkaZkClient methods (#4367)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d9d0d79 MINOR: Update test classes to use KafkaZkClient methods (#4367)
d9d0d79 is described below
commit d9d0d79287eeec0a1c3dcc2203288421284b5ca1
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Thu Feb 15 22:11:38 2018 +0530
MINOR: Update test classes to use KafkaZkClient methods (#4367)
Remove ZkUtils reference form ZooKeeperTestHarness plus some minor cleanups.
---
core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +-
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 22 +++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 12 +--
.../integration/kafka/api/ConsumerBounceTest.scala | 2 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 16 ++--
.../kafka/api/SaslPlainPlaintextConsumerTest.scala | 5 +-
.../SaslPlainSslEndToEndAuthorizationTest.scala | 5 +-
.../SaslScramSslEndToEndAuthorizationTest.scala | 4 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 34 +++++---
.../unit/kafka/admin/DeleteConsumerGroupTest.scala | 18 ++++-
.../scala/unit/kafka/admin/TopicCommandTest.scala | 8 +-
.../consumer/ZookeeperConsumerConnectorTest.scala | 7 +-
.../controller/ControllerIntegrationTest.scala | 92 +++++++++++-----------
.../scala/unit/kafka/integration/FetcherTest.scala | 12 ++-
.../kafka/security/auth/ZkAuthorizationTest.scala | 11 ++-
.../unit/kafka/server/LeaderElectionTest.scala | 8 +-
.../unit/kafka/server/LogDirFailureTest.scala | 4 +-
.../kafka/server/ServerGenerateBrokerIdTest.scala | 8 --
.../kafka/server/ServerGenerateClusterIdTest.scala | 6 +-
.../unit/kafka/server/ServerStartupTest.scala | 6 +-
.../unit/kafka/utils/ReplicationUtilsTest.scala | 15 ++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 15 ++--
.../test/scala/unit/kafka/utils/ZkUtilsTest.scala | 24 +++++-
.../test/scala/unit/kafka/zk/ZKEphemeralTest.scala | 13 +--
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 6 +-
25 files changed, 216 insertions(+), 139 deletions(-)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e04bce0..d5fde4d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -592,7 +592,7 @@ class ZkUtils(val zkClient: ZkClient,
}
/**
- * Update the value of a persistent node with the given path and data.
+ * Update the value of a ephemeral node with the given path and data.
* create parent directory if necessary. Never throw NodeExistException.
*/
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index b545455..afc8202 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -761,13 +761,31 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
/**
* Gets the leader for a given partition
- * @param partition
+ * @param partition The partition for which we want to get leader.
* @return optional integer if the leader exists and None otherwise.
*/
def getLeaderForPartition(partition: TopicPartition): Option[Int] =
getTopicPartitionState(partition).map(_.leaderAndIsr.leader)
/**
+ * Gets the in-sync replicas (ISR) for a specific topicPartition
+ * @param partition The partition for which we want to get ISR.
+ * @return optional ISR if exists and None otherwise
+ */
+ def getInSyncReplicasForPartition(partition: TopicPartition): Option[Seq[Int]] =
+ getTopicPartitionState(partition).map(_.leaderAndIsr.isr)
+
+
+ /**
+ * Gets the leader epoch for a specific topicPartition
+ * @param partition The partition for which we want to get the leader epoch
+ * @return optional integer if the leader exists and None otherwise
+ */
+ def getEpochForPartition(partition: TopicPartition): Option[Int] = {
+ getTopicPartitionState(partition).map(_.leaderAndIsr.leaderEpoch)
+ }
+
+ /**
* Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
* @return sequence of znode names and not the absolute znode path.
*/
@@ -1356,7 +1374,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
}
}
- private[zk] def pathExists(path: String): Boolean = {
+ def pathExists(path: String): Boolean = {
val existsRequest = ExistsRequest(path)
val existsResponse = retryRequestUntilConnected(existsRequest)
existsResponse.resultCode match {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 248d219..ab7ca64 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -245,14 +245,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
// create the consumer offset topic
- TestUtils.createTopic(zkClient, GROUP_METADATA_TOPIC_NAME,
- 1,
- 1,
- servers,
- servers.head.groupCoordinator.offsetsTopicConfigs)
+ createTopic(GROUP_METADATA_TOPIC_NAME, topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
// create the test topic with all the brokers as replicas
- TestUtils.createTopic(zkClient, topic, 1, 1, this.servers)
- TestUtils.createTopic(zkClient, deleteTopic, 1, 1, this.servers)
+ createTopic(topic)
+ createTopic(deleteTopic)
}
@After
@@ -711,7 +707,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// create an unmatched topic
val unmatchedTopic = "unmatched"
- TestUtils.createTopic(zkClient, unmatchedTopic, 1, 1, this.servers)
+ createTopic(unmatchedTopic)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic))
sendRecords(1, new TopicPartition(unmatchedTopic, part))
removeAllAcls()
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 58d1be9..53b3ed6 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -173,7 +173,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val consumer = this.consumers.head
consumer.subscribe(Collections.singleton(newtopic))
executor.schedule(new Runnable {
- def run() = TestUtils.createTopic(zkClient, newtopic, serverCount, serverCount, servers)
+ def run() = createTopic(newtopic, numPartitions = serverCount, replicationFactor = serverCount)
}, 2, TimeUnit.SECONDS)
consumer.poll(0)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index b9fb657..8ca5163 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -88,7 +88,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testTooLargeRecordWithAckZero() {
// create topic
- TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+ createTopic(topic1, replicationFactor = numServers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -105,7 +105,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testTooLargeRecordWithAckOne() {
// create topic
- TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+ createTopic(topic1, replicationFactor = numServers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -122,7 +122,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
// create topic
val topic10 = "topic10"
- TestUtils.createTopic(zkClient, topic10, servers.size, numServers, servers, topicConfig)
+ createTopic(topic10, numPartitions = servers.size, replicationFactor = numServers, topicConfig)
// send a record that is too large for replication, but within the broker max message limit
val value = new Array[Byte](maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
@@ -169,7 +169,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testWrongBrokerList() {
// create topic
- TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+ createTopic(topic1, replicationFactor = numServers)
// producer with incorrect broker list
producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
@@ -188,7 +188,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testInvalidPartition() {
// create topic with a single partition
- TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+ createTopic(topic1, numPartitions = 1, replicationFactor = numServers)
// create a record with incorrect partition id (higher than the number of partitions), send should fail
val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes)
@@ -203,7 +203,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testSendAfterClosed() {
// create topic
- TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+ createTopic(topic1, replicationFactor = numServers)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
@@ -241,7 +241,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
val topicProps = new Properties()
topicProps.put("min.insync.replicas",(numServers+1).toString)
- TestUtils.createTopic(zkClient, topicName, 1, numServers, servers, topicProps)
+ createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)
val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
try {
@@ -261,7 +261,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
val topicProps = new Properties()
topicProps.put("min.insync.replicas", numServers.toString)
- TestUtils.createTopic(zkClient, topicName, 1, numServers, servers,topicProps)
+ createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)
val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
// this should work with all brokers up and running
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 99ddcc3..5789d1a 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -16,8 +16,9 @@ import java.io.File
import java.util.Locale
import kafka.server.KafkaConfig
-import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.{After, Before, Test}
@@ -51,6 +52,8 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
*/
@Test
def testZkAclsDisabled() {
+ val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
TestUtils.verifyUnsecureZkAcls(zkUtils)
+ CoreUtils.swallow(zkUtils.close(), this)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index c28de3e..08351aa 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -16,8 +16,9 @@
*/
package kafka.api
-import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
import org.junit.Test
@@ -56,6 +57,8 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
*/
@Test
def testAcls() {
+ val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
TestUtils.verifySecureZkAcls(zkUtils, 1)
+ CoreUtils.swallow(zkUtils.close(), this)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index a4f3e0b..f07f4b4 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -19,6 +19,8 @@ package kafka.api
import org.apache.kafka.common.security.scram.ScramMechanism
import kafka.utils.JaasTestUtils
import kafka.utils.ZkUtils
+import kafka.zk.ConfigEntityChangeNotificationZNode
+
import scala.collection.JavaConverters._
import org.junit.Before
@@ -32,7 +34,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override def configureSecurityBeforeServersStart() {
super.configureSecurityBeforeServersStart()
- zkClient.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
+ zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker credentials before starting brokers
createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
}
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 3b5f888..e0e26a8 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,12 +22,12 @@ import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, Invali
import org.apache.kafka.common.metrics.Quota
import org.easymock.EasyMock
import org.junit.Assert._
-import org.junit.{After, Test}
+import org.junit.{After, Before, Test}
import java.util.Properties
import kafka.utils._
import kafka.log._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ConfigEntityZNode, PreferredReplicaElectionZNode, ZooKeeperTestHarness}
import kafka.utils.{Logging, TestUtils, ZkUtils}
import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
import java.io.File
@@ -39,6 +39,7 @@ import kafka.utils.TestUtils._
import scala.collection.{Map, Set, immutable}
import kafka.utils.CoreUtils._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.JaasUtils
import scala.collection.JavaConverters._
import scala.util.Try
@@ -46,9 +47,18 @@ import scala.util.Try
class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
var servers: Seq[KafkaServer] = Seq()
+ var zkUtils: ZkUtils = null
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+ }
@After
override def tearDown() {
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close(), this)
TestUtils.shutdownServers(servers)
super.tearDown()
}
@@ -212,9 +222,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
// in sync replicas should not have any replica that is not in the new assigned replicas
- checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
- ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -242,8 +252,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
- ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -271,8 +281,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
- ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -313,9 +323,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
- checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+ checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
// ensure that there are no under replicated partitions
- ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+ ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
"New replicas should exist on brokers")
}
@@ -326,7 +336,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
// try to read it back and compare with what was written
- val preferredReplicaElectionZkData = zkUtils.readData(ZkUtils.PreferredReplicaLeaderElectionPath)._1
+ val preferredReplicaElectionZkData = zkUtils.readData(PreferredReplicaElectionZNode.path)._1
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
@@ -531,7 +541,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
// Write config without notification to ZK.
val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
val map = Map("version" -> 1, "config" -> configMap.asJava)
- zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
+ zkUtils.updatePersistentPath(ConfigEntityZNode.path(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
assertEquals("Must have 1 overriden client config", 1, configInZk.size)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index 8a731cf..da17f16 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -20,15 +20,31 @@ import java.nio.charset.StandardCharsets
import kafka.utils._
import kafka.server.KafkaConfig
-import org.junit.Test
+import org.junit.{After, Before, Test}
import kafka.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import kafka.integration.KafkaServerTestHarness
+import org.apache.kafka.common.security.JaasUtils
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
class DeleteConsumerGroupTest extends KafkaServerTestHarness {
def generateConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
+ var zkUtils: ZkUtils = null
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+ }
+
+ @After
+ override def tearDown() {
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close(), this)
+ super.tearDown()
+ }
+
@Test
def testGroupWideDeleteInZK() {
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 291082c..6a276df 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -80,9 +80,9 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
val deletePath = getDeleteTopicPath(normalTopic)
- assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deletePath))
+ assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deletePath))
TopicCommand.deleteTopic(zkClient, deleteOpts)
- assertTrue("Delete path for topic should exist after deletion.", zkUtils.pathExists(deletePath))
+ assertTrue("Delete path for topic should exist after deletion.", zkClient.pathExists(deletePath))
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
@@ -93,11 +93,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
- assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
+ assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
}
- assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
+ assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.pathExists(deleteOffsetTopicPath))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 77930e6..b4381a4 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -28,8 +28,9 @@ import kafka.serializer._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
+import org.apache.kafka.common.security.JaasUtils
import org.apache.log4j.{Level, Logger}
-import org.junit.{Test, After, Before}
+import org.junit.{After, Before, Test}
import scala.collection._
@@ -43,6 +44,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val topic = "topic1"
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+ var zkUtils: ZkUtils = null
override def generateConfigs =
TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
@@ -57,11 +59,14 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
@Before
override def setUp() {
super.setUp()
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
dirs = new ZKGroupTopicDirs(group, topic)
}
@After
override def tearDown() {
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close(), this)
super.tearDown()
}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 0ad64c5..88fe82b 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -20,12 +20,12 @@ package kafka.controller
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Timer
import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils
+import kafka.zk.{PreferredReplicaElectionZNode, ZooKeeperTestHarness}
import org.junit.{After, Before, Test}
import org.junit.Assert.assertTrue
+import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
@@ -47,37 +47,37 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testEmptyCluster(): Unit = {
servers = makeServers(1)
- TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+ TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
}
@Test
def testControllerEpochPersistsWhenAllBrokersDown(): Unit = {
servers = makeServers(1)
- TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+ TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
servers.head.shutdown()
servers.head.awaitShutdown()
- TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ControllerPath), "failed to kill controller")
+ TestUtils.waitUntilTrue(() => !zkClient.getControllerId.isDefined, "failed to kill controller")
waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "controller epoch was not persisted after broker failure")
}
@Test
def testControllerMoveIncrementsControllerEpoch(): Unit = {
servers = makeServers(1)
- TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+ TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
servers.head.shutdown()
servers.head.awaitShutdown()
servers.head.startup()
- TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+ TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
waitUntilControllerEpoch(KafkaController.InitialControllerEpoch + 1, "controller epoch was not incremented after controller move")
}
@Test
def testTopicCreation(): Unit = {
servers = makeServers(1)
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(0))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
@@ -91,7 +91,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers.take(1))
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
@@ -101,12 +101,12 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testTopicPartitionExpansion(): Unit = {
servers = makeServers(1)
- val tp0 = TopicAndPartition("t", 0)
- val tp1 = TopicAndPartition("t", 1)
+ val tp0 = new TopicPartition("t", 0)
+ val tp1 = new TopicPartition("t", 1)
val assignment = Map(tp0.partition -> Seq(0))
- val expandedAssignment = Map(tp0.partition -> Seq(0), tp1.partition -> Seq(0))
+ val expandedAssignment = Map(tp0 -> Seq(0), tp1 -> Seq(0))
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
- zkUtils.updatePersistentPath(ZkUtils.getTopicPath(tp0.topic), zkUtils.replicaAssignmentZkData(expandedAssignment.map(kv => kv._1.toString -> kv._2)))
+ zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitUntilMetadataIsPropagated(servers, tp1.topic, tp1.partition)
@@ -117,14 +117,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp0 = TopicAndPartition("t", 0)
- val tp1 = TopicAndPartition("t", 1)
+ val tp0 = new TopicPartition("t", 0)
+ val tp1 = new TopicPartition("t", 1)
val assignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId))
- val expandedAssignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId), tp1.partition -> Seq(otherBrokerId, controllerId))
+ val expandedAssignment = Map(tp0 -> Seq(otherBrokerId, controllerId), tp1 -> Seq(otherBrokerId, controllerId))
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- zkUtils.updatePersistentPath(ZkUtils.getTopicPath(tp0.topic), zkUtils.replicaAssignmentZkData(expandedAssignment.map(kv => kv._1.toString -> kv._2)))
+ zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)), tp1.topic, tp1.partition)
@@ -139,16 +139,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
val timerCount = timer(metricName).count
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId))
val reassignment = Map(tp -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
- zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+ zkClient.createPartitionReassignment(reassignment)
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
"failed to get expected partition state after partition reassignment")
- TestUtils.waitUntilTrue(() => zkUtils.getReplicaAssignmentForTopics(Seq(tp.topic)) == reassignment,
+ TestUtils.waitUntilTrue(() => zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
"failed to get updated partition assignment on topic znode after partition reassignment")
- TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+ TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
"failed to remove reassign partitions path after completion")
val updatedTimerCount = timer(metricName).count
@@ -160,16 +160,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId))
val reassignment = Map(tp -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+ zkClient.setOrCreatePartitionReassignment(reassignment)
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state during partition reassignment with offline replica")
- TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+ TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress(),
"partition reassignment path should remain while reassignment in progress")
}
@@ -178,21 +178,21 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId))
val reassignment = Map(tp -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+ zkClient.createPartitionReassignment(reassignment)
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state during partition reassignment with offline replica")
servers(otherBrokerId).startup()
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4,
"failed to get expected partition state after partition reassignment")
- TestUtils.waitUntilTrue(() => zkUtils.getReplicaAssignmentForTopics(Seq(tp.topic)) == reassignment,
+ TestUtils.waitUntilTrue(() => zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
"failed to get updated partition assignment on topic znode after partition reassignment")
- TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+ TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
"failed to remove reassign partitions path after completion")
}
@@ -201,7 +201,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBroker = servers.find(_.config.brokerId != controllerId).get
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
@@ -212,7 +212,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBroker = servers.find(_.config.brokerId != controllerId).get
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
@@ -224,13 +224,13 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
- zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
- TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
+ zkClient.createPreferredReplicaElection(Set(tp))
+ TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path),
"failed to remove preferred replica leader election path after giving up")
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
"failed to get expected partition state upon broker shutdown")
@@ -241,7 +241,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2, autoLeaderRebalanceEnable = true)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(1, 0))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
servers(otherBrokerId).shutdown()
@@ -258,7 +258,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
@@ -266,7 +266,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
TestUtils.waitUntilTrue(() => {
- val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
+ val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
leaderIsrAndControllerEpochMap.contains(tp) &&
isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
@@ -278,7 +278,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers = makeServers(2, uncleanLeaderElectionEnable = true)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
- val tp = TopicAndPartition("t", 0)
+ val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
@@ -286,39 +286,39 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
servers(1).shutdown()
servers(1).awaitShutdown()
TestUtils.waitUntilTrue(() => {
- val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
+ val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
leaderIsrAndControllerEpochMap.contains(tp) &&
isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
}, "failed to get expected partition state after entire isr went offline")
}
- private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicAndPartition,
+ private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicPartition,
replicas: Set[Int], leaderEpoch: Int): Unit = {
otherBroker.shutdown()
otherBroker.awaitShutdown()
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, leaderEpoch + 1,
"failed to get expected partition state upon broker shutdown")
otherBroker.startup()
- TestUtils.waitUntilTrue(() => zkUtils.getInSyncReplicasForPartition(tp.topic, tp.partition).toSet == replicas, "restarted broker failed to join in-sync replicas")
- zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
- TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
+ TestUtils.waitUntilTrue(() => zkClient.getInSyncReplicasForPartition(new TopicPartition(tp.topic, tp.partition)).get.toSet == replicas, "restarted broker failed to join in-sync replicas")
+ zkClient.createPreferredReplicaElection(Set(tp))
+ TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path),
"failed to remove preferred replica leader election path after completion")
waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBroker.config.brokerId, leaderEpoch + 2,
"failed to get expected partition state upon broker startup")
}
private def waitUntilControllerEpoch(epoch: Int, message: String): Unit = {
- TestUtils.waitUntilTrue(() => zkUtils.readDataMaybeNull(ZkUtils.ControllerEpochPath)._1.map(_.toInt) == Some(epoch), message)
+ TestUtils.waitUntilTrue(() => zkClient.getControllerEpoch.get._1 == epoch, message)
}
- private def waitForPartitionState(tp: TopicAndPartition,
+ private def waitForPartitionState(tp: TopicPartition,
controllerEpoch: Int,
leader: Int,
leaderEpoch: Int,
message: String): Unit = {
TestUtils.waitUntilTrue(() => {
- val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
+ val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
leaderIsrAndControllerEpochMap.contains(tp) &&
isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), controllerEpoch, leader, leaderEpoch)
}, message)
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 617b326..0a8c49f 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -19,15 +19,16 @@ package kafka.integration
import java.util.concurrent._
import java.util.concurrent.atomic._
-import org.junit.{Test, After, Before}
+
+import org.junit.{After, Before, Test}
import scala.collection._
import org.junit.Assert._
-
import kafka.cluster._
import kafka.server._
import kafka.consumer._
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import org.apache.kafka.common.security.JaasUtils
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
class FetcherTest extends KafkaServerTestHarness {
@@ -39,10 +40,13 @@ class FetcherTest extends KafkaServerTestHarness {
val queue = new LinkedBlockingQueue[FetchedDataChunk]
var fetcher: ConsumerFetcherManager = null
+ var zkUtils: ZkUtils = null
@Before
override def setUp() {
super.setUp
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+
createTopic(topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)))
val cluster = new Cluster(servers.map { s =>
@@ -65,6 +69,8 @@ class FetcherTest extends KafkaServerTestHarness {
@After
override def tearDown() {
fetcher.stopConnections()
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close(), this)
super.tearDown
}
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 033ca67..19fa19d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -18,20 +18,22 @@
package kafka.security.auth
import kafka.admin.ZkSecurityMigrator
-import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL}
+import org.apache.zookeeper.data.ACL
import org.junit.Assert._
import org.junit.{After, Before, Test}
+
import scala.collection.JavaConverters._
-import scala.util.{Try, Success, Failure}
+import scala.util.{Failure, Success, Try}
import javax.security.auth.login.Configuration
class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
val authProvider = "zookeeper.authProvider.1"
+ var zkUtils: ZkUtils = null
@Before
override def setUp() {
@@ -39,10 +41,13 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
Configuration.setConfiguration(null)
System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
super.setUp()
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
}
@After
override def tearDown() {
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close(), this)
super.tearDown()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.clearProperty(authProvider)
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 6768b84..751053d 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -74,7 +74,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker
val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
- val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
+ val leaderEpoch1 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
debug("leader Epoch: " + leaderEpoch1)
debug("Leader is elected to be: %s".format(leader1))
// NOTE: this is to avoid transient test failures
@@ -86,7 +86,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
// check if leader moves to the other server
val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
oldLeaderOpt = if (leader1 == 0) None else Some(leader1))
- val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId)
+ val leaderEpoch2 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
debug("Leader is elected to be: %s".format(leader1))
debug("leader Epoch: " + leaderEpoch2)
assertEquals("Leader must move to broker 0", 0, leader2)
@@ -100,7 +100,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
Thread.sleep(zookeeper.tickTime)
val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
oldLeaderOpt = if (leader2 == 1) None else Some(leader2))
- val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId)
+ val leaderEpoch3 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
debug("leader Epoch: " + leaderEpoch3)
debug("Leader is elected to be: %s".format(leader3))
assertEquals("Leader must return to 1", 1, leader3)
@@ -119,7 +119,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker
val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
- val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
+ val leaderEpoch1 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
debug("leader Epoch: " + leaderEpoch1)
debug("Leader is elected to be: %s".format(leader1))
// NOTE: this is to avoid transient test failures
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index ba33ab0..2087363 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -24,7 +24,7 @@ import kafka.server.LogDirFailureTest._
import kafka.api.IntegrationTestHarness
import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.{CoreUtils, Exit, TestUtils}
-import kafka.zk.LogDirEventNotificationZNode
+
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
@@ -191,7 +191,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
}, "Expected some messages", 3000L)
// There should be no remaining LogDirEventNotification znode
- assertTrue(zkUtils.getChildrenParentMayNotExist(LogDirEventNotificationZNode.path).isEmpty)
+ assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
// The controller should have marked the replica on the original leader as offline
val controllerServer = servers.find(_.kafkaController.isActive).get
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 29a1fa6..2fa6600 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -189,12 +189,4 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
}
true
}
-
- @Test
- def testGetSequenceIdMethod() {
- val path = "/test/seqid"
- (1 to 10).foreach { seqid =>
- assertEquals(seqid, zkUtils.getSequenceId(path))
- }
- }
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 54ba887..0317da3 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -49,7 +49,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
@Test
def testAutoGenerateClusterId() {
// Make sure that the cluster id doesn't exist yet.
- assertFalse(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+ assertFalse(zkClient.getClusterId.isDefined)
var server1 = TestUtils.createServer(config1)
servers = Seq(server1)
@@ -61,7 +61,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
server1.shutdown()
// Make sure that the cluster id is persistent.
- assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+ assertTrue(zkClient.getClusterId.isDefined)
assertEquals(zkClient.getClusterId, Some(clusterIdOnFirstBoot))
// Restart the server check to confirm that it uses the clusterId generated previously
@@ -74,7 +74,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
server1.shutdown()
// Make sure that the cluster id is persistent after multiple reboots.
- assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+ assertTrue(zkClient.getClusterId.isDefined)
assertEquals(zkClient.getClusterId, Some(clusterIdOnFirstBoot))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 3414c36..4c05d98 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -45,7 +45,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
server = TestUtils.createServer(KafkaConfig.fromProps(props))
- val pathExists = zkUtils.pathExists(zookeeperChroot)
+ val pathExists = zkClient.pathExists(zookeeperChroot)
assertTrue(pathExists)
}
@@ -76,7 +76,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
val brokerId = 0
val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect)
server = TestUtils.createServer(KafkaConfig.fromProps(props1))
- val brokerRegistration = zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1
+ val brokerRegistration = zkClient.getBroker(brokerId).getOrElse(fail("broker doesn't exists"))
val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect)
try {
@@ -88,7 +88,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
// broker registration shouldn't change
- assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+ assertEquals(brokerRegistration, zkClient.getBroker(brokerId).getOrElse(fail("broker doesn't exists")))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 3a42c3b..3389161 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -19,12 +19,12 @@ package kafka.utils
import kafka.server.{KafkaConfig, ReplicaFetcherManager}
import kafka.api.LeaderAndIsr
-import kafka.zk.ZooKeeperTestHarness
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.{IsrChangeNotificationZNode, TopicZNode, ZooKeeperTestHarness}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.{Before, Test}
import org.easymock.EasyMock
-import scala.collection.JavaConverters._
class ReplicationUtilsTest extends ZooKeeperTestHarness {
private val zkVersion = 1
@@ -34,14 +34,15 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
private val leaderEpoch = 1
private val controllerEpoch = 1
private val isr = List(1, 2)
- private val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
- private val topicData = Json.encodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
- "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr).asJava)
@Before
override def setUp() {
super.setUp()
- zkUtils.createPersistentPath(topicPath, topicData)
+ zkClient.makeSurePersistentPathExists(TopicZNode.path(topic))
+ val topicPartition = new TopicPartition(topic, partition)
+ val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, 1)
+ val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+ zkClient.createTopicPartitionStatesRaw(Map(topicPartition -> leaderIsrAndControllerEpoch))
}
@Test
@@ -63,7 +64,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
EasyMock.replay(replicaManager)
- zkClient.makeSurePersistentPathExists(ZkUtils.IsrChangeNotificationPath)
+ zkClient.makeSurePersistentPathExists(IsrChangeNotificationZNode.path)
val replicas = List(0, 1)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 303afa7..636c0bd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -977,24 +977,25 @@ object TestUtils extends Logging {
file.close()
}
- def checkForPhantomInSyncReplicas(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
- val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionToBeReassigned)
+ def checkForPhantomInSyncReplicas(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
+ val inSyncReplicas = zkClient.getInSyncReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
// in sync replicas should not have any replica that is not in the new assigned replicas
- val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
+ val phantomInSyncReplicas = inSyncReplicas.get.toSet -- assignedReplicas.toSet
assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
phantomInSyncReplicas.isEmpty)
}
- def ensureNoUnderReplicatedPartitions(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
+ def ensureNoUnderReplicatedPartitions(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
servers: Seq[KafkaServer]) {
+ val topicPartition = new TopicPartition(topic, partitionToBeReassigned)
TestUtils.waitUntilTrue(() => {
- val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionToBeReassigned)
- inSyncReplicas.size == assignedReplicas.size
+ val inSyncReplicas = zkClient.getInSyncReplicasForPartition(topicPartition)
+ inSyncReplicas.get.size == assignedReplicas.size
},
"Reassigned partition [%s,%d] is under replicated".format(topic, partitionToBeReassigned))
var leader: Option[Int] = None
TestUtils.waitUntilTrue(() => {
- leader = zkUtils.getLeaderForPartition(topic, partitionToBeReassigned)
+ leader = zkClient.getLeaderForPartition(topicPartition)
leader.isDefined
},
"Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index 9f78124..292db8b 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -21,12 +21,27 @@ import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.JaasUtils
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
class ZkUtilsTest extends ZooKeeperTestHarness {
val path = "/path"
+ var zkUtils: ZkUtils = _
+
+ @Before
+ override def setUp() {
+ super.setUp
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+ }
+
+ @After
+ override def tearDown() {
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close(), this)
+ super.tearDown
+ }
@Test
def testSuccessfulConditionalDeletePath() {
@@ -107,4 +122,11 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
assertEquals(None, zkUtils.getLeaderIsrAndEpochForPartition(topic, partition + 1))
}
+ @Test
+ def testGetSequenceIdMethod() {
+ val path = "/test/seqid"
+ (1 to 10).foreach { seqid =>
+ assertEquals(seqid, zkUtils.getSequenceId(path))
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 6280d97..06ea963 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -21,18 +21,15 @@ import java.lang.Iterable
import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
-
import kafka.consumer.ConsumerConfig
-import kafka.utils.ZkUtils
-import kafka.utils.ZKCheckedEphemeral
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils, ZKCheckedEphemeral, ZkUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.ZooDefs.Ids
import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.junit.{After, Before, Test, Assert}
+import org.junit.{After, Assert, Before, Test}
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import org.junit.runner.RunWith
@@ -50,7 +47,8 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
val authProvider = "zookeeper.authProvider.1"
var zkSessionTimeoutMs = 1000
-
+ var zkUtils: ZkUtils = null
+
@Before
override def setUp() {
if (secure) {
@@ -61,10 +59,13 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
fail("Secure access not enabled")
}
super.setUp
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
}
@After
override def tearDown() {
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close(), this)
super.tearDown
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.clearProperty(authProvider)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index c7c3152..9e72583 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.junit.{After, AfterClass, Before, BeforeClass}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
@@ -43,7 +43,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
- var zkUtils: ZkUtils = null
var zkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
@@ -55,7 +54,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
adminZkClient = new AdminZkClient(zkClient)
@@ -63,8 +61,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@After
def tearDown() {
- if (zkUtils != null)
- CoreUtils.swallow(zkUtils.close(), this)
if (zkClient != null)
zkClient.close()
if (zookeeper != null)
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.