You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/05/03 19:55:59 UTC
kafka git commit: KAFKA-3128;
Add metrics for ZooKeeper events zookeeper metrics
Repository: kafka
Updated Branches:
refs/heads/trunk 1f528815d -> c7425be5b
KAFKA-3128; Add metrics for ZooKeeper events zookeeper metrics
Also:
* Remove redundant `time.milliseconds` call in `Sensor.record`
* Clean-up a number of tests and remove a manual test that is no longer required
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Liquan Pei <li...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #1265 from ijuma/kafka-3128-zookeeper-metrics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c7425be5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c7425be5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c7425be5
Branch: refs/heads/trunk
Commit: c7425be5be8d0c2786155fbc697d83f80827d084
Parents: 1f52881
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue May 3 10:55:54 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue May 3 10:55:54 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/common/metrics/Sensor.java | 2 +-
.../scala/kafka/server/KafkaHealthcheck.scala | 57 ++++++++++------
.../main/scala/kafka/server/KafkaServer.scala | 5 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +-
.../integration/kafka/api/AdminClientTest.scala | 2 +-
.../kafka/api/ProducerBounceTest.scala | 16 ++---
.../test/scala/other/kafka/DeleteZKPath.scala | 44 -------------
.../kafka/integration/PrimitiveApiTest.scala | 2 +-
.../ZookeeperConsumerConnectorTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../server/SessionExpireListenerTest.scala | 68 ++++++++++++++++++++
.../unit/kafka/utils/CommandLineUtilsTest.scala | 2 +-
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 27 ++++----
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 13 ++--
14 files changed, 136 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 0c5bcb7..098bfa8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -96,7 +96,7 @@ public final class Sensor {
* bound
*/
public void record(double value, long timeMs) {
- this.lastRecordTime = time.milliseconds();
+ this.lastRecordTime = timeMs;
synchronized (this) {
// increment all the stats
for (int i = 0; i < this.stats.size(); i++)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4e3fc29..117899b 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -18,15 +18,17 @@
package kafka.server
import java.net.InetAddress
+import java.util.Locale
+import java.util.concurrent.TimeUnit
import kafka.api.ApiVersion
import kafka.cluster.EndPoint
+import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.I0Itec.zkclient.IZkStateListener
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.Watcher.Event.KeeperState
-
/**
* This class registers the broker in zookeeper to allow
* other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
@@ -35,14 +37,14 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
* Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
* we are dead.
*/
-class KafkaHealthcheck(private val brokerId: Int,
- private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
- private val zkUtils: ZkUtils,
- private val rack: Option[String],
- private val interBrokerProtocolVersion: ApiVersion) extends Logging {
+class KafkaHealthcheck(brokerId: Int,
+ advertisedEndpoints: Map[SecurityProtocol, EndPoint],
+ zkUtils: ZkUtils,
+ rack: Option[String],
+ interBrokerProtocolVersion: ApiVersion) extends Logging {
- val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
- val sessionExpireListener = new SessionExpireListener
+ private val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
+ private[server] val sessionExpireListener = new SessionExpireListener
def startup() {
zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
@@ -70,31 +72,44 @@ class KafkaHealthcheck(private val brokerId: Int,
}
/**
- * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
- * connection for us. We need to re-register this broker in the broker registry.
+ * When we get a SessionExpired event, it means that we have lost all ephemeral nodes and ZKClient has re-established
+ * a connection for us. We need to re-register this broker in the broker registry. We rely on `handleStateChanged`
+ * to record ZooKeeper connection state metrics.
*/
- class SessionExpireListener() extends IZkStateListener {
+ class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
+
+ private[server] val stateToMeterMap = {
+ import KeeperState._
+ val stateToEventTypeMap = Map(
+ Disconnected -> "Disconnects",
+ SyncConnected -> "SyncConnects",
+ AuthFailed -> "AuthFailures",
+ ConnectedReadOnly -> "ReadOnlyConnects",
+ SaslAuthenticated -> "SaslAuthentications",
+ Expired -> "Expires"
+ )
+ stateToEventTypeMap.map { case (state, eventType) =>
+ state -> newMeter(s"ZooKeeper${eventType}PerSec", eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
+ }
+ }
+
@throws(classOf[Exception])
- def handleStateChanged(state: KeeperState) {}
+ override def handleStateChanged(state: KeeperState) {
+ stateToMeterMap.get(state).foreach(_.mark())
+ }
- /**
- * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
- * any ephemeral nodes here.
- *
- * @throws Exception
- * On any error.
- */
@throws(classOf[Exception])
- def handleNewSession() {
+ override def handleNewSession() {
info("re-registering broker info in ZK for broker " + brokerId)
register()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
- override def handleSessionEstablishmentError(error: Throwable): Unit = {
+ override def handleSessionEstablishmentError(error: Throwable) {
fatal("Could not establish session with zookeeper", error)
}
+
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 36b52fd..2832ebc 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -96,7 +96,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
private var shutdownLatch = new CountDownLatch(1)
private val jmxPrefix: String = "kafka.server"
- private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
+ private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
reporters.add(new JmxReporter(jmxPrefix))
// This exists because the Metrics package from clients has its own Time implementation.
@@ -239,7 +239,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
else
(protocol, endpoint)
}
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion)
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
+ config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()
// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 83ff517..81eb24a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -924,7 +924,7 @@ object ZkPath {
isNamespacePresent = true
}
- def resetNamespaceCheckedState {
+ def resetNamespaceCheckedState() {
isNamespacePresent = false
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index ade1911..7fae81e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -84,7 +84,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
!consumers(0).assignment().isEmpty
}, "Expected non-empty assignment")
- val group= client.describeGroup(groupId)
+ val group = client.describeGroup(groupId)
assertEquals("consumer", group.protocolType)
assertEquals("range", group.protocol)
assertEquals("Stable", group.state)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 369c3b7..5994a1d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -51,16 +51,11 @@ class ProducerBounceTest extends KafkaServerTestHarness {
.map(KafkaConfig.fromProps(_, overridingProps))
}
- private var consumer1: SimpleConsumer = null
- private var consumer2: SimpleConsumer = null
-
private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
- private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
private val topic1 = "topic-1"
- private val topic2 = "topic-2"
@Before
override def setUp() {
@@ -76,7 +71,6 @@ class ProducerBounceTest extends KafkaServerTestHarness {
if (producer1 != null) producer1.close
if (producer2 != null) producer2.close
if (producer3 != null) producer3.close
- if (producer4 != null) producer4.close
super.tearDown()
}
@@ -102,9 +96,8 @@ class ProducerBounceTest extends KafkaServerTestHarness {
Thread.sleep(2000)
}
- // Make sure the producer do not see any exception
- // in returned metadata due to broker failures
- assertTrue(scheduler.failed == false)
+ // Make sure the producer do not see any exception in returned metadata due to broker failures
+ assertFalse(scheduler.failed)
// Make sure the leader still exists after bouncing brokers
(0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, partition))
@@ -114,7 +107,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
// Make sure the producer do not see any exception
// when draining the left messages on shutdown
- assertTrue(scheduler.failed == false)
+ assertFalse(scheduler.failed)
// double check that the leader info has been propagated after consecutive bounces
val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
@@ -132,8 +125,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
}
- private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
- {
+ private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
val numRecords = 1000
var sent = 0
var failed = false
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/other/kafka/DeleteZKPath.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala
deleted file mode 100755
index 202bf43..0000000
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka
-
-import consumer.ConsumerConfig
-import utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.common.utils.Utils
-
-object DeleteZKPath {
- def main(args: Array[String]) {
- if(args.length < 2) {
- println("USAGE: " + DeleteZKPath.getClass.getName + " consumer.properties zk_path")
- System.exit(1)
- }
-
- val config = new ConsumerConfig(Utils.loadProps(args(0)))
- val zkPath = args(1)
- val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-
- try {
- zkUtils.deletePathRecursive(zkPath);
- System.out.println(zkPath + " is deleted")
- } catch {
- case e: Exception => System.err.println("Path not deleted " + e.printStackTrace())
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index beb5d0e..85e9cad 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -35,7 +35,7 @@ import java.util.Properties
* End to end tests of the primitive apis against a local server
*/
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
+class PrimitiveApiTest extends ProducerConsumerTestHarness {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index e4c4697..83cce77 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -37,7 +37,7 @@ import org.apache.log4j.{Level, Logger}
import org.junit.Assert._
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
-class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {
val numNodes = 2
val numParts = 2
val topic = "topic1"
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c2c670e..2cdf924 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -58,7 +58,7 @@ class ReplicaManagerTest {
@After
def tearDown() {
- metrics.close();
+ metrics.close()
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
new file mode 100644
index 0000000..4ffb189
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.utils.ZkUtils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.Watcher
+import org.easymock.EasyMock
+import org.junit.{Assert, Before, Test}
+import Assert._
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Meter, Metric, MetricName}
+import org.apache.kafka.common.utils.MockTime
+import scala.collection.JavaConverters._
+
+class SessionExpireListenerTest {
+
+ private var time = new MockTime
+ private val brokerId = 1
+
+ @Test
+ def testSessionExpireListenerMetrics() {
+
+ val metrics = Metrics.defaultRegistry
+
+ def checkMeterCount(name: String, expected: Long) {
+ val meter = metrics.allMetrics.asScala.collectFirst {
+ case (metricName, meter: Meter) if metricName.getName == name => meter
+ }.getOrElse(sys.error(s"Unable to find meter with name $name"))
+ assertEquals("Unexpected meter count", expected, meter.count)
+ }
+
+ val zkClient = EasyMock.mock(classOf[ZkClient])
+ val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+ import Watcher._
+ val healthcheck = new KafkaHealthcheck(brokerId, Map.empty, zkUtils, None, ApiVersion.latestVersion)
+
+ val expiresPerSecName = "ZooKeeperExpiresPerSec"
+ val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
+ checkMeterCount(expiresPerSecName, 0)
+ checkMeterCount(disconnectsPerSecName, 0)
+
+ healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Expired)
+ checkMeterCount(expiresPerSecName, 1)
+ checkMeterCount(disconnectsPerSecName, 0)
+
+ healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Disconnected)
+ checkMeterCount(expiresPerSecName, 1)
+ checkMeterCount(disconnectsPerSecName, 1)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
index 6cc868d..50023f8 100644
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
@@ -23,7 +23,7 @@ import org.junit.Test
class CommandLineUtilsTest {
- @Test (expected = classOf[java.lang.IllegalArgumentException])
+ @Test(expected = classOf[java.lang.IllegalArgumentException])
def testParseEmptyArg() {
val argArray = Array("my.empty.property=")
CommandLineUtils.parseKeyValueArgs(argArray, false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 92fae02..7ef4550 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -25,7 +25,7 @@ import org.junit.Test
class ZKPathTest extends ZooKeeperTestHarness {
- val path: String = "/some_dir"
+ val path = "/some_dir"
val zkSessionTimeoutMs = 1000
def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
@@ -33,7 +33,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreatePersistentPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
- var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, false)
try {
ZkPath.resetNamespaceCheckedState
@@ -49,7 +49,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
@Test
def testCreatePersistentPath {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+ val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
ZkPath.resetNamespaceCheckedState
zkUtils.createPersistentPath(path)
@@ -63,10 +63,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
@Test
def testMakeSurePersistsPathExistsThrowsException {
- val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
- "test", "1"))
- var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, false)
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
+ val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
ZkPath.resetNamespaceCheckedState
zkUtils.makeSurePersistentPathExists(path)
@@ -81,7 +79,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
@Test
def testMakeSurePersistsPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+ val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
ZkPath.resetNamespaceCheckedState
zkUtils.makeSurePersistentPathExists(path)
@@ -95,10 +93,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
@Test
def testCreateEphemeralPathThrowsException {
- val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
- "test", "1"))
- var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, false)
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
+ val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
ZkPath.resetNamespaceCheckedState
zkUtils.createEphemeralPathExpectConflict(path, "somedata")
@@ -113,7 +109,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
@Test
def testCreateEphemeralPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+ val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
ZkPath.resetNamespaceCheckedState
zkUtils.createEphemeralPathExpectConflict(path, "somedata")
@@ -129,8 +125,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreatePersistentSequentialThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
- var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, false)
+ val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
ZkPath.resetNamespaceCheckedState
zkUtils.createSequentialPersistentPath(path)
@@ -145,7 +140,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
@Test
def testCreatePersistentSequentialExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+ val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
var actualPath: String = ""
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7425be5/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 95f4e35..0de11cd 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -24,18 +24,19 @@ import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
trait ZooKeeperTestHarness extends JUnitSuite with Logging {
- var zookeeper: EmbeddedZookeeper = null
- var zkPort: Int = -1
- var zkUtils: ZkUtils = null
+
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
- def zkConnect: String = "127.0.0.1:" + zkPort
- def confFile: String = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "")
+
+ var zkUtils: ZkUtils = null
+ var zookeeper: EmbeddedZookeeper = null
+
+ def zkPort: Int = zookeeper.port
+ def zkConnect: String = s"127.0.0.1:$zkPort"
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkPort = zookeeper.port
zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled())
}