You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/01/03 22:02:26 UTC
[kafka] branch trunk updated: MINOR: Fix zk client session state
metric names and various async zk clean-ups
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98296f8 MINOR: Fix zk client session state metric names and various async zk clean-ups
98296f8 is described below
commit 98296f852f334067553e541d6ecdfa624f0eb689
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Jan 3 14:02:14 2018 -0800
MINOR: Fix zk client session state metric names and various async zk clean-ups
- Fix zk session state and session change rate metric names: type
should be SessionExpireListener instead of KafkaHealthCheck. Test
verifying the fix was included.
- Handle missing controller in controlled shutdown in the same way as if
the broker is not registered (i.e. retry after backoff).
- Restructure BrokerInfo to reduce duplication. It now contains a
Broker instance and the JSON serde is done in BrokerIdZNode
since `Broker` does not contain all the fields.
- Remove dead code from `ZooKeeperClient.initialize` and remove
redundant `close` calls.
- Move ACL handling and persistent paths definition from ZkUtils to
ZkData (and call ZkData from ZkUtils).
- Remove ZooKeeperClientWrapper and ZooKeeperClientMetrics from
ZkUtils (avoids metrics clash if third party users create a ZkUtils
instance in the same process as the broker).
- Introduce factory method in KafkaZkClient that creates
ZooKeeperClient and remove metric name defaults from
ZooKeeperClient.
- Fix a few instances where ZooKeeperClient was not closed in tests.
- Update a few TestUtils methods to use KafkaZkClient instead of
ZkUtils.
- Add test verifying SessionState metric.
- Various clean-ups.
Testing: mostly relying on existing tests, but added a couple
of new tests as mentioned above.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #4359 from ijuma/kafka-6320-kafka-health-zk-metrics-follow-up
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 5 +-
.../PreferredReplicaLeaderElectionCommand.scala | 5 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 4 +-
core/src/main/scala/kafka/admin/TopicCommand.scala | 5 +-
core/src/main/scala/kafka/cluster/Broker.scala | 129 +-----------
.../common/ZkNodeChangeNotificationListener.scala | 1 -
.../scala/kafka/controller/KafkaController.scala | 4 +-
.../transaction/ProducerIdManager.scala | 2 +-
.../kafka/security/auth/SimpleAclAuthorizer.scala | 6 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 114 +++++------
core/src/main/scala/kafka/utils/Json.scala | 6 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 185 +++++------------
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 54 ++---
core/src/main/scala/kafka/zk/ZkData.scala | 219 ++++++++++++++++++---
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 44 ++---
.../kafka/api/AdminClientIntegrationTest.scala | 35 ++--
.../AdminClientWithPoliciesIntegrationTest.scala | 12 +-
.../integration/kafka/api/BaseConsumerTest.scala | 4 +-
.../kafka/api/BaseProducerSendTest.scala | 18 +-
.../integration/kafka/api/BaseQuotaTest.scala | 2 +-
.../integration/kafka/api/ConsumerBounceTest.scala | 4 +-
.../kafka/api/EndToEndAuthorizationTest.scala | 2 +-
.../kafka/api/EndToEndClusterIdTest.scala | 2 +-
.../kafka/api/LegacyAdminClientTest.scala | 2 +-
.../integration/kafka/api/LogAppendTimeTest.scala | 2 +-
.../scala/integration/kafka/api/MetricsTest.scala | 8 +-
.../kafka/api/PlaintextConsumerTest.scala | 46 ++---
.../kafka/api/PlaintextProducerSendTest.scala | 2 +-
.../integration/kafka/api/ProducerBounceTest.scala | 6 +-
.../SaslClientsWithInvalidCredentialsTest.scala | 2 +-
.../kafka/api/TransactionsBounceTest.scala | 4 +-
.../integration/kafka/api/TransactionsTest.scala | 8 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 2 +-
.../unit/kafka/admin/DeleteConsumerGroupTest.scala | 20 +-
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 2 +-
.../kafka/admin/DescribeConsumerGroupTest.scala | 2 +-
.../admin/ReassignPartitionsCommandTest.scala | 12 +-
.../unit/kafka/cluster/BrokerEndPointTest.scala | 20 +-
.../unit/kafka/consumer/ConsumerIteratorTest.scala | 3 +-
.../consumer/ZookeeperConsumerConnectorTest.scala | 2 +-
.../controller/ControllerIntegrationTest.scala | 22 +--
.../kafka/integration/AutoOffsetResetTest.scala | 2 +-
.../scala/unit/kafka/integration/FetcherTest.scala | 2 +-
.../kafka/integration/KafkaServerTestHarness.scala | 17 ++
.../unit/kafka/integration/PrimitiveApiTest.scala | 8 +-
.../consumer/ZookeeperConsumerConnectorTest.scala | 2 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 17 +-
.../unit/kafka/producer/SyncProducerTest.scala | 2 +-
.../kafka/security/auth/ZkAuthorizationTest.scala | 14 +-
.../server/AddPartitionsToTxnRequestTest.scala | 3 +-
.../server/AlterReplicaLogDirsRequestTest.scala | 4 +-
.../kafka/server/CreateTopicsRequestTest.scala | 2 +-
.../server/CreateTopicsRequestWithPolicyTest.scala | 3 +-
.../kafka/server/DeleteTopicsRequestTest.scala | 10 +-
.../kafka/server/DescribeLogDirsRequestTest.scala | 2 +-
.../unit/kafka/server/EdgeCaseRequestTest.scala | 2 +-
.../scala/unit/kafka/server/FetchRequestTest.scala | 5 +-
.../unit/kafka/server/LogDirFailureTest.scala | 4 +-
.../unit/kafka/server/MetadataRequestTest.scala | 18 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 41 ++--
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 2 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 15 +-
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 8 +-
.../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 120 ++++++-----
.../integration/InternalTopicIntegrationTest.java | 9 +-
.../streams/integration/utils/KafkaEmbedded.java | 38 ++--
67 files changed, 684 insertions(+), 695 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 53aa2c1..cf01a5f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -27,7 +27,6 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils.CommandLineUtils
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -64,8 +63,8 @@ object ConfigCommand extends Config {
opts.checkArgs()
val time = Time.SYSTEM
- val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue, time)
- val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+ val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
+ Int.MaxValue, time)
val adminZkClient = new AdminZkClient(zkClient)
try {
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index e36b25b..89ab580 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -20,7 +20,6 @@ import joptsimple.OptionParser
import kafka.utils._
import kafka.common.AdminCommandFailedException
import kafka.zk.KafkaZkClient
-import kafka.zookeeper.ZooKeeperClient
import collection._
import org.apache.kafka.common.utils.{Time, Utils}
@@ -54,12 +53,10 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
val zkConnect = options.valueOf(zkConnectOpt)
- var zooKeeperClient: ZooKeeperClient = null
var zkClient: KafkaZkClient = null
try {
val time = Time.SYSTEM
- zooKeeperClient = new ZooKeeperClient(zkConnect, 30000, 30000, Int.MaxValue, time)
- zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+ zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
val partitionsForPreferredReplicaElection =
if (!options.has(jsonFileOpt))
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 6bcbe91..ed9414b 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -26,7 +26,6 @@ import kafka.log.LogConfig._
import kafka.server.{ConfigType, DynamicConfig}
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
import org.apache.kafka.common.TopicPartitionReplica
@@ -50,8 +49,7 @@ object ReassignPartitionsCommand extends Logging {
val opts = validateAndParseArgs(args)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
val time = Time.SYSTEM
- val zooKeeperClient = new ZooKeeperClient(zkConnect, 30000, 30000, Int.MaxValue, time)
- val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+ val zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
val adminClientOpt = createAdminClient(opts)
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index dcf970a..075252d 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -27,7 +27,6 @@ import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.security.JaasUtils
@@ -55,8 +54,8 @@ object TopicCommand extends Logging {
opts.checkArgs()
val time = Time.SYSTEM
- val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue, time)
- val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+ val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
+ Int.MaxValue, time)
var exitCode = 0
try {
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index df3be98..425eafc 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -17,139 +17,16 @@
package kafka.cluster
-import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
-import kafka.utils.Json
+import kafka.common.BrokerEndPointNotAvailableException
import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.Time
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
/**
* A Kafka broker.
- * A broker has an id and a collection of end-points.
- * Each end-point is (host, port, protocolType).
+ * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map.
+ * Each end-point is (host, port, listenerName).
*/
-object Broker {
-
- private val HostKey = "host"
- private val PortKey = "port"
- private val VersionKey = "version"
- private val EndpointsKey = "endpoints"
- private val RackKey = "rack"
- private val JmxPortKey = "jmx_port"
- private val ListenerSecurityProtocolMapKey = "listener_security_protocol_map"
- private val TimestampKey = "timestamp"
-
- /**
- * Create a broker object from id and JSON string.
- *
- * @param id
- * @param brokerInfoString
- *
- * Version 1 JSON schema for a broker is:
- * {
- * "version":1,
- * "host":"localhost",
- * "port":9092
- * "jmx_port":9999,
- * "timestamp":"2233345666"
- * }
- *
- * Version 2 JSON schema for a broker is:
- * {
- * "version":2,
- * "host":"localhost",
- * "port":9092,
- * "jmx_port":9999,
- * "timestamp":"2233345666",
- * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
- * }
- *
- * Version 3 JSON schema for a broker is:
- * {
- * "version":3,
- * "host":"localhost",
- * "port":9092,
- * "jmx_port":9999,
- * "timestamp":"2233345666",
- * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
- * "rack":"dc1"
- * }
- *
- * Version 4 (current) JSON schema for a broker is:
- * {
- * "version":4,
- * "host":"localhost",
- * "port":9092,
- * "jmx_port":9999,
- * "timestamp":"2233345666",
- * "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
- * "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
- * "rack":"dc1"
- * }
- */
- def createBroker(id: Int, brokerInfoString: String): Broker = {
- if (brokerInfoString == null)
- throw new BrokerNotAvailableException(s"Broker id $id does not exist")
- try {
- Json.parseFull(brokerInfoString) match {
- case Some(js) =>
- val brokerInfo = js.asJsonObject
- val version = brokerInfo(VersionKey).to[Int]
-
- val endpoints =
- if (version < 1)
- throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
- else if (version == 1) {
- val host = brokerInfo(HostKey).to[String]
- val port = brokerInfo(PortKey).to[Int]
- val securityProtocol = SecurityProtocol.PLAINTEXT
- val endPoint = new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
- Seq(endPoint)
- }
- else {
- val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
- _.to[Map[String, String]].map { case (listenerName, securityProtocol) =>
- new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
- })
- val listeners = brokerInfo(EndpointsKey).to[Seq[String]]
- listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
- }
-
- val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])
- Broker(id, endpoints, rack)
- case None =>
- throw new BrokerNotAvailableException(s"Broker id $id does not exist")
- }
- } catch {
- case t: Throwable =>
- throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
- }
- }
-
- def toJsonBytes(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
- rack: Option[String]): Array[Byte] = {
- val jsonMap = collection.mutable.Map(VersionKey -> version,
- HostKey -> host,
- PortKey -> port,
- EndpointsKey -> advertisedEndpoints.map(_.connectionString).toBuffer.asJava,
- JmxPortKey -> jmxPort,
- TimestampKey -> Time.SYSTEM.milliseconds().toString
- )
- rack.foreach(rack => if (version >= 3) jsonMap += (RackKey -> rack))
-
- if (version >= 4) {
- jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint =>
- endPoint.listenerName.value -> endPoint.securityProtocol.name
- }.toMap.asJava)
- }
- Json.encodeAsBytes(jsonMap.asJava)
- }
-}
-
case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
private val endPointsMap = endPoints.map { endPoint =>
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 0783f61..5179851 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -143,7 +143,6 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
object ZkStateChangeHandler extends StateChangeHandler {
override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
override def afterInitializingSession(): Unit = addChangeNotification
- override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
}
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ca8422e..6b5c34e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -54,7 +54,8 @@ object KafkaController extends Logging {
}
-class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo,
+ threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
@@ -146,7 +147,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
def startup() = {
zkClient.registerStateChangeHandler(new StateChangeHandler {
override val name: String = StateChangeHandlers.ControllerHandler
- override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
override def afterInitializingSession(): Unit = {
eventManager.put(RegisterBrokerAndReelect)
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 9c815bc..c3c9f7c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.transaction
import java.nio.charset.StandardCharsets
import kafka.common.KafkaException
-import kafka.utils.{Json, Logging, ZkUtils}
+import kafka.utils.{Json, Logging}
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 74bc809..c439f5e 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -28,7 +28,6 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{SecurityUtils, Time}
@@ -92,9 +91,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val zkMaxInFlightRequests = configs.get(SimpleAclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
val time = Time.SYSTEM
- val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, zkConnectionTimeoutMs, zkMaxInFlightRequests,
- time, "kafka.security", "SimpleAclAuthorizer")
- zkClient = new KafkaZkClient(zooKeeperClient, kafkaConfig.zkEnableSecureAcls, time)
+ zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
+ zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
zkClient.createAclPaths()
loadCache()
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 8643233..d073584 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,8 +25,8 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_9_0
-import kafka.cluster.{Broker, EndPoint}
-import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, KafkaException}
+import kafka.cluster.Broker
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
@@ -37,14 +37,12 @@ import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
import kafka.zk.{BrokerInfo, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Node}
@@ -95,7 +93,8 @@ object KafkaServer {
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
*/
-class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
+class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
+ kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
private val startupComplete = new AtomicBoolean(false)
private val isShuttingDown = new AtomicBoolean(false)
private val isStartingUp = new AtomicBoolean(false)
@@ -238,31 +237,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
- /* tell everyone we are alive */
- val listeners = config.advertisedListeners.map { endpoint =>
- if (endpoint.port == 0)
- endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
- else
- endpoint
- }
-
- val updatedEndpoints = listeners.map(endpoint =>
- if (endpoint.host == null || endpoint.host.trim.isEmpty)
- endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
- else
- endpoint
- )
-
- // the default host and port are here for compatibility with older clients that only support PLAINTEXT
- // we choose the first plaintext port, if there is one
- // or we register an empty endpoint, which means that older clients will not be able to connect
- val plaintextEndpoint = updatedEndpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
- new EndPoint(null, -1, null, null))
-
- val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
- val brokerInfo = new BrokerInfo(config.brokerId,
- plaintextEndpoint.host, plaintextEndpoint.port,
- updatedEndpoints, jmxPort, config.rack, config.interBrokerProtocolVersion)
+ val brokerInfo = createBrokerInfo
zkClient.registerBrokerInZk(brokerInfo)
// Now that the broker id is successfully registered, checkpoint it
@@ -342,6 +317,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
private def initZkClient(time: Time): Unit = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
+ def createZkClient(zkConnect: String, isSecure: Boolean) =
+ KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ config.zkMaxInFlightRequests, time)
+
val chrootIndex = config.zkConnect.indexOf("/")
val chrootOption = {
if (chrootIndex > 0) Some(config.zkConnect.substring(chrootIndex))
@@ -357,24 +336,39 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// make sure chroot path exists
chrootOption.foreach { chroot =>
val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
- val zooKeeperClient = new ZooKeeperClient(zkConnForChrootCreation, config.zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
- val zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+ val zkClient = createZkClient(zkConnForChrootCreation, secureAclsEnabled)
zkClient.makeSurePersistentPathExists(chroot)
info(s"Created zookeeper path $chroot")
zkClient.close()
}
- val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
- _zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+ _zkClient = createZkClient(config.zkConnect, secureAclsEnabled)
_zkClient.createTopLevelPaths()
}
- def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
+ private def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
}
+ private def createBrokerInfo: BrokerInfo = {
+ val listeners = config.advertisedListeners.map { endpoint =>
+ if (endpoint.port == 0)
+ endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
+ else
+ endpoint
+ }
+
+ val updatedEndpoints = listeners.map(endpoint =>
+ if (endpoint.host == null || endpoint.host.trim.isEmpty)
+ endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
+ else
+ endpoint
+ )
+
+ val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+ BrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort)
+ }
+
/**
* Performs controlled shutdown
*/
@@ -438,22 +432,29 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// 1. Find the controller and establish a connection to it.
// Get the current controller info. This is to ensure we use the most recent info to issue the
- // controlled shutdown request
- val controllerId = zkClient.getControllerId.getOrElse(throw new KafkaException("Controller doesn't exist"))
- //If this method returns None ignore and try again
- zkClient.getBroker(controllerId).foreach { broker =>
- // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
- // attempt, connect to the most recent controller
- if (ioException || broker != prevController) {
-
- ioException = false
-
- if (prevController != null)
- networkClient.close(node(prevController).idString)
-
- prevController = broker
- metadataUpdater.setNodes(Seq(node(prevController)).asJava)
- }
+ // controlled shutdown request.
+ // If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
+ zkClient.getControllerId match {
+ case Some(controllerId) =>
+ zkClient.getBroker(controllerId) match {
+ case Some(broker) =>
+ // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
+ // attempt, connect to the most recent controller
+ if (ioException || broker != prevController) {
+
+ ioException = false
+
+ if (prevController != null)
+ networkClient.close(node(prevController).idString)
+
+ prevController = broker
+ metadataUpdater.setNodes(Seq(node(prevController)).asJava)
+ }
+ case None =>
+ info(s"Broker registration for controller $controllerId is not available (i.e. the Controller's ZK session expired)")
+ }
+ case None =>
+ info("No controller registered in ZooKeeper")
}
// 2. issue a controlled shutdown to the controller
@@ -477,14 +478,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
info("Controlled shutdown succeeded")
}
else {
- info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(",")))
- info("Error code from controller: %d".format(shutdownResponse.error.code))
+ info(s"Remaining partitions to move: ${shutdownResponse.partitionsRemaining.asScala.mkString(",")}")
+ info(s"Error from controller: ${shutdownResponse.error}")
}
}
catch {
case ioe: IOException =>
ioException = true
- warn("Error during controlled shutdown, possibly because leader movement took longer than the configured controller.socket.timeout.ms and/or request.timeout.ms: %s".format(ioe.getMessage))
+ warn("Error during controlled shutdown, possibly because leader movement took longer than the " +
+ s"configured controller.socket.timeout.ms and/or request.timeout.ms: ${ioe.getMessage}")
// ignore and try again
}
}
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index e8e7d8a..cbb8dac 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -16,8 +16,6 @@
*/
package kafka.utils
-import java.nio.charset.StandardCharsets
-
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.utils.json.JsonValue
@@ -54,6 +52,10 @@ object Json {
try Option(mapper.readTree(input)).map(JsonValue(_))
catch { case _: JsonProcessingException => None }
+ def tryParseBytes(input: Array[Byte]): Either[JsonProcessingException, JsonValue] =
+ try Right(mapper.readTree(input)).right.map(JsonValue(_))
+ catch { case e: JsonProcessingException => Left(e) }
+
/**
* Encode an object into a JSON string. This method accepts any type T where
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 1f665e6..ac8b932 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -18,7 +18,7 @@
package kafka.utils
import java.nio.charset.StandardCharsets
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.CountDownLatch
import kafka.admin._
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
@@ -26,15 +26,11 @@ import kafka.cluster._
import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
import kafka.consumer.{ConsumerThreadId, TopicCount}
import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
-import kafka.metrics.KafkaMetricsGroup
-import kafka.server.ConfigType
-import kafka.utils.ZkUtils._
-import com.yammer.metrics.core.MetricName
+import kafka.zk.{BrokerIdZNode, ZkData}
import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.{ACL, Stat}
@@ -71,28 +67,10 @@ object ZkUtils {
val ConfigChangesPath = s"$ConfigPath/changes"
val ConfigUsersPath = s"$ConfigPath/users"
val ProducerIdBlockPath = "/latest_producer_id_block"
- // Important: it is necessary to add any new top level Zookeeper path to the Seq
- val SecureZkRootPaths = Seq(AdminPath,
- BrokersPath,
- ClusterPath,
- ConfigPath,
- ControllerPath,
- ControllerEpochPath,
- IsrChangeNotificationPath,
- KafkaAclPath,
- KafkaAclChangesPath,
- ProducerIdBlockPath,
- LogDirEventNotificationPath)
-
- // Important: it is necessary to add any new top level Zookeeper path that contains
- // sensitive information that should not be world readable to the Seq
- val SensitiveZkRootPaths = Seq(ConfigUsersPath)
-
- def withMetrics(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean,
- time: Time): ZkUtils = {
- val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
- new ZkUtils(new ZooKeeperClientMetrics(zkClient, time), zkConnection, isZkSecurityEnabled)
- }
+
+ val SecureZkRootPaths = ZkData.SecureRootPaths
+
+ val SensitiveZkRootPaths = ZkData.SensitiveRootPaths
def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
@@ -117,24 +95,12 @@ object ZkUtils {
(zkClient, zkConnection)
}
- def sensitivePath(path: String): Boolean = {
- path != null && SensitiveZkRootPaths.exists(path.startsWith(_))
- }
-
@deprecated("This is deprecated, use defaultAcls(isSecure, path) which doesn't make sensitive data world readable", since = "0.10.2.1")
def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = defaultAcls(isSecure, "")
- def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = {
- if (isSecure) {
- val list = new java.util.ArrayList[ACL]
- list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
- if (!sensitivePath(path)) {
- list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
- }
- list
- } else
- ZooDefs.Ids.OPEN_ACL_UNSAFE
- }
+ def sensitivePath(path: String): Boolean = ZkData.sensitivePath(path)
+
+ def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = ZkData.defaultAcls(isSecure, path).asJava
def maybeDeletePath(zkUrl: String, dir: String) {
try {
@@ -234,72 +200,25 @@ object ZkUtils {
}
}
-class ZooKeeperClientWrapper(val zkClient: ZkClient) {
- def apply[T](method: ZkClient => T): T = method(zkClient)
- def close(): Unit = {
- if(zkClient != null)
- zkClient.close()
- }
-}
-
-class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
- extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
- private val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
-
- override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
- explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
- }
-
- override def apply[T](method: ZkClient => T): T = {
- val startNs = time.nanoseconds
- val ret =
- try method(zkClient)
- finally latencyMetric.update(TimeUnit.NANOSECONDS.toMillis(time.nanoseconds - startNs))
- ret
- }
-
- override def close(): Unit = {
- if (latencyMetric != null)
- removeMetric("ZooKeeperRequestLatencyMs")
- super.close()
- }
-}
-
/**
* Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
*/
-class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
+class ZkUtils(val zkClient: ZkClient,
val zkConnection: ZkConnection,
val isSecure: Boolean) extends Logging {
+ import ZkUtils._
+
// These are persistent ZK paths that should exist on kafka broker startup.
- val persistentZkPaths = Seq(ConsumersPath,
- BrokerIdsPath,
- BrokerTopicsPath,
- ConfigChangesPath,
- getEntityConfigRootPath(ConfigType.Topic),
- getEntityConfigRootPath(ConfigType.Client),
- DeleteTopicsPath,
- BrokerSequenceIdPath,
- IsrChangeNotificationPath,
- ProducerIdBlockPath,
- LogDirEventNotificationPath)
-
- /** Present for compatibility */
- def this(zkClient: ZkClient, zkConnection: ZkConnection, isSecure: Boolean) =
- this(new ZooKeeperClientWrapper(zkClient), zkConnection, isSecure)
+ val persistentZkPaths = ZkData.PersistentZkPaths
// Visible for testing
- val zkPath = new ZkPath(zkClientWrap)
-
- import ZkUtils._
+ val zkPath = new ZkPath(zkClient)
@deprecated("This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable", since = "0.10.2.1")
val DefaultAcls: java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, "")
def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)
- def zkClient: ZkClient = zkClientWrap.zkClient
-
def getController(): Int = {
readDataMaybeNull(ControllerPath)._1 match {
case Some(controller) => parseControllerId(controller)
@@ -470,7 +389,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
val brokerIdPath = BrokerIdsPath + "/" + id
// see method documentation for reason why we do this
val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
- val json = new String(Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack),
+ val json = new String(BrokerIdZNode.encode(version, host, port, advertisedEndpoints, jmxPort, rack),
StandardCharsets.UTF_8)
registerBrokerInZk(brokerIdPath, json)
@@ -483,7 +402,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
brokerInfo,
zkConnection.getZookeeper,
isSecure)
- zkClientWrap(_ => zkCheckedEphemeral.create())
+ zkCheckedEphemeral.create()
} catch {
case _: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
@@ -524,7 +443,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
acls
}
- if (!zkClientWrap(zkClient => zkClient.exists(path)))
+ if (!zkClient.exists(path))
zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
}
@@ -607,7 +526,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
- zkClientWrap(_.writeData(path, data))
+ zkClient.writeData(path, data)
} catch {
case _: ZkNoNodeException =>
createParentPath(path)
@@ -615,7 +534,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
zkPath.createPersistent(path, data, acl)
} catch {
case _: ZkNodeExistsException =>
- zkClientWrap(_.writeData(path, data))
+ zkClient.writeData(path, data)
}
}
}
@@ -631,7 +550,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
try {
- val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
+ val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
@@ -658,7 +577,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
*/
def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
try {
- val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
+ val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
@@ -678,7 +597,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
- zkClientWrap(_.writeData(path, data))
+ zkClient.writeData(path, data)
} catch {
case _: ZkNoNodeException =>
createParentPath(path)
@@ -687,7 +606,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
}
def deletePath(path: String): Boolean = {
- zkClientWrap(_.delete(path))
+ zkClient.delete(path)
}
/**
@@ -696,7 +615,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
*/
def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
try {
- zkClientWrap(_.delete(path, expectedVersion))
+ zkClient.delete(path, expectedVersion)
true
} catch {
case _: ZkBadVersionException => false
@@ -704,37 +623,37 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
}
def deletePathRecursive(path: String) {
- zkClientWrap(_.deleteRecursive(path))
+ zkClient.deleteRecursive(path)
}
def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
- zkClientWrap(_.subscribeDataChanges(path, listener))
+ zkClient.subscribeDataChanges(path, listener)
def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit =
- zkClientWrap(_.unsubscribeDataChanges(path, dataListener))
+ zkClient.unsubscribeDataChanges(path, dataListener)
def subscribeStateChanges(listener: IZkStateListener): Unit =
- zkClientWrap(_.subscribeStateChanges(listener))
+ zkClient.subscribeStateChanges(listener)
def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] =
- Option(zkClientWrap(_.subscribeChildChanges(path, listener))).map(_.asScala)
+ Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit =
- zkClientWrap(_.unsubscribeChildChanges(path, childListener))
+ zkClient.unsubscribeChildChanges(path, childListener)
def unsubscribeAll(): Unit =
- zkClientWrap(_.unsubscribeAll())
+ zkClient.unsubscribeAll()
def readData(path: String): (String, Stat) = {
val stat: Stat = new Stat()
- val dataStr: String = zkClientWrap(_.readData[String](path, stat))
+ val dataStr: String = zkClient.readData[String](path, stat)
(dataStr, stat)
}
def readDataMaybeNull(path: String): (Option[String], Stat) = {
val stat = new Stat()
val dataAndStat = try {
- val dataStr = zkClientWrap(_.readData[String](path, stat))
+ val dataStr = zkClient.readData[String](path, stat)
(Some(dataStr), stat)
} catch {
case _: ZkNoNodeException =>
@@ -746,18 +665,18 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
val stat = new Stat()
try {
- val data = zkClientWrap(_.readData[String](path, stat))
+ val data = zkClient.readData[String](path, stat)
(Option(data), stat.getVersion)
} catch {
case _: ZkNoNodeException => (None, stat.getVersion)
}
}
- def getChildren(path: String): Seq[String] = zkClientWrap(_.getChildren(path)).asScala
+ def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
def getChildrenParentMayNotExist(path: String): Seq[String] = {
try {
- zkClientWrap(_.getChildren(path)).asScala
+ zkClient.getChildren(path).asScala
} catch {
case _: ZkNoNodeException => Nil
}
@@ -767,7 +686,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
* Check if the given path exists
*/
def pathExists(path: String): Boolean = {
- zkClientWrap(_.exists(path))
+ zkClient.exists(path)
}
def isTopicMarkedForDeletion(topic: String): Boolean = {
@@ -779,11 +698,15 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
for (node <- nodes) {
val brokerZKString = readData(BrokerIdsPath + "/" + node)._1
- cluster.add(Broker.createBroker(node.toInt, brokerZKString))
+ cluster.add(parseBrokerJson(node.toInt, brokerZKString))
}
cluster
}
+ private def parseBrokerJson(id: Int, jsonString: String): Broker = {
+ BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker
+ }
+
def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
for(topicAndPartition <- topicAndPartitions) {
@@ -894,9 +817,9 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
def deletePartition(brokerId: Int, topic: String) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
- zkClientWrap(_.delete(brokerIdPath))
+ zkClient.delete(brokerIdPath)
val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
- zkClientWrap(_.delete(brokerPartTopicPath))
+ zkClient.delete(brokerPartTopicPath)
}
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
@@ -944,7 +867,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
*/
def getBrokerInfo(brokerId: Int): Option[Broker] = {
readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
- case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
+ case Some(brokerInfo) => Some(parseBrokerJson(brokerId, brokerInfo))
case None => None
}
}
@@ -956,7 +879,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
*/
def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = {
val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
- def writeToZk: Int = zkClientWrap(_.writeDataReturnStat(path, "", -1)).getVersion
+ def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
try {
writeToZk
} catch {
@@ -1020,7 +943,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
}
def close() {
- zkClientWrap.close()
+ zkClient.close()
}
}
@@ -1040,7 +963,7 @@ private object ZKStringSerializer extends ZkSerializer {
@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
class ZKGroupDirs(val group: String) {
- def consumerDir = ConsumersPath
+ def consumerDir = ZkUtils.ConsumersPath
def consumerGroupDir = consumerDir + "/" + group
def consumerRegistryDir = consumerGroupDir + "/ids"
def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
@@ -1076,7 +999,7 @@ class ZKConfig(props: VerifiableProperties) {
val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
}
-class ZkPath(clientWrap: ZooKeeperClientWrapper) {
+class ZkPath(zkClient: ZkClient) {
@volatile private var isNamespacePresent: Boolean = false
@@ -1084,7 +1007,7 @@ class ZkPath(clientWrap: ZooKeeperClientWrapper) {
if (isNamespacePresent)
return
- if (!clientWrap(_.exists("/"))) {
+ if (!zkClient.exists("/")) {
throw new ConfigException("Zookeeper namespace does not exist")
}
isNamespacePresent = true
@@ -1096,22 +1019,22 @@ class ZkPath(clientWrap: ZooKeeperClientWrapper) {
def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
checkNamespace()
- clientWrap(_.createPersistent(path, data, acls))
+ zkClient.createPersistent(path, data, acls)
}
def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
checkNamespace()
- clientWrap(_.createPersistent(path, createParents, acls))
+ zkClient.createPersistent(path, createParents, acls)
}
def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
checkNamespace()
- clientWrap(_.createEphemeral(path, data, acls))
+ zkClient.createEphemeral(path, data, acls)
}
def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
checkNamespace()
- clientWrap(_.createPersistentSequential(path, data, acls))
+ zkClient.createPersistentSequential(path, data, acls)
}
}
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 8c3f018..8179300 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.security.auth.{Acl, Resource, ResourceType}
import kafka.server.ConfigType
-import kafka.utils._
+import kafka.utils.Logging
import kafka.zookeeper._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
@@ -49,7 +49,8 @@ import scala.collection.{Seq, mutable}
* easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
* in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
*/
-class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends Logging with KafkaMetricsGroup {
+class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
+ Logging with KafkaMetricsGroup {
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
@@ -75,12 +76,11 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
}
def registerBrokerInZk(brokerInfo: BrokerInfo): Unit = {
- val brokerIdPath = brokerInfo.path()
- checkedEphemeralCreate(brokerIdPath, brokerInfo.encode())
- info("Registered broker %d at path %s with addresses: %s".format(brokerInfo.id, brokerIdPath, brokerInfo.endpoints()))
+ val path = brokerInfo.path
+ checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
+ info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}")
}
-
/**
* Gets topic partition states for the given partitions.
* @param partitions the partitions for which we want ot get states.
@@ -292,7 +292,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
getDataResponse.resultCode match {
case Code.OK =>
- Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+ Option(BrokerIdZNode.decode(brokerId, getDataResponse.data).broker)
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
@@ -308,7 +308,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK =>
- Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+ Option(BrokerIdZNode.decode(brokerId, getDataResponse.data).broker)
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
@@ -453,7 +453,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
* @param topics the topics whose partitions we wish to get the assignments for.
* @return the partition assignment for each partition from the given topics.
*/
- def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] = {
+ def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] = {
val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
getDataResponses.flatMap { getDataResponse =>
@@ -657,7 +657,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
- case Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
+ case Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
case Code.NONODE => Map.empty
case _ => throw getDataResponse.resultException.get
}
@@ -756,13 +756,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
* @param partition
* @return optional integer if the leader exists and None otherwise.
*/
- def getLeaderForPartition(partition: TopicPartition): Option[Int] = {
- val leaderIsrEpoch = getTopicPartitionState(partition)
- if (leaderIsrEpoch.isDefined)
- Option(leaderIsrEpoch.get.leaderAndIsr.leader)
- else
- None
- }
+ def getLeaderForPartition(partition: TopicPartition): Option[Int] =
+ getTopicPartitionState(partition).map(_.leaderAndIsr.leader)
/**
* Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
@@ -1217,7 +1212,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
}
/**
- * Generate a borker id by updating the broker sequence id path in ZK and return the version of the path.
+ * Generate a broker id by updating the broker sequence id path in ZK and return the version of the path.
* The version is incremented by one on every update starting from 1.
* @return sequence number as the broker id
*/
@@ -1351,10 +1346,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
retryRequestsUntilConnected(getDataRequests)
}
- private def acls(path: String): Seq[ACL] = {
- import scala.collection.JavaConverters._
- ZkUtils.defaultAcls(isSecure, path).asScala
- }
+ private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
retryRequestsUntilConnected(Seq(request)).head
@@ -1444,4 +1436,22 @@ object KafkaZkClient {
case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicPartition, LeaderAndIsr],
partitionsToRetry: Seq[TopicPartition],
failedPartitions: Map[TopicPartition, Exception])
+
+ /**
+ * Create an instance of this class with the provided parameters.
+ *
+ * The metric group and type are preserved by default for compatibility with previous versions.
+ */
+ def apply(connectString: String,
+ isSecure: Boolean,
+ sessionTimeoutMs: Int,
+ connectionTimeoutMs: Int,
+ maxInFlightRequests: Int,
+ time: Time,
+ metricGroup: String = "kafka.server",
+ metricType: String = "SessionExpireListener") = {
+ val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
+ time, metricGroup, metricType)
+ new KafkaZkClient(zooKeeperClient, isSecure, time)
+ }
}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 9578129..2c86c2c 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -28,9 +28,15 @@ import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.server.ConfigType
import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
-import org.apache.zookeeper.data.Stat
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.ZooDefs
+import org.apache.zookeeper.data.{ACL, Stat}
import scala.collection.JavaConverters._
+import scala.collection.Seq
+import scala.collection.mutable.ArrayBuffer
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
@@ -63,42 +69,158 @@ object BrokerIdsZNode {
def encode: Array[Byte] = null
}
-class BrokerInfo(val id: Int,
- host: String,
- port: Int,
- advertisedEndpoints: Seq[EndPoint],
- jmxPort: Int,
- rack: Option[String],
- apiVersion: ApiVersion) {
+object BrokerInfo {
- def path(): String = {
- BrokerIdZNode.path(id)
+ /**
+ * Create a broker info with v4 json format (which includes multiple endpoints and rack) if
+ * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise.
+ *
+ * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2.
+ *
+ * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to
+ * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
+ */
+ def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo = {
+ // see method documentation for the reason why we do this
+ val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+ BrokerInfo(broker, version, jmxPort)
}
- def endpoints(): String = {
- advertisedEndpoints.mkString(",")
- }
+}
- def encode(): Array[Byte] = {
- BrokerIdZNode.encode(id, host, port, advertisedEndpoints, jmxPort, rack, apiVersion)
- }
+case class BrokerInfo(broker: Broker, version: Int, jmxPort: Int) {
+ val path: String = BrokerIdZNode.path(broker.id)
+ def toJsonBytes: Array[Byte] = BrokerIdZNode.encode(this)
}
object BrokerIdZNode {
+ private val HostKey = "host"
+ private val PortKey = "port"
+ private val VersionKey = "version"
+ private val EndpointsKey = "endpoints"
+ private val RackKey = "rack"
+ private val JmxPortKey = "jmx_port"
+ private val ListenerSecurityProtocolMapKey = "listener_security_protocol_map"
+ private val TimestampKey = "timestamp"
+
def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
- def encode(id: Int,
- host: String,
- port: Int,
- advertisedEndpoints: Seq[EndPoint],
- jmxPort: Int,
- rack: Option[String],
- apiVersion: ApiVersion): Array[Byte] = {
- val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
- Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack)
+
+ /**
+ * Encode to JSON bytes.
+ *
+ * The JSON format includes a top level host and port for compatibility with older clients.
+ */
+ def encode(version: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
+ rack: Option[String]): Array[Byte] = {
+ val jsonMap = collection.mutable.Map(VersionKey -> version,
+ HostKey -> host,
+ PortKey -> port,
+ EndpointsKey -> advertisedEndpoints.map(_.connectionString).toBuffer.asJava,
+ JmxPortKey -> jmxPort,
+ TimestampKey -> Time.SYSTEM.milliseconds().toString
+ )
+ rack.foreach(rack => if (version >= 3) jsonMap += (RackKey -> rack))
+
+ if (version >= 4) {
+ jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint =>
+ endPoint.listenerName.value -> endPoint.securityProtocol.name
+ }.toMap.asJava)
+ }
+ Json.encodeAsBytes(jsonMap.asJava)
+ }
+
+ def encode(brokerInfo: BrokerInfo): Array[Byte] = {
+ val broker = brokerInfo.broker
+ // the default host and port are here for compatibility with older clients that only support PLAINTEXT
+ // we choose the first plaintext port, if there is one
+ // or we register an empty endpoint, which means that older clients will not be able to connect
+ val plaintextEndpoint = broker.endPoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
+ new EndPoint(null, -1, null, null))
+ encode(brokerInfo.version, plaintextEndpoint.host, plaintextEndpoint.port, broker.endPoints, brokerInfo.jmxPort,
+ broker.rack)
}
- def decode(id: Int, bytes: Array[Byte]): Broker = {
- Broker.createBroker(id, new String(bytes, UTF_8))
+ /**
+ * Create a BrokerInfo object from id and JSON bytes.
+ *
+ * @param id
+ * @param jsonBytes
+ *
+ * Version 1 JSON schema for a broker is:
+ * {
+ * "version":1,
+ * "host":"localhost",
+ * "port":9092
+ * "jmx_port":9999,
+ * "timestamp":"2233345666"
+ * }
+ *
+ * Version 2 JSON schema for a broker is:
+ * {
+ * "version":2,
+ * "host":"localhost",
+ * "port":9092,
+ * "jmx_port":9999,
+ * "timestamp":"2233345666",
+ * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
+ * }
+ *
+ * Version 3 JSON schema for a broker is:
+ * {
+ * "version":3,
+ * "host":"localhost",
+ * "port":9092,
+ * "jmx_port":9999,
+ * "timestamp":"2233345666",
+ * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
+ * "rack":"dc1"
+ * }
+ *
+ * Version 4 (current) JSON schema for a broker is:
+ * {
+ * "version":4,
+ * "host":"localhost",
+ * "port":9092,
+ * "jmx_port":9999,
+ * "timestamp":"2233345666",
+ * "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+ * "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
+ * "rack":"dc1"
+ * }
+ */
+ def decode(id: Int, jsonBytes: Array[Byte]): BrokerInfo = {
+ Json.tryParseBytes(jsonBytes) match {
+ case Right(js) =>
+ val brokerInfo = js.asJsonObject
+ val version = brokerInfo(VersionKey).to[Int]
+ val jmxPort = brokerInfo(JmxPortKey).to[Int]
+
+ val endpoints =
+ if (version < 1)
+ throw new KafkaException("Unsupported version of broker registration: " +
+ s"${new String(jsonBytes, UTF_8)}")
+ else if (version == 1) {
+ val host = brokerInfo(HostKey).to[String]
+ val port = brokerInfo(PortKey).to[Int]
+ val securityProtocol = SecurityProtocol.PLAINTEXT
+ val endPoint = new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+ Seq(endPoint)
+ }
+ else {
+ val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
+ _.to[Map[String, String]].map { case (listenerName, securityProtocol) =>
+ new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
+ })
+ val listeners = brokerInfo(EndpointsKey).to[Seq[String]]
+ listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
+ }
+
+ val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])
+ BrokerInfo(Broker(id, endpoints, rack), version, jmxPort)
+ case Left(e) =>
+ throw new KafkaException(s"Failed to parse ZooKeeper registration for broker $id: " +
+ s"${new String(jsonBytes, UTF_8)}", e)
+ }
}
}
@@ -342,8 +464,12 @@ object AclChangeNotificationSequenceZNode {
def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
}
+object ClusterZNode {
+ def path = "/cluster"
+}
+
object ClusterIdZNode {
- def path = "/cluster/id"
+ def path = s"${ClusterZNode.path}/id"
def toJson(id: String): Array[Byte] = {
Json.encodeAsBytes(Map("version" -> "1", "id" -> id).asJava)
@@ -365,17 +491,46 @@ object ProducerIdBlockZNode {
}
object ZkData {
+
+ // Important: it is necessary to add any new top level Zookeeper path to the Seq
+ val SecureRootPaths = Seq(AdminZNode.path,
+ BrokersZNode.path,
+ ClusterZNode.path,
+ ConfigZNode.path,
+ ControllerZNode.path,
+ ControllerEpochZNode.path,
+ IsrChangeNotificationZNode.path,
+ AclZNode.path,
+ AclChangeNotificationZNode.path,
+ ProducerIdBlockZNode.path,
+ LogDirEventNotificationZNode.path)
+
// These are persistent ZK paths that should exist on kafka broker startup.
val PersistentZkPaths = Seq(
"/consumers", // old consumer path
BrokerIdsZNode.path,
TopicsZNode.path,
ConfigEntityChangeNotificationZNode.path,
- ConfigEntityTypeZNode.path(ConfigType.Topic),
- ConfigEntityTypeZNode.path(ConfigType.Client),
DeleteTopicsZNode.path,
BrokerSequenceIdZNode.path,
IsrChangeNotificationZNode.path,
ProducerIdBlockZNode.path,
- LogDirEventNotificationZNode.path)
-}
\ No newline at end of file
+ LogDirEventNotificationZNode.path
+ ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
+
+ val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User))
+
+ def sensitivePath(path: String): Boolean = {
+ path != null && SensitiveRootPaths.exists(path.startsWith)
+ }
+
+ def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
+ if (isSecure) {
+ val acls = new ArrayBuffer[ACL]
+ acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
+ if (!sensitivePath(path))
+ acls ++= ZooDefs.Ids.READ_ACL_UNSAFE.asScala
+ acls
+ } else ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala
+ }
+}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6d786dc..9a1d162 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -49,8 +49,8 @@ class ZooKeeperClient(connectString: String,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
- metricGroup: String = "kafka.server",
- metricType: String = "KafkaHealthcheck") extends Logging with KafkaMetricsGroup {
+ metricGroup: String,
+ metricType: String) extends Logging with KafkaMetricsGroup {
this.logIdent = "[ZooKeeperClient] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
@@ -81,27 +81,27 @@ class ZooKeeperClient(connectString: String,
}
info(s"Initializing a new session to $connectString.")
+ // Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
- private val sessionStateGauge =
- newGauge("SessionState", new Gauge[String] {
- override def value: String =
- Option(zooKeeper.getState.toString).getOrElse("DISCONNECTED")
- })
+ newGauge("SessionState", new Gauge[String] {
+ override def value: String = Option(connectionState.toString).getOrElse("DISCONNECTED")
+ })
metricNames += "SessionState"
waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
-
- /**
- * This is added to preserve the original metric name in JMX
- */
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
explicitMetricName(metricGroup, metricType, name, metricTags)
}
/**
+ * Return the state of the ZooKeeper connection.
+ */
+ def connectionState: States = zooKeeper.getState
+
+ /**
* Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details.
*
* @param request a single request to send and wait on.
@@ -214,13 +214,13 @@ class ZooKeeperClient(connectString: String,
info("Waiting until connected.")
var nanos = timeUnit.toNanos(timeout)
inLock(isConnectedOrExpiredLock) {
- var state = zooKeeper.getState
+ var state = connectionState
while (!state.isConnected && state.isAlive) {
if (nanos <= 0) {
throw new ZooKeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
}
nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
- state = zooKeeper.getState
+ state = connectionState
}
if (state == States.AUTH_FAILED) {
throw new ZooKeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
@@ -309,24 +309,23 @@ class ZooKeeperClient(connectString: String,
def sessionId: Long = inReadLock(initializationLock) {
zooKeeper.getSessionId
}
-
+
private def initialize(): Unit = {
- if (!zooKeeper.getState.isAlive) {
+ if (!connectionState.isAlive) {
+ zooKeeper.close()
info(s"Initializing a new session to $connectString.")
// retry forever until ZooKeeper can be instantiated
- while (true) {
+ var connected = false
+ while (!connected) {
try {
- zooKeeper.close()
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
- return
+ connected = true
} catch {
case e: Exception =>
- info("Error when recreating ZooKeeper", e)
+ info("Error when recreating ZooKeeper, retrying after a short sleep", e)
Thread.sleep(1000)
}
}
- info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
- stateChangeHandlers.values.foreach(_.onReconnectionTimeout())
}
}
@@ -341,7 +340,7 @@ class ZooKeeperClient(connectString: String,
// package level visibility for testing only
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
- debug("Received event: " + event)
+ debug(s"Received event: $event")
Option(event.getPath) match {
case None =>
val state = event.getState
@@ -377,7 +376,6 @@ trait StateChangeHandler {
def beforeInitializingSession(): Unit = {}
def afterInitializingSession(): Unit = {}
def onAuthFailure(): Unit = {}
- def onReconnectionTimeout(): Unit = {}
}
trait ZNodeChangeHandler {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index daeb82a..cac102c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -255,7 +255,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
def testDescribeLogDirs(): Unit = {
client = AdminClient.create(createConfig())
val topic = "topic"
- val leaderByPartition = TestUtils.createTopic(zkClient, topic, 10, 1, servers, new Properties())
+ val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
val brokers = (0 until serverCount).map(Integer.valueOf)
val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
@@ -281,8 +281,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
def testDescribeReplicaLogDirs(): Unit = {
client = AdminClient.create(createConfig())
val topic = "topic"
- val leaderByPartition = TestUtils.createTopic(zkClient, topic, 10, 1, servers, new Properties())
- val replicas = leaderByPartition.map { case (partition, brokerId) => new TopicPartitionReplica(topic, partition, brokerId) }.toSeq
+ val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
+ val replicas = leaderByPartition.map { case (partition, brokerId) =>
+ new TopicPartitionReplica(topic, partition, brokerId)
+ }.toSeq
val replicaDirInfos = client.describeReplicaLogDirs(replicas.asJavaCollection).all.get
replicaDirInfos.asScala.foreach { case (topicPartitionReplica, replicaDirInfo) =>
@@ -317,7 +319,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertTrue(exception.getCause.isInstanceOf[ReplicaNotAvailableException])
}
- TestUtils.createTopic(zkClient, topic, 1, serverCount, servers, new Properties)
+ createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
servers.foreach { server =>
val logDir = server.logManager.getLog(tp).get.dir.getParent
assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
@@ -389,11 +391,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val topicConfig1 = new Properties
topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
- TestUtils.createTopic(zkClient, topic1, 1, 1, servers, topicConfig1)
+ createTopic(topic1, numPartitions = 1, replicationFactor = 1, topicConfig1)
val topic2 = "describe-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+ createTopic(topic2, numPartitions = 1, replicationFactor = 1)
// Describe topics and broker
val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
@@ -447,7 +449,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(servers(2).config.logCleanerThreads.toString,
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
- checkValidAlterConfigs(servers, client, topicResource1, topicResource2)
+ checkValidAlterConfigs(client, topicResource1, topicResource2)
}
@Test
@@ -456,10 +458,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// Create topics
val topic1 = "create-partitions-topic-1"
- TestUtils.createTopic(zkClient, topic1, 1, 1, servers, new Properties)
+ createTopic(topic1, numPartitions = 1, replicationFactor = 1)
val topic2 = "create-partitions-topic-2"
- TestUtils.createTopic(zkClient, topic2, 1, 2, servers, new Properties)
+ createTopic(topic2, numPartitions = 1, replicationFactor = 2)
// assert that both the topics have 1 partition
assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
@@ -714,7 +716,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testSeekAfterDeleteRecords(): Unit = {
- TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+ createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
client = AdminClient.create(createConfig)
@@ -741,7 +743,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
@Ignore // Disabled temporarily until flakiness is resolved
def testLogStartOffsetCheckpoint(): Unit = {
- TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+ createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
client = AdminClient.create(createConfig)
@@ -780,7 +782,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testLogStartOffsetAfterDeleteRecords(): Unit = {
- TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+ createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
client = AdminClient.create(createConfig)
@@ -799,7 +801,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testOffsetsForTimesAfterDeleteRecords(): Unit = {
- TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+ createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
client = AdminClient.create(createConfig)
@@ -942,8 +944,7 @@ object AdminClientIntegrationTest {
import org.scalatest.Assertions._
- def checkValidAlterConfigs(servers: Seq[KafkaServer], client: AdminClient,
- topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
+ def checkValidAlterConfigs(client: AdminClient, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
// Alter topics
var topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.FlushMsProp, "1000")
@@ -1009,11 +1010,11 @@ object AdminClientIntegrationTest {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- TestUtils.createTopic(zkClient, topic1, 1, 1, servers, new Properties())
+ TestUtils.createTopic(zkClient, topic1, 1, 1, servers)
val topic2 = "invalid-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+ TestUtils.createTopic(zkClient, topic2, 1, 1, servers)
val topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index a381c84..1bea039 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -76,13 +76,13 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
val topicConfig1 = new Properties
topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
- TestUtils.createTopic(zkClient, topic1, 1, 1, servers, topicConfig1)
+ createTopic(topic1, 1, 1, topicConfig1)
val topic2 = "describe-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+ createTopic(topic2, 1, 1)
- AdminClientIntegrationTest.checkValidAlterConfigs(servers, client, topicResource1, topicResource2)
+ AdminClientIntegrationTest.checkValidAlterConfigs(client, topicResource1, topicResource2)
}
@Test
@@ -98,15 +98,15 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
// Create topics
val topic1 = "invalid-alter-configs-due-to-policy-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- TestUtils.createTopic(zkClient, topic1, 1, 1, servers, new Properties())
+ createTopic(topic1, 1, 1)
val topic2 = "invalid-alter-configs-due-to-policy-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
- TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+ createTopic(topic2, 1, 1)
val topic3 = "invalid-alter-configs-due-to-policy-topic-3"
val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
- TestUtils.createTopic(zkClient, topic3, 1, 1, servers, new Properties)
+ createTopic(topic3, 1, 1)
val topicConfigEntries1 = Seq(
new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index a11dd8c..3521bb6 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,7 +19,7 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils.ShutdownableThread
import kafka.server.KafkaConfig
import org.junit.Assert._
import org.junit.{Before, Test}
@@ -68,7 +68,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
super.setUp()
// create the test topic with all the brokers as replicas
- TestUtils.createTopic(zkClient, topic, 2, serverCount, this.servers)
+ createTopic(topic, 2, serverCount)
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 579c8bb..106984c 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -123,7 +123,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
// create topic
- TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ createTopic(topic, 1, 2)
// send a normal record
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -184,7 +184,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
timeoutMs: Long = 20000L) {
val partition = 0
try {
- TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ createTopic(topic, 1, 2)
val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -239,7 +239,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
else
topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
- TestUtils.createTopic(zkClient, topic, 1, 2, servers, topicProps)
+ createTopic(topic, 1, 2, topicProps)
val recordAndFutures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -271,7 +271,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
// create topic
- TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ createTopic(topic, 1, 2)
// non-blocking send a list of records
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
@@ -303,7 +303,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val producer = createProducer(brokerList)
try {
- TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ createTopic(topic, 2, 2)
val partition = 1
val now = System.currentTimeMillis()
@@ -348,7 +348,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val producer = createProducer(brokerList)
// create topic
- TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ createTopic(topic, 1, 2)
val partition0 = 0
var futures0 = (1 to numRecords).map { i =>
@@ -410,7 +410,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def testFlush() {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
try {
- TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ createTopic(topic, 2, 2)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
"value".getBytes(StandardCharsets.UTF_8))
for (_ <- 0 until 50) {
@@ -429,7 +429,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testCloseWithZeroTimeoutFromCallerThread() {
- TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ createTopic(topic, 2, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
@@ -459,7 +459,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testCloseWithZeroTimeoutFromSenderThread() {
- TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ createTopic(topic, 1, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 257308c..9b1c2aa 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -72,7 +72,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
super.setUp()
val numPartitions = 1
- val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, serverCount, servers)
+ val leaders = createTopic(topic1, numPartitions, serverCount)
leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
}
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 5fca9b4..8917921 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -69,7 +69,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
super.setUp()
// create the test topic with all the brokers as replicas
- TestUtils.createTopic(zkClient, topic, 1, serverCount, this.servers)
+ createTopic(topic, 1, serverCount)
}
@After
@@ -288,7 +288,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
@Test
def testCloseDuringRebalance() {
val topic = "closetest"
- TestUtils.createTopic(zkClient, topic, 10, serverCount, this.servers)
+ createTopic(topic, 10, serverCount)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index d997db5..1e93b37 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -161,7 +161,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
}
// create the test topic with all the brokers as replicas
- TestUtils.createTopic(zkClient, topic, 1, 3, this.servers)
+ createTopic(topic, 1, 3)
}
override def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 8022170..eff596b 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -115,7 +115,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
super.setUp()
MockDeserializer.resetStaticVariables
// create the consumer offset topic
- TestUtils.createTopic(zkClient, topic, 2, serverCount, this.servers)
+ createTopic(topic, 2, serverCount)
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 774db18..57a2b20 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -69,7 +69,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
override def setUp() {
super.setUp()
client = AdminClient.createSimplePlaintext(this.brokerList)
- TestUtils.createTopic(zkClient, topic, 2, serverCount, this.servers)
+ createTopic(topic, 2, serverCount)
}
@After
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 9c5a9c8..eaa4a23 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -48,7 +48,7 @@ class LogAppendTimeTest extends IntegrationTestHarness {
@Before
override def setUp() {
super.setUp()
- TestUtils.createTopic(zkClient, topic, servers = servers)
+ createTopic(topic)
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 6e95973..80cfeca 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -71,7 +71,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
val topic = "topicWithOldMessageFormat"
val props = new Properties
props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
- TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, this.servers, props)
+ createTopic(topic, numPartitions = 1, replicationFactor = 1, props)
val tp = new TopicPartition(topic, 0)
// Produce and consume some records
@@ -206,9 +206,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
// Latency is rounded to milliseconds, so check the count instead.
- val initialCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+ val initialCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
- val newCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+ val newCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
assertTrue("ZooKeeper latency not recorded", newCount > initialCount)
assertEquals(s"Unexpected ZK state", "CONNECTED", yammerMetricValue("SessionState"))
@@ -274,7 +274,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
}
}
- private def yammerHistogramCount(name: String): Any = {
+ private def yammerHistogramCount(name: String): Long = {
val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 65b2865..04935f8 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -325,17 +325,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(numRecords)
val topic1 = "tblablac" // matches subscribed pattern
- TestUtils.createTopic(zkClient, topic1, 2, serverCount, this.servers)
+ createTopic(topic1, 2, serverCount)
sendRecords(1000, new TopicPartition(topic1, 0))
sendRecords(1000, new TopicPartition(topic1, 1))
val topic2 = "tblablak" // does not match subscribed pattern
- TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+ createTopic(topic2, 2, serverCount)
sendRecords(1000, new TopicPartition(topic2, 0))
sendRecords(1000, new TopicPartition(topic2, 1))
val topic3 = "tblab1" // does not match subscribed pattern
- TestUtils.createTopic(zkClient, topic3, 2, serverCount, this.servers)
+ createTopic(topic3, 2, serverCount)
sendRecords(1000, new TopicPartition(topic3, 0))
sendRecords(1000, new TopicPartition(topic3, 1))
@@ -357,7 +357,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
val topic4 = "tsomec" // matches subscribed pattern
- TestUtils.createTopic(zkClient, topic4, 2, serverCount, this.servers)
+ createTopic(topic4, 2, serverCount)
sendRecords(1000, new TopicPartition(topic4, 0))
sendRecords(1000, new TopicPartition(topic4, 1))
@@ -396,7 +396,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// the first topic ('topic') matches first subscription pattern only
val fooTopic = "foo" // matches both subscription patterns
- TestUtils.createTopic(zkClient, fooTopic, 1, serverCount, this.servers)
+ createTopic(fooTopic, 1, serverCount)
sendRecords(1000, new TopicPartition(fooTopic, 0))
assertEquals(0, consumer0.assignment().size)
@@ -416,7 +416,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer0.assignment()}")
val barTopic = "bar" // matches the next subscription pattern
- TestUtils.createTopic(zkClient, barTopic, 1, serverCount, this.servers)
+ createTopic(barTopic, 1, serverCount)
sendRecords(1000, new TopicPartition(barTopic, 0))
val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
@@ -453,7 +453,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(numRecords)
val topic1 = "tblablac" // matches the subscription pattern
- TestUtils.createTopic(zkClient, topic1, 2, serverCount, this.servers)
+ createTopic(topic1, 2, serverCount)
sendRecords(1000, new TopicPartition(topic1, 0))
sendRecords(1000, new TopicPartition(topic1, 1))
@@ -525,7 +525,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
this.consumers.head.assignment == subscriptions.asJava
}, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
- TestUtils.createTopic(zkClient, otherTopic, 2, serverCount, this.servers)
+ createTopic(otherTopic, 2, serverCount)
this.consumers.head.subscribe(List(topic, otherTopic).asJava)
TestUtils.waitUntilTrue(() => {
this.consumers.head.poll(50)
@@ -536,7 +536,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testShrinkingTopicSubscriptions() {
val otherTopic = "other"
- TestUtils.createTopic(zkClient, otherTopic, 2, serverCount, this.servers)
+ createTopic(otherTopic, 2, serverCount)
val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
this.consumers.head.subscribe(List(topic, otherTopic).asJava)
@@ -555,7 +555,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testPartitionsFor() {
val numParts = 2
- TestUtils.createTopic(zkClient, "part-test", numParts, 1, this.servers)
+ createTopic("part-test", numParts, 1)
val parts = this.consumers.head.partitionsFor("part-test")
assertNotNull(parts)
assertEquals(2, parts.size)
@@ -788,7 +788,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val partitionCount = 30
val topics = Seq(topic1, topic2, topic3)
topics.foreach { topicName =>
- TestUtils.createTopic(zkClient, topicName, partitionCount, serverCount, servers)
+ createTopic(topicName, partitionCount, serverCount)
}
val partitions = topics.flatMap { topic =>
@@ -1049,7 +1049,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testAutoCommitIntercept() {
val topic2 = "topic2"
- TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+ createTopic(topic2, 2, serverCount)
// produce records
val numRecords = 100
@@ -1145,7 +1145,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val topicName = "testConsumeMessagesWithLogAppendTime"
val topicProps = new Properties()
topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
- TestUtils.createTopic(zkClient, topicName, 2, 2, servers, topicProps)
+ createTopic(topicName, 2, 2, topicProps)
val startTime = System.currentTimeMillis()
val numRecords = 50
@@ -1171,9 +1171,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val topic1 = "part-test-topic-1"
val topic2 = "part-test-topic-2"
val topic3 = "part-test-topic-3"
- TestUtils.createTopic(zkClient, topic1, numParts, 1, this.servers)
- TestUtils.createTopic(zkClient, topic2, numParts, 1, this.servers)
- TestUtils.createTopic(zkClient, topic3, numParts, 1, this.servers)
+ createTopic(topic1, numParts, 1)
+ createTopic(topic2, numParts, 1)
+ createTopic(topic3, numParts, 1)
val topics = this.consumers.head.listTopics()
assertNotNull(topics)
@@ -1192,10 +1192,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val topic3 = "part-test-topic-3"
val props = new Properties()
props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
- TestUtils.createTopic(zkClient, topic1, numParts, 1, this.servers)
+ createTopic(topic1, numParts, 1)
// Topic2 is in old message format.
- TestUtils.createTopic(zkClient, topic2, numParts, 1, this.servers, props)
- TestUtils.createTopic(zkClient, topic3, numParts, 1, this.servers)
+ createTopic(topic2, numParts, 1, props)
+ createTopic(topic3, numParts, 1)
val consumer = this.consumers.head
@@ -1242,7 +1242,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
createTopicAndSendRecords(topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
val props = new Properties()
props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
- TestUtils.createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, this.servers, props)
+ createTopic(topic1, numPartitions = 1, replicationFactor = 1, props)
sendRecords(100, new TopicPartition(topic1, 0))
val t0p0 = new TopicPartition(topic0, 0)
@@ -1332,7 +1332,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testAutoCommitOnRebalance() {
val topic2 = "topic2"
- TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+ createTopic(topic2, 2, serverCount)
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
@@ -1380,7 +1380,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testPerPartitionLagMetricsCleanUpWithSubscribe() {
val numMessages = 1000
val topic2 = "topic2"
- TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+ createTopic(topic2, 2, serverCount)
// send some messages.
sendRecords(numMessages, tp)
// Test subscribe
@@ -1551,7 +1551,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* records to each partition
*/
def createTopicAndSendRecords(topicName: String, numPartitions: Int, recordsPerPartition: Int): Set[TopicPartition] = {
- TestUtils.createTopic(zkClient, topicName, numPartitions, serverCount, this.servers)
+ createTopic(topicName, numPartitions, serverCount)
var parts = Set[TopicPartition]()
for (partition <- 0 until numPartitions) {
val tp = new TopicPartition(topicName, partition)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 6425c1e..929dbe4 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -89,7 +89,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
def testSendWithInvalidCreateTime() {
val topicProps = new Properties()
topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
- TestUtils.createTopic(zkClient, topic, 1, 2, servers, topicProps)
+ createTopic(topic, 1, 2, topicProps)
val producer = createProducer(brokerList = brokerList)
try {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 92ccf49..e3514cd 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -68,10 +68,10 @@ class ProducerBounceTest extends KafkaServerTestHarness {
val numPartitions = 3
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
- TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers, topicConfig)
+ createTopic(topic1, numPartitions, numServers, topicConfig)
val scheduler = new ProducerScheduler()
- scheduler.start
+ scheduler.start()
// rolling bounce brokers
@@ -92,7 +92,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
(0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition))
}
- scheduler.shutdown
+ scheduler.shutdown()
// Make sure the producer do not see any exception
// when draining the left messages on shutdown
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index ea4b05c..4bce8e3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -62,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KafkaServerContextName))
super.setUp()
- TestUtils.createTopic(zkClient, topic, numPartitions, serverCount, this.servers)
+ createTopic(topic, numPartitions, serverCount)
}
@After
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index da6ca1f..110f609 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -169,8 +169,8 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
private def createTopics() = {
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
- TestUtils.createTopic(zkClient, inputTopic, numPartitions, 3, servers, topicConfig)
- TestUtils.createTopic(zkClient, outputTopic, numPartitions, 3, servers, topicConfig)
+ createTopic(inputTopic, numPartitions, 3, topicConfig)
+ createTopic(outputTopic, numPartitions, 3, topicConfig)
}
private class BounceScheduler extends ShutdownableThread("daemon-broker-bouncer", false) {
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index d50dd33..911808a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -60,8 +60,8 @@ class TransactionsTest extends KafkaServerTestHarness {
val numPartitions = 4
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
- TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers, topicConfig)
- TestUtils.createTopic(zkClient, topic2, numPartitions, numServers, servers, topicConfig)
+ createTopic(topic1, numPartitions, numServers, topicConfig)
+ createTopic(topic2, numPartitions, numServers, topicConfig)
for (_ <- 0 until transactionalProducerCount)
createTransactionalProducer("transactional-producer")
@@ -503,8 +503,8 @@ class TransactionsTest extends KafkaServerTestHarness {
val topicConfig = new Properties()
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
- TestUtils.createTopic(zkClient, topicWith10Partitions, 10, numServers, servers, topicConfig)
- TestUtils.createTopic(zkClient, topicWith10PartitionsAndOneReplica, 10, 1, servers, new Properties())
+ createTopic(topicWith10Partitions, 10, numServers, topicConfig)
+ createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties())
firstProducer.initTransactions()
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 1f07ce4..3b5f888 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -142,7 +142,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// create leaders for all partitions
- TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+ TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => p -> zkUtils.getReplicasForPartition(topic, p)).toMap
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
for(i <- 0 until actualReplicaList.size)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index 78bbe09..8a731cf 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -36,7 +36,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ createTopic(topic, 1, 3)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
@@ -54,7 +54,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ createTopic(topic, 1, 3)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
@@ -71,7 +71,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val topic = "test"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ createTopic(topic, 1, 3)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
@@ -89,8 +89,8 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val otherTopic = "otherTopic"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
- TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
- TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+ createTopic(topicToDelete, 1, 3)
+ createTopic(otherTopic, 1, 3)
fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false)
@@ -111,8 +111,8 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val topicToDelete = "topicToDelete"
val otherTopic = "otherTopic"
val group = "group"
- TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
- TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+ createTopic(topicToDelete, 1, 3)
+ createTopic(otherTopic, 1, 3)
fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true)
fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true)
@@ -131,8 +131,8 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val otherTopic = "otherTopic"
val groups = Seq("group1", "group2")
- TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
- TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+ createTopic(topicToDelete, 1, 3)
+ createTopic(otherTopic, 1, 3)
val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete))
val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic))
groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false))
@@ -151,7 +151,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
val topic = "topic"
val group = "group"
- TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+ createTopic(topic, 1, 3)
val dir = new ZKGroupTopicDirs(group, topic)
fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 593b9cc..897cc59 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
import org.junit.{After, Test}
import java.util.Properties
-import kafka.common.{KafkaException, TopicAlreadyMarkedForDeletionException}
+import kafka.common.TopicAlreadyMarkedForDeletionException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 6325060..43ec6c6 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -439,7 +439,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
// run one consumer in the group consuming from a single-partition topic
- val executor = addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
+ addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 7347fa3..4ab2563 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -32,7 +32,7 @@ import org.junit.{Before, Test}
import org.junit.Assert.{assertEquals, assertNull}
import scala.collection.{Seq, mutable}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition
class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
@@ -270,7 +270,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging
assigner.maybeLimit(Throttle(1000))
//Then
- for (actual <- propsCapture.getValues) {
+ for (actual <- propsCapture.getValues.asScala) {
assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
}
@@ -304,7 +304,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging
assigner.maybeLimit(Throttle(1000))
//Then
- for (actual <- propsCapture.getValues) {
+ for (actual <- propsCapture.getValues.asScala) {
assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
}
@@ -334,7 +334,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging
assigner.maybeLimit(Throttle(1000))
//Then other property remains
- for (actual <- propsCapture.getValues) {
+ for (actual <- propsCapture.getValues.asScala) {
assertEquals("useful.value", actual.getProperty("useful.key"))
assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
@@ -369,7 +369,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging
ReassignPartitionsCommand.removeThrottle(zk, status, Map.empty, admin)
//Then props should have gone (dummy remains)
- for (capture <- propsCapture.getValues) {
+ for (capture <- propsCapture.getValues.asScala) {
assertEquals("value", capture.get("useful.key"))
assertNull(capture.get(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
assertNull(capture.get(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
@@ -406,7 +406,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging
ReassignPartitionsCommand.removeThrottle(zk, status, Map.empty, admin)
//Then props should have gone (dummy remains)
- for (actual <- propsCapture.getValues) {
+ for (actual <- propsCapture.getValues.asScala) {
assertEquals("value", actual.getProperty("useful.key"))
assertNull(actual.getProperty(LogConfig.LeaderReplicationThrottledReplicasProp))
assertNull(actual.getProperty(LogConfig.FollowerReplicationThrottledReplicasProp))
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index a563c03..c60a7ed 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -17,7 +17,10 @@
package kafka.cluster
+import java.nio.charset.StandardCharsets
+
import kafka.utils.TestUtils
+import kafka.zk.BrokerIdZNode
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert.{assertEquals, assertNotEquals, assertNull}
@@ -44,7 +47,7 @@ class BrokerEndPointTest {
@Test
def testFromJsonFutureVersion(): Unit = {
- // `createBroker` should support future compatible versions, we use a hypothetical future version here
+ // Future compatible versions should be supported, we use a hypothetical future version here
val brokerInfoStr = """{
"foo":"bar",
"version":100,
@@ -54,7 +57,7 @@ class BrokerEndPointTest {
"timestamp":"1416974968782",
"endpoints":["SSL://localhost:9093"]
}"""
- val broker = Broker.createBroker(1, brokerInfoStr)
+ val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals("localhost", brokerEndPoint.host)
@@ -71,7 +74,7 @@ class BrokerEndPointTest {
"timestamp":"1416974968782",
"endpoints":["PLAINTEXT://localhost:9092"]
}"""
- val broker = Broker.createBroker(1, brokerInfoStr)
+ val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
assertEquals("localhost", brokerEndPoint.host)
@@ -81,7 +84,7 @@ class BrokerEndPointTest {
@Test
def testFromJsonV1(): Unit = {
val brokerInfoStr = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
- val broker = Broker.createBroker(1, brokerInfoStr)
+ val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
assertEquals("172.16.8.243", brokerEndPoint.host)
@@ -99,7 +102,7 @@ class BrokerEndPointTest {
"endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
"rack":"dc1"
}"""
- val broker = Broker.createBroker(1, json)
+ val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals("host1", brokerEndPoint.host)
@@ -119,7 +122,7 @@ class BrokerEndPointTest {
"listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
"rack":null
}"""
- val broker = Broker.createBroker(1, json)
+ val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
assertEquals("host1", brokerEndPoint.host)
@@ -138,7 +141,7 @@ class BrokerEndPointTest {
"endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
"listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}
}"""
- val broker = Broker.createBroker(1, json)
+ val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
assertEquals("host1", brokerEndPoint.host)
@@ -212,4 +215,7 @@ class BrokerEndPointTest {
assertEquals(9092, endpoint.port)
assertEquals("PLAINTEXT://MyHostname:9092", endpoint.connectionString)
}
+
+ private def parseBrokerJson(id: Int, jsonString: String): Broker =
+ BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker
}
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index f46d8cf..c495764 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -27,7 +27,6 @@ import scala.collection._
import org.junit.Assert._
import kafka.message._
import kafka.server._
-import kafka.utils.TestUtils._
import kafka.utils._
import org.junit.{Before, Test}
import kafka.serializer._
@@ -60,7 +59,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
new AtomicLong(0),
new AtomicInteger(0),
""))
- createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+ createTopic(topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 729ab1a..77930e6 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -324,7 +324,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val zkUtils = ZkUtils(zkConnect, 6000, 30000, false)
// create topic topic1 with 1 partition on broker 0
- createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+ createTopic(topic, numPartitions = 1, replicationFactor = 1)
// send some messages to each broker
val sentMessages1 = sendMessages(servers, topic, nMessages)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 32435c6..0ad64c5 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -87,7 +87,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testTopicCreationWithOfflineReplica(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
@@ -115,7 +115,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testTopicPartitionExpansionWithOfflineReplica(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
val tp0 = TopicAndPartition("t", 0)
val tp1 = TopicAndPartition("t", 1)
@@ -133,7 +133,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testPartitionReassignment(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val metricName = s"kafka.controller:type=ControllerStats,name=${ControllerState.PartitionReassignment.rateAndTimeMetricName.get}"
val timerCount = timer(metricName).count
@@ -158,7 +158,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testPartitionReassignmentWithOfflineReplicaHaltingProgress(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId))
@@ -176,7 +176,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testPartitionReassignmentResumesAfterReplicaComesOnline(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId))
@@ -199,7 +199,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testPreferredReplicaLeaderElection(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBroker = servers.find(_.config.brokerId != controllerId).get
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
@@ -210,7 +210,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testBackToBackPreferredReplicaLeaderElections(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBroker = servers.find(_.config.brokerId != controllerId).get
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
@@ -222,7 +222,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testPreferredReplicaLeaderElectionWithOfflinePreferredReplica(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
@@ -239,7 +239,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testAutoPreferredReplicaLeaderElection(): Unit = {
servers = makeServers(2, autoLeaderRebalanceEnable = true)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(1, 0))
@@ -256,7 +256,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = {
servers = makeServers(2)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId))
@@ -276,7 +276,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
@Test
def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled(): Unit = {
servers = makeServers(2, uncleanLeaderElectionEnable = true)
- val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
val tp = TopicAndPartition("t", 0)
val assignment = Map(tp.partition -> Seq(otherBrokerId))
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index bec5282..a49523f 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -77,7 +77,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
* Returns the count of messages received.
*/
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
- TestUtils.createTopic(zkClient, topic, 1, 1, servers)
+ createTopic(topic, 1, 1)
val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
TestUtils.getBrokerListStrFromServers(servers),
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 6ecec3c..617b326 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -43,7 +43,7 @@ class FetcherTest extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp
- TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+ createTopic(topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)))
val cluster = new Cluster(servers.map { s =>
new Broker(s.config.brokerId, "localhost", boundPort(s), listenerName, securityProtocol)
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index fed78a5..9c09a43 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -105,6 +105,23 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
}
/**
+ * Create a topic in ZooKeeper.
+ * Wait until the leader is elected and the metadata is propagated to all brokers.
+ * Return the leader for each partition.
+ */
+ def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
+ topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] =
+ TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
+
+ /**
+ * Create a topic in ZooKeeper using a customized replica assignment.
+ * Wait until the leader is elected and the metadata is propagated to all brokers.
+ * Return the leader for each partition.
+ */
+ def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
+ TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
+
+ /**
* Pick a broker at random and kill it if it isn't already dead
* Return the id of the broker killed
*/
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index d6c59d2..0cf95e9 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -117,7 +117,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
private def produceAndMultiFetch(producer: Producer[String, String]) {
for(topic <- List("test1", "test2", "test3", "test4"))
- TestUtils.createTopic(zkClient, topic, servers = servers)
+ createTopic(topic)
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -186,7 +186,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
private def multiProduce(producer: Producer[String, String]) {
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
- topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+ topics.keys.map(topic => createTopic(topic))
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
@@ -214,7 +214,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
@Test
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
- TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
+ createTopic(newTopic, numPartitions = 1, replicationFactor = 1)
val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
@@ -223,7 +223,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
@Test
def testPipelinedProduceRequests() {
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
- topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+ topics.keys.map(topic => createTopic(topic))
val props = new Properties()
props.put("request.required.acks", "0")
val pipelinedProducer: Producer[String, String] =
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 215776f..dbd9118 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -57,7 +57,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
requestHandlerLogger.setLevel(Level.FATAL)
// create the topic
- TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
+ createTopic(topic, numParts, 1)
// send some messages to each broker
val sentMessages1 = sendMessages(servers, nMessages, "batch1")
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index b8794f3..4227764 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -54,7 +54,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
def testMetricsLeak() {
val topic = "test-metrics-leak"
// create topic topic1 with 1 partition on broker 0
- createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+ createTopic(topic, numPartitions = 1, replicationFactor = 1)
// force creation not client's specific metrics.
createAndShutdownStep(topic, "group0", "consumer0", "producer0")
@@ -130,7 +130,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val topicConfig = new Properties
topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
- createTopic(zkClient, topic, 1, numNodes, servers, topicConfig)
+ createTopic(topic, 1, numNodes, topicConfig)
// Produce a few messages to create the metrics
TestUtils.produceMessages(servers, topic, nMessages)
@@ -176,6 +176,19 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
}
+ /**
+ * Test that the metrics are created with the right name, testZooKeeperStateChangeRateMetrics
+ * and testZooKeeperSessionStateMetric in ZooKeeperClientTest test the metrics behaviour.
+ */
+ @Test
+ def testSessionExpireListenerMetrics(): Unit = {
+ val metrics = Metrics.defaultRegistry.allMetrics
+
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"), 1)
+ }
+
private def meterCount(metricName: String): Long = {
Metrics.defaultRegistry.allMetrics.asScala
.filterKeys(_.getMBeanName.endsWith(metricName))
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 9279d90..5c1d4da 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -96,7 +96,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val props = TestUtils.getSyncProducerConfig(boundPort(server))
val producer = new SyncProducer(new SyncProducerConfig(props))
- TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
+ createTopic("test", numPartitions = 1, replicationFactor = 1)
val message1 = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))
val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 646143c..033ca67 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -79,12 +79,16 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
assertTrue(zkUtils.isSecure)
for (path <- zkUtils.persistentZkPaths) {
zkUtils.makeSurePersistentPathExists(path)
- if(!path.equals(ZkUtils.ConsumersPath)) {
+ if (ZkUtils.sensitivePath(path)) {
val aclList = zkUtils.zkConnection.getAcl(path).getKey
- assertTrue(aclList.size == 2)
- for (acl: ACL <- aclList.asScala) {
- assertTrue(TestUtils.isAclSecure(acl, false))
- }
+ assertEquals(s"Unexpected acl list size for $path", 1, aclList.size)
+ for (acl <- aclList.asScala)
+ assertTrue(TestUtils.isAclSecure(acl, sensitive = true))
+ } else if (!path.equals(ZkUtils.ConsumersPath)) {
+ val aclList = zkUtils.zkConnection.getAcl(path).getKey
+ assertEquals(s"Unexpected acl list size for $path", 2, aclList.size)
+ for (acl <- aclList.asScala)
+ assertTrue(TestUtils.isAclSecure(acl, sensitive = false))
}
}
// Test that can create: createEphemeralPathExpectConflict
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
index 9a3187b..4a47400 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
@@ -19,7 +19,6 @@ package kafka.server
import java.util.Properties
-import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
@@ -38,7 +37,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest {
@Before
override def setUp(): Unit = {
super.setUp()
- TestUtils.createTopic(zkClient, topic1, numPartitions, servers.size, servers, new Properties())
+ createTopic(topic1, numPartitions, servers.size, new Properties())
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index fabe3d8..1739d27 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -54,7 +54,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertTrue(servers.head.logManager.getLog(tp).isEmpty)
}
- TestUtils.createTopic(zkClient, topic, partitionNum, 1, servers)
+ createTopic(topic, partitionNum, 1)
(0 until partitionNum).foreach { partition =>
assertEquals(logDir1, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
}
@@ -88,7 +88,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.LOG_DIR_NOT_FOUND, alterReplicaDirResponse1.responses().get(new TopicPartition(topic, 0)))
assertEquals(Errors.REPLICA_NOT_AVAILABLE, alterReplicaDirResponse1.responses().get(new TopicPartition(topic, 1)))
- TestUtils.createTopic(zkClient, topic, 3, 1, servers)
+ createTopic(topic, 3, 1)
// Test AlterReplicaDirRequest after topic creation
val partitionDirs2 = mutable.Map.empty[TopicPartition, String]
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index a6381a6..13a2d23 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -59,7 +59,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
def testErrorCreateTopicsRequests() {
val timeout = 10000
val existingTopic = "existing-topic"
- TestUtils.createTopic(zkClient, existingTopic, 1, 1, servers)
+ createTopic(existingTopic, 1, 1)
// Basic
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 3598626..42e9ff8 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -21,7 +21,6 @@ import java.util
import java.util.Properties
import kafka.log.LogConfig
-import kafka.utils.TestUtils
import org.apache.kafka.common.errors.PolicyViolationException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest
@@ -62,7 +61,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
def testErrorCreateTopicsRequests() {
val timeout = 10000
val existingTopic = "existing-topic"
- TestUtils.createTopic(zkClient, existingTopic, 1, 1, servers)
+ createTopic(existingTopic, 1, 1)
// Policy violations
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 237e918..4388e64 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -32,11 +32,11 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
def testValidDeleteTopicRequests() {
val timeout = 10000
// Single topic
- TestUtils.createTopic(zkClient, "topic-1", 1, 1, servers)
+ createTopic("topic-1", 1, 1)
validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-1").asJava, timeout).build())
// Multi topic
- TestUtils.createTopic(zkClient, "topic-3", 5, 2, servers)
- TestUtils.createTopic(zkClient, "topic-4", 1, 2, servers)
+ createTopic("topic-3", 5, 2)
+ createTopic("topic-4", 1, 2)
validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-3", "topic-4").asJava, timeout).build())
}
@@ -61,7 +61,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
// Partial
- TestUtils.createTopic(zkClient, "partial-topic-1", 1, 1, servers)
+ createTopic("partial-topic-1", 1, 1)
validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(
"partial-topic-1",
"partial-invalid-topic").asJava, timeout).build(),
@@ -72,7 +72,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
)
// Timeout
- TestUtils.createTopic(zkClient, timeoutTopic, 5, 2, servers)
+ createTopic(timeoutTopic, 5, 2)
// Must be a 0ms timeout to avoid transient test failures. Even a timeout of 1ms has succeeded in the past.
validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(timeoutTopic).asJava, 0).build(),
Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 9074ad8..8d1eb2c 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -40,7 +40,7 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
val onlineDir = new File(servers.head.config.logDirs.head).getAbsolutePath
val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
servers.head.replicaManager.handleLogDirFailure(offlineDir)
- TestUtils.createTopic(zkClient, topic, partitionNum, 1, servers)
+ createTopic(topic, partitionNum, 1)
TestUtils.produceMessages(servers, topic, 10)
val request = new DescribeLogDirsRequest.Builder(null).build()
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 1d8912b..a426108 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -113,7 +113,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
val correlationId = -1
- TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+ createTopic(topic, numPartitions = 1, replicationFactor = 1)
val version = ApiKeys.PRODUCE.latestVersion: Short
val serializedBytes = {
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index f1811ad..9090fda 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -23,7 +23,6 @@ import java.util.Properties
import kafka.api.KAFKA_0_11_0_IV2
import kafka.log.LogConfig
import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -342,8 +341,8 @@ class FetchRequestTest extends BaseRequestTest {
topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
topics.flatMap { topic =>
- val partitionToLeader = createTopic(zkClient, topic, numPartitions = numPartitions, replicationFactor = 2,
- servers = servers, topicConfig = topicConfig)
+ val partitionToLeader = createTopic(topic, numPartitions = numPartitions, replicationFactor = 2,
+ topicConfig = topicConfig)
partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader }
}.toMap
}
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6b3fff3..ba33ab0 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.server.LogDirFailureTest._
import kafka.api.IntegrationTestHarness
import kafka.controller.{OfflineReplica, PartitionAndReplica}
-import kafka.utils.{CoreUtils, Exit, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Exit, TestUtils}
import kafka.zk.LogDirEventNotificationZNode
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
@@ -55,7 +55,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
@Before
override def setUp() {
super.setUp()
- TestUtils.createTopic(zkClient, topic, partitionNum, serverCount, servers = servers)
+ createTopic(topic, partitionNum, serverCount)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 51e1d95..d4c3e7c 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -85,8 +85,8 @@ class MetadataRequestTest extends BaseRequestTest {
val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME
val notInternalTopic = "notInternal"
// create the topics
- TestUtils.createTopic(zkClient, internalTopic, 3, 2, servers)
- TestUtils.createTopic(zkClient, notInternalTopic, 3, 2, servers)
+ createTopic(internalTopic, 3, 2)
+ createTopic(notInternalTopic, 3, 2)
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
@@ -104,8 +104,8 @@ class MetadataRequestTest extends BaseRequestTest {
@Test
def testNoTopicsRequest() {
// create some topics
- TestUtils.createTopic(zkClient, "t1", 3, 2, servers)
- TestUtils.createTopic(zkClient, "t2", 3, 2, servers)
+ createTopic("t1", 3, 2)
+ createTopic("t2", 3, 2)
// v0, Doesn't support a "no topics" request
// v1, Empty list represents "no topics"
@@ -128,7 +128,7 @@ class MetadataRequestTest extends BaseRequestTest {
val topic2 = "t2"
val topic3 = "t3"
val topic4 = "t4"
- TestUtils.createTopic(zkClient, topic1, 1, 1, servers)
+ createTopic(topic1, 1, 1)
val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion))
checkAutoCreatedTopic(topic1, topic2, response1)
@@ -147,8 +147,8 @@ class MetadataRequestTest extends BaseRequestTest {
@Test
def testAllTopicsRequest() {
// create some topics
- TestUtils.createTopic(zkClient, "t1", 3, 2, servers)
- TestUtils.createTopic(zkClient, "t2", 3, 2, servers)
+ createTopic("t1", 3, 2)
+ createTopic("t2", 3, 2)
// v0, Empty list represents all topics
val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 0.toShort))
@@ -167,7 +167,7 @@ class MetadataRequestTest extends BaseRequestTest {
@Test
def testPreferredReplica(): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
- TestUtils.createTopic(zkClient, "t1", replicaAssignment, servers)
+ createTopic("t1", replicaAssignment)
// Call controller and one different broker to ensure that metadata propagation works correctly
val responses = Seq(
sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, true).build(), Some(controllerSocketServer)),
@@ -194,7 +194,7 @@ class MetadataRequestTest extends BaseRequestTest {
val replicaCount = 3
// create a topic with 3 replicas
- TestUtils.createTopic(zkClient, replicaDownTopic, 1, replicaCount, servers)
+ createTopic(replicaDownTopic, 1, replicaCount)
// Kill a replica node that is not the leader
val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5c80288..886c318 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -74,7 +74,7 @@ class RequestQuotaTest extends BaseRequestTest {
RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
super.setUp()
- TestUtils.createTopic(zkClient, topic, numPartitions, 1, servers)
+ createTopic(topic, numPartitions, 1)
leaderNode = servers.head
// Change default client-id request quota to a small value and a single unthrottledClient with a large quota
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c8a81c7..2c2d9dd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,8 +37,8 @@ import kafka.security.auth.{Acl, Authorizer, Resource}
import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
-import ZkUtils._
import Implicits._
+import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
@@ -60,7 +60,6 @@ import org.junit.Assert._
import scala.collection.JavaConverters._
import scala.collection.{Map, mutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.util.Try
/**
* Utility functions to help with testing
@@ -707,8 +706,8 @@ object TestUtils extends Logging {
val listenerName = ListenerName.forSecurityProtocol(protocol)
Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), b.rack)
}
- brokers.foreach(b => zkClient.registerBrokerInZk(new BrokerInfo(b.id, "localhost", 6667,
- b.endPoints, jmxPort = -1, rack = b.rack, ApiVersion.latestVersion)))
+ brokers.foreach(b => zkClient.registerBrokerInZk(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack),
+ ApiVersion.latestVersion, jmxPort = -1)))
brokers
}
@@ -753,24 +752,18 @@ object TestUtils extends Logging {
new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
}
- def makeLeaderForPartition(zkUtils: ZkUtils,
+ def makeLeaderForPartition(zkClient: KafkaZkClient,
topic: String,
leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
controllerEpoch: Int) {
- leaderPerPartitionMap.foreach { case (partition, leader) =>
- try {
- val newLeaderAndIsr = zkUtils.getLeaderAndIsrForPartition(topic, partition)
- .map(_.newLeader(leader))
- .getOrElse(LeaderAndIsr(leader, List(leader)))
-
- zkUtils.updatePersistentPath(
- getTopicPartitionLeaderAndIsrPath(topic, partition),
- zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
- )
- } catch {
- case oe: Throwable => error(s"Error while electing leader for partition [$topic,$partition]", oe)
- }
+ val newLeaderIsrAndControllerEpochs = leaderPerPartitionMap.map { case (partition, leader) =>
+ val topicPartition = new TopicPartition(topic, partition)
+ val newLeaderAndIsr = zkClient.getTopicPartitionState(topicPartition)
+ .map(_.leaderAndIsr.newLeader(leader))
+ .getOrElse(LeaderAndIsr(leader, List(leader)))
+ topicPartition -> LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch)
}
+ zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs)
}
/**
@@ -795,7 +788,7 @@ object TestUtils extends Logging {
var electedOrChangedLeader: Option[Int] = None
while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {
// check if leader is elected
- leader = zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
+ leader = zkClient.getLeaderForPartition(topicPartition)
leader match {
case Some(l) => (newLeaderOpt, oldLeaderOpt) match {
case (Some(newLeader), _) if newLeader == l =>
@@ -949,13 +942,9 @@ object TestUtils extends Logging {
leader
}
- def waitUntilControllerElected(zkUtils: ZkUtils, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
- var controllerIdTry: Try[Int] = null
- TestUtils.waitUntilTrue(() => {
- controllerIdTry = Try { zkUtils.getController() }
- controllerIdTry.isSuccess
- }, s"Controller not elected after $timeout ms", waitTime = timeout)
- controllerIdTry.get
+ def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+ val (controllerId, _) = TestUtils.computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
+ controllerId.getOrElse(fail(s"Controller not elected after $timeout ms"))
}
def waitUntilLeaderIsKnown(servers: Seq[KafkaServer], topic: String, partition: Int,
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 1fdc3ea..fe5fbff 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -105,7 +105,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
// create the topic
adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
// create leaders for all partitions
- TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+ TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaMap = leaderForPartitionMap.keys.map(p => p -> zkClient.getReplicasForPartition(new TopicPartition(topic, p))).toMap
assertEquals(expectedReplicaAssignment.size, actualReplicaMap.size)
for(i <- 0 until actualReplicaMap.size)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index f0d6cf0..f3b8e81 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -20,7 +20,7 @@ import java.util.{Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
import kafka.api.ApiVersion
-import kafka.cluster.EndPoint
+import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.security.auth._
import kafka.server.ConfigType
@@ -419,15 +419,12 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
def testBrokerRegistrationMethods() {
zkClient.createTopLevelPaths()
- val brokerInfo = new BrokerInfo(1, "test.host", 9999,
+ val brokerInfo = BrokerInfo(Broker(1,
Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
- 9998, None, ApiVersion.latestVersion)
+ rack = None), ApiVersion.latestVersion, jmxPort = 9998)
zkClient.registerBrokerInZk(brokerInfo)
- val broker = zkClient.getBroker(1).getOrElse(fail("Unregistered broker"))
-
- assertEquals(brokerInfo.id, broker.id)
- assertEquals(brokerInfo.endpoints(), broker.endPoints.mkString(","))
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
}
@Test
@@ -448,9 +445,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
def testCreateTopLevelPaths() {
zkClient.createTopLevelPaths()
- ZkData.PersistentZkPaths.foreach {
- path => assertTrue(zkClient.pathExists(path))
- }
+ ZkData.PersistentZkPaths.foreach(path => assertTrue(zkClient.pathExists(path)))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a437810..c7c3152 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -32,7 +32,6 @@ import scala.collection.JavaConverters._
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
-import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.utils.Time
@Category(Array(classOf[IntegrationTest]))
@@ -45,7 +44,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkUtils: ZkUtils = null
- var zooKeeperClient: ZooKeeperClient = null
var zkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
@@ -58,10 +56,8 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
def setUp() {
zookeeper = new EmbeddedZookeeper()
zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-
- val time = Time.SYSTEM
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, time)
- zkClient = new KafkaZkClient(zooKeeperClient, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), time)
+ zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
adminZkClient = new AdminZkClient(zkClient)
}
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index d402abb..c8ebaa9 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -16,20 +16,19 @@
*/
package kafka.zookeeper
-import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Meter
+import com.yammer.metrics.core.{Gauge, Meter, MetricName}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
import org.junit.{After, Before, Test}
@@ -40,33 +39,41 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
private val mockPath = "/foo"
private val time = Time.SYSTEM
+ private var zooKeeperClient: ZooKeeperClient = _
+
@Before
override def setUp() {
cleanMetricsRegistry()
super.setUp()
+ zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
+ Time.SYSTEM, "testMetricGroup", "testMetricType")
}
@After
override def tearDown() {
+ if (zooKeeperClient != null)
+ zooKeeperClient.close()
super.tearDown()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
- Configuration.setConfiguration(null)
}
@Test(expected = classOf[IllegalArgumentException])
def testUnresolvableConnectString(): Unit = {
- new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, time)
+ new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, time, "testMetricGroup",
+ "testMetricType").close()
}
@Test(expected = classOf[ZooKeeperClientTimeoutException])
def testConnectionTimeout(): Unit = {
zookeeper.shutdown()
- new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, time)
+ new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, time, "testMetricGroup",
+ "testMetricType").close()
}
@Test
def testConnection(): Unit = {
- new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time)
+ new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
+ "testMetricType").close()
}
@Test
@@ -80,8 +87,8 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testDeleteExistingZNode(): Unit = {
- import scala.collection.JavaConverters._
- val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+ val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
@@ -96,7 +103,8 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testExistsExistingZNode(): Unit = {
import scala.collection.JavaConverters._
- val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+ val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode)
@@ -331,53 +339,60 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
}
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time)
- zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
- zooKeeperClient.reinitialize()
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+ "testMetricGroup", "testMetricType")
+ try {
+ zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
+ zooKeeperClient.reinitialize()
- assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+ assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+ } finally zooKeeperClient.close()
}
@Test
def testConnectionLossRequestTermination(): Unit = {
val batchSize = 10
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time,
+ "testGroupType", "testGroupName")
zookeeper.shutdown()
- val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
- val countDownLatch = new CountDownLatch(1)
- val running = new AtomicBoolean(true)
- val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize)
- val requestThread = new Thread {
- override def run(): Unit = {
- while (running.get()) {
- val responses = zooKeeperClient.handleRequests(requests)
- val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS)
- if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS))
- responses.foreach(unexpectedResponses.add)
- if (!unexpectedResponses.isEmpty || suffix.nonEmpty)
- running.set(false)
+ try {
+ val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
+ val countDownLatch = new CountDownLatch(1)
+ val running = new AtomicBoolean(true)
+ val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize)
+ val requestThread = new Thread {
+ override def run(): Unit = {
+ while (running.get()) {
+ val responses = zooKeeperClient.handleRequests(requests)
+ val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS)
+ if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS))
+ responses.foreach(unexpectedResponses.add)
+ if (!unexpectedResponses.isEmpty || suffix.nonEmpty)
+ running.set(false)
+ }
+ countDownLatch.countDown()
}
- countDownLatch.countDown()
}
- }
- requestThread.start()
- val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS)
- if (!requestThreadTerminated) {
- running.set(false)
- requestThread.join(5000)
- fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.")
- } else if (!unexpectedResponses.isEmpty) {
- fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses")
- }
+ requestThread.start()
+ val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS)
+ if (!requestThreadTerminated) {
+ running.set(false)
+ requestThread.join(5000)
+ fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.")
+ } else if (!unexpectedResponses.isEmpty) {
+ fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses")
+ }
+ } finally zooKeeperClient.close()
}
- @Test
- def testSessionExpireListenerMetrics() {
- val metrics = Metrics.defaultRegistry
+ def isExpectedMetricName(metricName: MetricName, name: String): Boolean =
+ metricName.getName == name && metricName.getGroup == "testMetricGroup" && metricName.getType == "testMetricType"
+ @Test
+ def testZooKeeperStateChangeRateMetrics() {
def checkMeterCount(name: String, expected: Long) {
- val meter = metrics.allMetrics.asScala.collectFirst {
- case (metricName, meter: Meter) if metricName.getName == name => meter
+ val meter = Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
+ case (metricName, meter: Meter) if isExpectedMetricName(metricName, name) => meter
}.getOrElse(sys.error(s"Unable to find meter with name $name"))
assertEquals(s"Unexpected meter count for $name", expected, meter.count)
}
@@ -396,6 +411,23 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
checkMeterCount(disconnectsPerSecName, 1)
}
+ @Test
+ def testZooKeeperSessionStateMetric(): Unit = {
+ def gaugeValue(name: String): Option[String] = {
+ Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
+ case (metricName, gauge: Gauge[_]) if isExpectedMetricName(metricName, name) => gauge.value.asInstanceOf[String]
+ }
+ }
+
+ assertEquals(Some(States.CONNECTED.toString), gaugeValue("SessionState"))
+ assertEquals(States.CONNECTED, zooKeeperClient.connectionState)
+
+ zooKeeperClient.close()
+
+ assertEquals(None, gaugeValue("SessionState"))
+ assertEquals(States.CLOSED, zooKeeperClient.connectionState)
+ }
+
private def cleanMetricsRegistry() {
val metrics = Metrics.defaultRegistry
metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 0551fac..855bcea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -20,7 +20,6 @@ import kafka.log.LogConfig;
import kafka.utils.MockTime;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
@@ -93,13 +92,9 @@ public class InternalTopicIntegrationTest {
}
private Properties getTopicConfigProperties(final String changelog) {
- final ZooKeeperClient zkClient = new ZooKeeperClient(
- CLUSTER.zKConnectString(),
- DEFAULT_ZK_SESSION_TIMEOUT_MS,
- DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
- Integer.MAX_VALUE, Time.SYSTEM,
+ final KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
+ DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM,
"testMetricGroup", "testMetricType");
- final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
try {
final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 275d580..6aafac0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -25,7 +25,6 @@ import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Time;
import org.junit.rules.TemporaryFolder;
@@ -171,36 +170,25 @@ public class KafkaEmbedded {
final Properties topicConfig) {
log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
topic, partitions, replication, topicConfig);
+ try (KafkaZkClient kafkaZkClient = createZkClient()) {
+ final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+ adminZkClient.createTopic(topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
+ }
+ }
- final ZooKeeperClient zkClient = new ZooKeeperClient(
- zookeeperConnect(),
- DEFAULT_ZK_SESSION_TIMEOUT_MS,
- DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
- Integer.MAX_VALUE,
- Time.SYSTEM,
- "testMetricGroup",
- "testMetricType");
- final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
- final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
- adminZkClient.createTopic(topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
- kafkaZkClient.close();
+ private KafkaZkClient createZkClient() {
+ return KafkaZkClient.apply(zookeeperConnect(), false, DEFAULT_ZK_SESSION_TIMEOUT_MS,
+ DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, "testMetricGroup", "testMetricType");
}
public void deleteTopic(final String topic) {
log.debug("Deleting topic { name: {} }", topic);
- final ZooKeeperClient zkClient = new ZooKeeperClient(
- zookeeperConnect(),
- DEFAULT_ZK_SESSION_TIMEOUT_MS,
- DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
- Integer.MAX_VALUE,
- Time.SYSTEM,
- "testMetricGroup",
- "testMetricType");
- final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
- final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
- adminZkClient.deleteTopic(topic);
- kafkaZkClient.close();
+ try (KafkaZkClient kafkaZkClient = createZkClient()) {
+ final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+ adminZkClient.deleteTopic(topic);
+ kafkaZkClient.close();
+ }
}
public KafkaServer kafkaServer() {
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].