You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/26 04:11:20 UTC
[2/3] kafka git commit: MINOR: Rename and change package of async
ZooKeeper classes
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
deleted file mode 100644
index 0009439..0000000
--- a/core/src/main/scala/kafka/controller/ZookeeperClient.scala
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.controller
-
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
-
-import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
-import kafka.utils.Logging
-import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback}
-import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
-import org.apache.zookeeper.ZooKeeper.States
-import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper}
-
-import scala.collection.JavaConverters._
-
-/**
- * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper.
- *
- * @param connectString comma separated host:port pairs, each corresponding to a zk server
- * @param sessionTimeoutMs session timeout in milliseconds
- * @param connectionTimeoutMs connection timeout in milliseconds
- * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
- */
-class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
- stateChangeHandler: StateChangeHandler) extends Logging {
- this.logIdent = "[ZookeeperClient] "
- private val initializationLock = new ReentrantReadWriteLock()
- private val isConnectedOrExpiredLock = new ReentrantLock()
- private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
- private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
- private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
-
- info(s"Initializing a new session to $connectString.")
- @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
- waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
-
- /**
- * Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details.
- *
- * @param request a single request to send and wait on.
- * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse).
- */
- def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = {
- handleRequests(Seq(request)).head
- }
-
- /**
- * Send a pipelined sequence of requests and wait for all of their responses.
- *
- * The watch flag on each outgoing request will be set if we've already registered a handler for the
- * path associated with the request.
- *
- * @param requests a sequence of requests to send and wait on.
- * @return the responses for the requests. If all requests have the same type, the responses will have the respective
- * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype
- * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
- */
- def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) {
- if (requests.isEmpty)
- Seq.empty
- else {
- val countDownLatch = new CountDownLatch(requests.size)
- val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
-
- requests.foreach { request =>
- send(request) { response =>
- responseQueue.add(response)
- countDownLatch.countDown()
- }
- }
- countDownLatch.await()
- responseQueue.asScala.toBuffer
- }
- }
-
- private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
- // Safe to cast as we always create a response of the right type
- def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
-
- request match {
- case ExistsRequest(path, ctx) =>
- zooKeeper.exists(path, shouldWatch(request), new StatCallback {
- override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
- callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat))
- }, ctx.orNull)
- case GetDataRequest(path, ctx) =>
- zooKeeper.getData(path, shouldWatch(request), new DataCallback {
- override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
- callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat))
- }, ctx.orNull)
- case GetChildrenRequest(path, ctx) =>
- zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
- override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit =
- callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
- Option(children).map(_.asScala).getOrElse(Seq.empty), stat))
- }, ctx.orNull)
- case CreateRequest(path, data, acl, createMode, ctx) =>
- zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
- override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
- callback(CreateResponse(Code.get(rc), path, Option(ctx), name))
- }, ctx.orNull)
- case SetDataRequest(path, data, version, ctx) =>
- zooKeeper.setData(path, data, version, new StatCallback {
- override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
- callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat))
- }, ctx.orNull)
- case DeleteRequest(path, version, ctx) =>
- zooKeeper.delete(path, version, new VoidCallback {
- override def processResult(rc: Int, path: String, ctx: Any): Unit =
- callback(DeleteResponse(Code.get(rc), path, Option(ctx)))
- }, ctx.orNull)
- case GetAclRequest(path, ctx) =>
- zooKeeper.getACL(path, null, new ACLCallback {
- override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = {
- callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty),
- stat))
- }}, ctx.orNull)
- case SetAclRequest(path, acl, version, ctx) =>
- zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
- override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
- callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat))
- }, ctx.orNull)
- }
- }
-
- /**
- * Wait indefinitely until the underlying zookeeper client to reaches the CONNECTED state.
- * @throws ZookeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
- * @throws ZookeeperClientExpiredException if the session expired either before or while waiting for connection.
- */
- def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
- waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
- }
-
- private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
- info("Waiting until connected.")
- var nanos = timeUnit.toNanos(timeout)
- inLock(isConnectedOrExpiredLock) {
- var state = zooKeeper.getState
- while (!state.isConnected && state.isAlive) {
- if (nanos <= 0) {
- throw new ZookeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
- }
- nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
- state = zooKeeper.getState
- }
- if (state == States.AUTH_FAILED) {
- throw new ZookeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
- } else if (state == States.CLOSED) {
- throw new ZookeeperClientExpiredException("Session expired either before or while waiting for connection")
- }
- }
- info("Connected.")
- }
-
- // If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler
- // may need to be updated.
- private def shouldWatch(request: AsyncRequest): Boolean = request match {
- case _: GetChildrenRequest => zNodeChildChangeHandlers.contains(request.path)
- case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path)
- case _ => throw new IllegalArgumentException(s"Request $request is not watchable")
- }
-
- /**
- * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
- *
- * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest])
- * with either a GetDataRequest or ExistsRequest.
- *
- * NOTE: zookeeper only allows registration to a nonexistent znode with ExistsRequest.
- *
- * @param zNodeChangeHandler the handler to register
- */
- def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
- zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler)
- }
-
- /**
- * Unregister the handler from ZookeeperClient. This is just a local operation.
- * @param path the path of the handler to unregister
- */
- def unregisterZNodeChangeHandler(path: String): Unit = {
- zNodeChangeHandlers.remove(path)
- }
-
- /**
- * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
- *
- * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) with a GetChildrenRequest.
- *
- * @param zNodeChildChangeHandler the handler to register
- */
- def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
- zNodeChildChangeHandlers.put(zNodeChildChangeHandler.path, zNodeChildChangeHandler)
- }
-
- /**
- * Unregister the handler from ZookeeperClient. This is just a local operation.
- * @param path the path of the handler to unregister
- */
- def unregisterZNodeChildChangeHandler(path: String): Unit = {
- zNodeChildChangeHandlers.remove(path)
- }
-
- def close(): Unit = inWriteLock(initializationLock) {
- info("Closing.")
- zNodeChangeHandlers.clear()
- zNodeChildChangeHandlers.clear()
- zooKeeper.close()
- info("Closed.")
- }
-
- def sessionId: Long = inReadLock(initializationLock) {
- zooKeeper.getSessionId
- }
-
- private def initialize(): Unit = {
- if (!zooKeeper.getState.isAlive) {
- info(s"Initializing a new session to $connectString.")
- var now = System.currentTimeMillis()
- val threshold = now + connectionTimeoutMs
- while (now < threshold) {
- try {
- zooKeeper.close()
- zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
- waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
- return
- } catch {
- case _: Exception =>
- now = System.currentTimeMillis()
- if (now < threshold) {
- Thread.sleep(1000)
- now = System.currentTimeMillis()
- }
- }
- }
- info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
- stateChangeHandler.onReconnectionTimeout()
- }
- }
-
- private object ZookeeperClientWatcher extends Watcher {
- override def process(event: WatchedEvent): Unit = {
- debug("Received event: " + event)
- Option(event.getPath) match {
- case None =>
- inLock(isConnectedOrExpiredLock) {
- isConnectedOrExpiredCondition.signalAll()
- }
- if (event.getState == KeeperState.AuthFailed) {
- info("Auth failed.")
- stateChangeHandler.onAuthFailure()
- } else if (event.getState == KeeperState.Expired) {
- inWriteLock(initializationLock) {
- info("Session expired.")
- stateChangeHandler.beforeInitializingSession()
- initialize()
- stateChangeHandler.afterInitializingSession()
- }
- }
- case Some(path) =>
- (event.getType: @unchecked) match {
- case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
- case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
- case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
- case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
- }
- }
- }
- }
-}
-
-trait StateChangeHandler {
- def beforeInitializingSession(): Unit = {}
- def afterInitializingSession(): Unit = {}
- def onAuthFailure(): Unit = {}
- def onReconnectionTimeout(): Unit = {}
-}
-
-trait ZNodeChangeHandler {
- val path: String
- def handleCreation(): Unit = {}
- def handleDeletion(): Unit = {}
- def handleDataChange(): Unit = {}
-}
-
-trait ZNodeChildChangeHandler {
- val path: String
- def handleChildChange(): Unit = {}
-}
-
-sealed trait AsyncRequest {
- /**
- * This type member allows us to define methods that take requests and return responses with the correct types.
- * See ``ZookeeperClient.handleRequests`` for example.
- */
- type Response <: AsyncResponse
- def path: String
- def ctx: Option[Any]
-}
-
-case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode,
- ctx: Option[Any] = None) extends AsyncRequest {
- type Response = CreateResponse
-}
-
-case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest {
- type Response = DeleteResponse
-}
-
-case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
- type Response = ExistsResponse
-}
-
-case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
- type Response = GetDataResponse
-}
-
-case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
- type Response = SetDataResponse
-}
-
-case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
- type Response = GetAclResponse
-}
-
-case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
- type Response = SetAclResponse
-}
-
-case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
- type Response = GetChildrenResponse
-}
-
-sealed trait AsyncResponse {
- def resultCode: Code
- def path: String
- def ctx: Option[Any]
-
- /** Return None if the result code is OK and KeeperException otherwise. */
- def resultException: Option[KeeperException] =
- if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path))
-}
-case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse
-case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse
-case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
-case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat) extends AsyncResponse
-case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
-case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat) extends AsyncResponse
-case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
-case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse
-
-class ZookeeperClientException(message: String) extends RuntimeException(message)
-class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message)
-class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message)
-class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bd7f023..f1e2fc2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,14 +23,14 @@ import java.util.concurrent._
import com.yammer.metrics.core.Gauge
import kafka.common.KafkaException
-import kafka.controller.KafkaControllerZkUtils
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
import kafka.utils._
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.errors.{LogDirNotFoundException, KafkaStorageException}
+import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
import scala.collection.JavaConverters._
import scala.collection._
@@ -102,7 +102,6 @@ class LogManager(logDirs: Seq[File],
loadLogs()
-
// public, so we can access this from kafka.admin.DeleteTopicTest
val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
@@ -888,7 +887,7 @@ object LogManager {
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
- zkUtils: KafkaControllerZkUtils,
+ zkClient: KafkaZkClient,
brokerState: BrokerState,
kafkaScheduler: KafkaScheduler,
time: Time,
@@ -897,7 +896,7 @@ object LogManager {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
- val (topicConfigs, failed) = zkUtils.getLogConfigs(zkUtils.getAllTopicsInCluster, defaultProps)
+ val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
if (!failed.isEmpty) throw failed.head._2
// read the log configurations from zookeeper
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 101e646..d576206 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -27,7 +27,7 @@ import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_9_0
import kafka.cluster.Broker
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
-import kafka.controller.{KafkaController, KafkaControllerZkUtils, StateChangeHandler, ZookeeperClient}
+import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogConfig, LogManager}
@@ -36,6 +36,8 @@ import kafka.network.SocketServer
import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
+import kafka.zk.KafkaZkClient
+import kafka.zookeeper.{StateChangeHandler, ZooKeeperClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
@@ -135,7 +137,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var quotaManagers: QuotaFactory.QuotaManagers = null
var zkUtils: ZkUtils = null
- var kafkaControllerZkUtils: KafkaControllerZkUtils = null
+ private var zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
@@ -219,7 +221,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
- val zookeeperClient = new ZookeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
+ val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, new StateChangeHandler {
override def onReconnectionTimeout(): Unit = {
error("Reconnection timeout.")
@@ -233,10 +235,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
override def beforeInitializingSession(): Unit = kafkaController.expire()
})
- kafkaControllerZkUtils = new KafkaControllerZkUtils(zookeeperClient, zkUtils.isSecure)
+ zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure)
/* start log manager */
- logManager = LogManager(config, initialOfflineDirs, kafkaControllerZkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
+ logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
@@ -250,7 +252,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
replicaManager.startup()
/* start kafka controller */
- kafkaController = new KafkaController(config, kafkaControllerZkUtils, time, metrics, threadNamePrefix)
+ kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
kafkaController.startup()
adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
@@ -561,8 +563,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
CoreUtils.swallow(kafkaController.shutdown())
if (zkUtils != null)
CoreUtils.swallow(zkUtils.close())
- if (kafkaControllerZkUtils != null)
- CoreUtils.swallow(kafkaControllerZkUtils.close())
+ if (zkClient != null)
+ CoreUtils.swallow(zkClient.close())
if (metrics != null)
CoreUtils.swallow(metrics.close())
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index a916875..ad40c49 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -16,9 +16,12 @@
*/
package kafka.utils
+import java.nio.charset.StandardCharsets
+
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.utils.json.JsonValue
+
import scala.collection._
/**
@@ -36,6 +39,13 @@ object Json {
catch { case _: JsonProcessingException => None }
/**
+ * Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
+ */
+ def parseBytes(input: Array[Byte]): Option[JsonValue] =
+ try Option(mapper.readTree(input)).map(JsonValue(_))
+ catch { case _: JsonProcessingException => None }
+
+ /**
* Encode an object into a JSON string. This method accepts any type T where
* T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
* Any other type will result in an exception.
@@ -59,4 +69,13 @@ object Json {
}
}
+ /**
+ * Encode an object into a JSON value in bytes. This method accepts any type T where
+ * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
+ * Any other type will result in an exception.
+ *
+ * This method does not properly handle non-ascii characters.
+ */
+ def encodeAsBytes(obj: Any): Array[Byte] = encode(obj).getBytes(StandardCharsets.UTF_8)
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index cc38667..60c2adf 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -252,6 +252,9 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
}
}
+/**
+ * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
+ */
class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
val zkConnection: ZkConnection,
val isSecure: Boolean) extends Logging {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
new file mode 100644
index 0000000..0e48d51
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.util.Properties
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.log.LogConfig
+import kafka.server.ConfigType
+import kafka.utils._
+import kafka.zookeeper._
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
+ *
+ * This performs better than [[kafka.utils.ZkUtils]] and should replace it completely, eventually.
+ *
+ * Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.)
+ * and returns instances of classes from the calling packages in some cases. This is not ideal, but it makes it
+ * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
+ * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
+ */
+class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging {
+ import KafkaZkClient._
+
+ /**
+ * Gets topic partition states for the given partitions.
+ * @param partitions the partitions for which we want to get states.
+ * @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
+ */
+ def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = {
+   // One read per partition; the partition itself travels as the request context.
+   val requests = for (partition <- partitions)
+     yield GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition))
+   retryRequestsUntilConnected(requests)
+ }
+
+ /**
+ * Sets topic partition states for the given partitions.
+ * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+ * @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
+ */
+ def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
+   // One write per partition, passing the zkVersion carried in its LeaderAndIsr as the version argument.
+   val requests = leaderIsrAndControllerEpochs.toSeq.map { case (partition, state) =>
+     val statePath = TopicPartitionStateZNode.path(partition)
+     val encodedState = TopicPartitionStateZNode.encode(state)
+     SetDataRequest(statePath, encodedState, state.leaderAndIsr.zkVersion, Some(partition))
+   }
+   retryRequestsUntilConnected(requests)
+ }
+
+ /**
+ * Creates topic partition state znodes for the given partitions.
+ * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+ * @return sequence of CreateResponse whose contexts are the partitions they are associated with.
+ */
+ def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
+   val topics = leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq
+   val partitions = leaderIsrAndControllerEpochs.keys.toSeq
+   // Issue the per-topic and per-partition creates first; their responses are not inspected here.
+   createTopicPartitions(topics)
+   createTopicPartition(partitions)
+   val requests = leaderIsrAndControllerEpochs.toSeq.map { case (partition, state) =>
+     val statePath = TopicPartitionStateZNode.path(partition)
+     val encodedState = TopicPartitionStateZNode.encode(state)
+     CreateRequest(statePath, encodedState, acls(statePath), CreateMode.PERSISTENT, Some(partition))
+   }
+   retryRequestsUntilConnected(requests)
+ }
+
+ /**
+ * Sets the controller epoch conditioned on the given epochZkVersion.
+ * @param epoch the epoch to set
+ * @param epochZkVersion the expected version number of the epoch znode.
+ * @return SetDataResponse
+ */
+ def setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse = {
+   val encodedEpoch = ControllerEpochZNode.encode(epoch)
+   // epochZkVersion is handed to the request as the expected znode version.
+   val request = SetDataRequest(ControllerEpochZNode.path, encodedEpoch, epochZkVersion)
+   retryRequestUntilConnected(request)
+ }
+
+ /**
+ * Creates the controller epoch znode.
+ * @param epoch the epoch to set
+ * @return CreateResponse
+ */
+ def createControllerEpochRaw(epoch: Int): CreateResponse = {
+   val epochPath = ControllerEpochZNode.path
+   val encodedEpoch = ControllerEpochZNode.encode(epoch)
+   // The epoch znode is created as a persistent node with the default ACLs for its path.
+   val request = CreateRequest(epochPath, encodedEpoch, acls(epochPath), CreateMode.PERSISTENT)
+   retryRequestUntilConnected(request)
+ }
+
+ /**
+ * Try to update the partition states of multiple partitions in zookeeper.
+ * @param leaderAndIsrs The partition states to update.
+ * @param controllerEpoch The current controller epoch.
+ * @return UpdateLeaderAndIsrResult instance containing per partition results.
+ */
+ def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
+   val updated = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
+   val conflicted = mutable.Buffer.empty[TopicAndPartition]
+   val errors = mutable.Map.empty[TopicAndPartition, Exception]
+   val statesToWrite = leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+     partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+   }
+   val responses = try {
+     setTopicPartitionStatesRaw(statesToWrite)
+   } catch {
+     case e: Exception =>
+       // The whole batch failed before any response arrived: mark every partition as failed.
+       for (partition <- leaderAndIsrs.keys) errors.put(partition, e)
+       return UpdateLeaderAndIsrResult(updated.toMap, conflicted, errors.toMap)
+   }
+   for (response <- responses) {
+     val partition = response.ctx.get.asInstanceOf[TopicAndPartition]
+     response.resultCode match {
+       case Code.OK =>
+         // Record the znode version returned by the write so later updates can be conditioned on it.
+         updated.put(partition, leaderAndIsrs(partition).withZkVersion(response.stat.getVersion))
+       case Code.BADVERSION =>
+         conflicted += partition
+       case _ =>
+         errors.put(partition, response.resultException.get)
+     }
+   }
+   UpdateLeaderAndIsrResult(updated.toMap, conflicted, errors.toMap)
+ }
+
+ /**
+ * Get log configs that merge local configs with topic-level configs in zookeeper.
+ * @param topics The topics to get log configs for.
+ * @param config The local configs.
+ * @return A tuple of two values:
+ * 1. The successfully gathered log configs
+ * 2. Exceptions corresponding to failed log config lookups.
+ */
+ def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]):
+ (Map[String, LogConfig], Map[String, Exception]) = {
+   val resolved = mutable.Map.empty[String, LogConfig]
+   val errors = mutable.Map.empty[String, Exception]
+   val responses = try {
+     getTopicConfigs(topics)
+   } catch {
+     case e: Exception =>
+       // The lookup itself failed: attribute the same exception to every requested topic.
+       for (topic <- topics) errors.put(topic, e)
+       return (resolved.toMap, errors.toMap)
+   }
+   for (response <- responses) {
+     val topic = response.ctx.get.asInstanceOf[String]
+     response.resultCode match {
+       case Code.OK =>
+         // Undecodable overrides fall back to an empty Properties, i.e. the local config only.
+         val overrides = ConfigEntityZNode.decode(response.data).getOrElse(new Properties)
+         resolved.put(topic, LogConfig.fromProps(config, overrides))
+       case Code.NONODE =>
+         // No config znode for the topic: use the local config without overrides.
+         resolved.put(topic, LogConfig.fromProps(config, new Properties))
+       case _ =>
+         errors.put(topic, response.resultException.get)
+     }
+   }
+   (resolved.toMap, errors.toMap)
+ }
+
+ /**
+ * Gets all brokers in the cluster.
+ * @return sequence of brokers in the cluster.
+ */
+ def getAllBrokersInCluster: Seq[Broker] = {
+   val childrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
+   childrenResponse.resultCode match {
+     case Code.OK =>
+       val brokerIds = childrenResponse.children.map(_.toInt)
+       val dataRequests = for (brokerId <- brokerIds)
+         yield GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId))
+       retryRequestsUntilConnected(dataRequests).flatMap { dataResponse =>
+         val brokerId = dataResponse.ctx.get.asInstanceOf[Int]
+         dataResponse.resultCode match {
+           case Code.OK => Option(BrokerIdZNode.decode(brokerId, dataResponse.data))
+           // The broker znode disappeared between listing and reading it: leave it out.
+           case Code.NONODE => None
+           case _ => throw dataResponse.resultException.get
+         }
+       }
+     case Code.NONODE => Seq.empty
+     case _ => throw childrenResponse.resultException.get
+   }
+ }
+
+ /**
+ * Gets all topics in the cluster.
+ * @return sequence of topics in the cluster.
+ */
+ def getAllTopicsInCluster: Seq[String] = {
+   val childrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path))
+   childrenResponse.resultCode match {
+     case Code.OK => childrenResponse.children
+     // A missing topics znode is reported as "no topics" rather than as an error.
+     case Code.NONODE => Seq.empty
+     case _ => throw childrenResponse.resultException.get
+   }
+ }
+
+ /**
+ * Sets the topic znode with the given assignment.
+ * @param topic the topic whose assignment is being set.
+ * @param assignment the partition to replica mapping to set for the given topic
+ * @return SetDataResponse
+ */
+ def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+   val topicPath = TopicZNode.path(topic)
+   val encodedAssignment = TopicZNode.encode(assignment)
+   // -1 is passed as the version; presumably forwarded to ZooKeeper, where it matches any version.
+   val request = SetDataRequest(topicPath, encodedAssignment, -1)
+   retryRequestUntilConnected(request)
+ }
+
+ /**
+ * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
+ * @return sequence of znode names and not the absolute znode path.
+ */
+ def getAllLogDirEventNotifications: Seq[String] = {
+   val childrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
+   childrenResponse.resultCode match {
+     case Code.OK =>
+       // Convert each child znode name via LogDirEventNotificationSequenceZNode.sequenceNumber.
+       childrenResponse.children.map(child => LogDirEventNotificationSequenceZNode.sequenceNumber(child))
+     case Code.NONODE => Seq.empty
+     case _ => throw childrenResponse.resultException.get
+   }
+ }
+
+ /**
+ * Reads each of the log dir event notifications associated with the given sequence numbers and extracts the broker ids.
+ * @param sequenceNumbers the sequence numbers associated with the log dir event notifications.
+ * @return broker ids associated with the given log dir event notifications.
+ */
+ def getBrokerIdsFromLogDirEvents(sequenceNumbers: Seq[String]): Seq[Int] = {
+   val dataRequests = for (sequenceNumber <- sequenceNumbers)
+     yield GetDataRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber))
+   retryRequestsUntilConnected(dataRequests).flatMap { dataResponse =>
+     dataResponse.resultCode match {
+       case Code.OK => LogDirEventNotificationSequenceZNode.decode(dataResponse.data)
+       // A notification whose znode no longer exists contributes no broker id.
+       case Code.NONODE => None
+       case _ => throw dataResponse.resultException.get
+     }
+   }
+ }
+
+ /**
+ * Deletes all log dir event notifications.
+ */
+ def deleteLogDirEventNotifications(): Unit = {
+   val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
+   getChildrenResponse.resultCode match {
+     case Code.OK =>
+       // The children are znode names, whereas the Seq[String] overload expects sequence numbers and
+       // rebuilds each path from them. Convert first (as getAllLogDirEventNotifications does), otherwise
+       // the delete requests would target paths that do not correspond to the listed children.
+       val sequenceNumbers = getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
+       deleteLogDirEventNotifications(sequenceNumbers)
+     case Code.NONODE => // parent znode is absent, so there is nothing to delete
+     case _ => throw getChildrenResponse.resultException.get
+   }
+ }
+
+ /**
+ * Deletes the log dir event notifications associated with the given sequence numbers.
+ * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted.
+ */
+ def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
+   // One delete per sequence number, with -1 as the version; the responses are not inspected.
+   val deleteRequests = for (sequenceNumber <- sequenceNumbers)
+     yield DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1)
+   retryRequestsUntilConnected(deleteRequests)
+ }
+
+ /**
+ * Gets the assignments for the given topics.
+ * @param topics the topics whose partitions we wish to get the assignments for.
+ * @return the replica assignment for each partition from the given topics.
+ */
+ def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = {
+   val dataRequests = topics.map { topic =>
+     GetDataRequest(TopicZNode.path(topic), ctx = Some(topic))
+   }
+   val assignments = retryRequestsUntilConnected(dataRequests.toSeq).flatMap { dataResponse =>
+     val topic = dataResponse.ctx.get.asInstanceOf[String]
+     dataResponse.resultCode match {
+       case Code.OK => TopicZNode.decode(topic, dataResponse.data)
+       // A topic without a znode contributes no partitions.
+       case Code.NONODE => Map.empty[TopicAndPartition, Seq[Int]]
+       case _ => throw dataResponse.resultException.get
+     }
+   }
+   assignments.toMap
+ }
+
+ /**
+ * Get all topics marked for deletion.
+ * @return sequence of topics marked for deletion.
+ */
+ def getTopicDeletions: Seq[String] = {
+   val childrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path))
+   childrenResponse.resultCode match {
+     case Code.OK => childrenResponse.children
+     // No delete-topics znode means nothing is marked for deletion.
+     case Code.NONODE => Seq.empty
+     case _ => throw childrenResponse.resultException.get
+   }
+ }
+
+ /**
+ * Remove the given topics from the topics marked for deletion.
+ * @param topics the topics to remove.
+ */
+ def deleteTopicDeletions(topics: Seq[String]): Unit = {
+   // One delete per topic marker, with -1 as the version; the responses are not inspected.
+   val deleteRequests = for (topic <- topics)
+     yield DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1)
+   retryRequestsUntilConnected(deleteRequests)
+ }
+
+ /**
+ * Returns all reassignments.
+ * @return the reassignments for each partition.
+ */
+ def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = {
+   val dataResponse = retryRequestUntilConnected(GetDataRequest(ReassignPartitionsZNode.path))
+   dataResponse.resultCode match {
+     case Code.OK => ReassignPartitionsZNode.decode(dataResponse.data)
+     // No reassignment znode: report an empty reassignment.
+     case Code.NONODE => Map.empty[TopicAndPartition, Seq[Int]]
+     case _ => throw dataResponse.resultException.get
+   }
+ }
+
+ /**
+ * Sets the partition reassignment znode with the given reassignment.
+ * @param reassignment the reassignment to set on the reassignment znode.
+ * @return SetDataResponse
+ */
+ def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+   val encodedReassignment = ReassignPartitionsZNode.encode(reassignment)
+   // -1 is passed as the version argument of the write.
+   val request = SetDataRequest(ReassignPartitionsZNode.path, encodedReassignment, -1)
+   retryRequestUntilConnected(request)
+ }
+
+ /**
+ * Creates the partition reassignment znode with the given reassignment.
+ * @param reassignment the reassignment to set on the reassignment znode.
+ * @return CreateResponse
+ */
+ def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse = {
+   val reassignPath = ReassignPartitionsZNode.path
+   val encodedReassignment = ReassignPartitionsZNode.encode(reassignment)
+   // The reassignment znode is created as a persistent node with the default ACLs for its path.
+   val request = CreateRequest(reassignPath, encodedReassignment, acls(reassignPath), CreateMode.PERSISTENT)
+   retryRequestUntilConnected(request)
+ }
+
+ /**
+ * Deletes the partition reassignment znode.
+ */
+ def deletePartitionReassignment(): Unit = {
+   // Delete with -1 as the version; the response is not inspected.
+   retryRequestUntilConnected(DeleteRequest(ReassignPartitionsZNode.path, -1))
+ }
+
+ /**
+ * Gets topic partition states for the given partitions.
+ * @param partitions the partitions for which we want to get states.
+ * @return map containing the LeaderIsrAndControllerEpoch of each partition for which we were able to look up the partition state.
+ */
+ def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+   val states = getTopicPartitionStatesRaw(partitions).flatMap { dataResponse =>
+     val partition = dataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+     dataResponse.resultCode match {
+       case Code.OK =>
+         // Decoding may yield nothing, in which case the partition is simply omitted.
+         TopicPartitionStateZNode.decode(dataResponse.data, dataResponse.stat).map(state => partition -> state)
+       case Code.NONODE => None
+       case _ => throw dataResponse.resultException.get
+     }
+   }
+   states.toMap
+ }
+
+ /**
+ * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
+ * @return sequence of znode names and not the absolute znode path.
+ */
+ def getAllIsrChangeNotifications: Seq[String] = {
+   val childrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
+   childrenResponse.resultCode match {
+     case Code.OK =>
+       // Convert each child znode name via IsrChangeNotificationSequenceZNode.sequenceNumber.
+       childrenResponse.children.map(child => IsrChangeNotificationSequenceZNode.sequenceNumber(child))
+     case Code.NONODE => Seq.empty
+     case _ => throw childrenResponse.resultException.get
+   }
+ }
+
+ /**
+ * Reads each of the isr change notifications associated with the given sequence numbers and extracts the partitions.
+ * @param sequenceNumbers the sequence numbers associated with the isr change notifications.
+ * @return partitions associated with the given isr change notifications.
+ */
+ def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = {
+   val dataRequests = for (sequenceNumber <- sequenceNumbers)
+     yield GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber))
+   retryRequestsUntilConnected(dataRequests).flatMap { dataResponse =>
+     dataResponse.resultCode match {
+       case Code.OK => IsrChangeNotificationSequenceZNode.decode(dataResponse.data)
+       // A notification whose znode no longer exists contributes no partitions.
+       case Code.NONODE => None
+       case _ => throw dataResponse.resultException.get
+     }
+   }
+ }
+
+ /**
+ * Deletes all isr change notifications.
+ */
+ def deleteIsrChangeNotifications(): Unit = {
+   val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
+   getChildrenResponse.resultCode match {
+     case Code.OK =>
+       // The children are znode names, whereas the Seq[String] overload expects sequence numbers and
+       // rebuilds each path from them. Convert first (as getAllIsrChangeNotifications does), otherwise
+       // the delete requests would target paths that do not correspond to the listed children.
+       val sequenceNumbers = getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)
+       deleteIsrChangeNotifications(sequenceNumbers)
+     case Code.NONODE => // parent znode is absent, so there is nothing to delete
+     case _ => throw getChildrenResponse.resultException.get
+   }
+ }
+
+ /**
+ * Deletes the isr change notifications associated with the given sequence numbers.
+ * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted.
+ */
+ def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
+   // One delete per sequence number, with -1 as the version; the responses are not inspected.
+   val deleteRequests = for (sequenceNumber <- sequenceNumbers)
+     yield DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1)
+   retryRequestsUntilConnected(deleteRequests)
+ }
+
+ /**
+ * Gets the partitions marked for preferred replica election.
+ * @return sequence of partitions.
+ */
+ def getPreferredReplicaElection: Set[TopicAndPartition] = {
+   val dataResponse = retryRequestUntilConnected(GetDataRequest(PreferredReplicaElectionZNode.path))
+   dataResponse.resultCode match {
+     case Code.OK => PreferredReplicaElectionZNode.decode(dataResponse.data)
+     // No election znode: no partitions are marked.
+     case Code.NONODE => Set.empty[TopicAndPartition]
+     case _ => throw dataResponse.resultException.get
+   }
+ }
+
+ /**
+ * Deletes the preferred replica election znode.
+ */
+ def deletePreferredReplicaElection(): Unit = {
+   // Delete with -1 as the version; the response is not inspected.
+   retryRequestUntilConnected(DeleteRequest(PreferredReplicaElectionZNode.path, -1))
+ }
+
+ /**
+ * Gets the controller id.
+ * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise.
+ */
+ def getControllerId: Option[Int] = {
+   val dataResponse = retryRequestUntilConnected(GetDataRequest(ControllerZNode.path))
+   dataResponse.resultCode match {
+     // decode itself returns an Option, so unparseable data also yields None.
+     case Code.OK => ControllerZNode.decode(dataResponse.data)
+     case Code.NONODE => None
+     case _ => throw dataResponse.resultException.get
+   }
+ }
+
+ /**
+ * Deletes the controller znode.
+ */
+ def deleteController(): Unit = {
+   // Delete with -1 as the version; the response is not inspected.
+   retryRequestUntilConnected(DeleteRequest(ControllerZNode.path, -1))
+ }
+
+ /**
+ * Gets the controller epoch.
+ * @return optional (Int, Stat) that is Some if the controller epoch path exists and None otherwise.
+ */
+ def getControllerEpoch: Option[(Int, Stat)] = {
+   val getDataResponse = retryRequestUntilConnected(GetDataRequest(ControllerEpochZNode.path))
+   getDataResponse.resultCode match {
+     case Code.OK =>
+       val epoch = ControllerEpochZNode.decode(getDataResponse.data)
+       // Build the pair explicitly instead of relying on argument adaptation of Option(a, b);
+       // a tuple is never null, so Some is the precise constructor.
+       Some((epoch, getDataResponse.stat))
+     case Code.NONODE => None
+     case _ => throw getDataResponse.resultException.get
+   }
+ }
+
+ /**
+ * Recursively deletes the topic znode.
+ * @param topic the topic whose topic znode we wish to delete.
+ */
+ def deleteTopicZNode(topic: String): Unit =
+   // Delegates to deleteRecursive, which removes the children before the topic znode itself.
+   deleteRecursive(TopicZNode.path(topic))
+
+ /**
+ * Deletes the topic configs for the given topics.
+ * @param topics the topics whose configs we wish to delete.
+ */
+ def deleteTopicConfigs(topics: Seq[String]): Unit = {
+   // One delete per topic config znode, with -1 as the version; the responses are not inspected.
+   val deleteRequests = for (topic <- topics)
+     yield DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1)
+   retryRequestsUntilConnected(deleteRequests)
+ }
+
+ /**
+ * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data watcher
+ * registrations on paths which might not even exist.
+ *
+ * @param zNodeChangeHandler the handler to register; its path is the one checked for existence.
+ */
+ def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+   // Register first, then issue the exists request for the handler's path.
+   zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+   val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path))
+   existsResponse.resultCode match {
+     case Code.OK | Code.NONODE => // both an existing and a missing znode are acceptable outcomes
+     case _ => throw existsResponse.resultException.get
+   }
+ }
+
+ /**
+ * See ZooKeeperClient.registerZNodeChangeHandler
+ * @param zNodeChangeHandler the handler to register with the underlying ZooKeeperClient.
+ */
+ def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit =
+   // Pure delegation to the underlying ZooKeeperClient.
+   zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+
+ /**
+ * See ZooKeeperClient.unregisterZNodeChangeHandler
+ * @param path the znode path whose change handler should be unregistered.
+ */
+ def unregisterZNodeChangeHandler(path: String): Unit =
+   // Pure delegation to the underlying ZooKeeperClient.
+   zooKeeperClient.unregisterZNodeChangeHandler(path)
+
+ /**
+ * See ZooKeeperClient.registerZNodeChildChangeHandler
+ * @param zNodeChildChangeHandler the child-change handler to register with the underlying ZooKeeperClient.
+ */
+ def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit =
+   // Pure delegation to the underlying ZooKeeperClient.
+   zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+
+ /**
+ * See ZooKeeperClient.unregisterZNodeChildChangeHandler
+ * @param path the znode path whose child-change handler should be unregistered.
+ */
+ def unregisterZNodeChildChangeHandler(path: String): Unit =
+   // Pure delegation to the underlying ZooKeeperClient.
+   zooKeeperClient.unregisterZNodeChildChangeHandler(path)
+
+ /**
+ * Close the underlying ZooKeeperClient.
+ */
+ def close(): Unit =
+   // Closing this client closes the wrapped ZooKeeperClient.
+   zooKeeperClient.close()
+
+ private def deleteRecursive(path: String): Unit = {
+   val childrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
+   childrenResponse.resultCode match {
+     case Code.OK =>
+       // Depth-first: remove every descendant before the node itself.
+       for (child <- childrenResponse.children) deleteRecursive(s"$path/$child")
+       val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1))
+       // A node that has already vanished counts as successfully deleted.
+       val tolerated = deleteResponse.resultCode == Code.OK || deleteResponse.resultCode == Code.NONODE
+       if (!tolerated)
+         throw deleteResponse.resultException.get
+     case Code.NONODE => // the node does not exist, so there is nothing to delete
+     case _ => throw childrenResponse.resultException.get
+   }
+ }
+ private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
+   // One persistent znode per partition, created with null data and the partition as context.
+   val createRequests = for (partition <- partitions) yield {
+     val partitionPath = TopicPartitionZNode.path(partition)
+     CreateRequest(partitionPath, null, acls(partitionPath), CreateMode.PERSISTENT, Some(partition))
+   }
+   retryRequestsUntilConnected(createRequests)
+ }
+
+ private def createTopicPartitions(topics: Seq[String]) = {
+   // One persistent znode per topic, created with null data and the topic as context.
+   val createRequests = for (topic <- topics) yield {
+     val partitionsPath = TopicPartitionsZNode.path(topic)
+     CreateRequest(partitionsPath, null, acls(partitionsPath), CreateMode.PERSISTENT, Some(topic))
+   }
+   retryRequestsUntilConnected(createRequests)
+ }
+
+ private def getTopicConfigs(topics: Seq[String]) = {
+   // One read per topic config znode; the topic name travels as the request context.
+   val dataRequests = for (topic <- topics)
+     yield GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
+   retryRequestsUntilConnected(dataRequests)
+ }
+
+ private def acls(path: String) = {
+   import scala.collection.JavaConverters._
+   // ZkUtils returns a Java collection; expose it as its Scala view.
+   val javaAcls = ZkUtils.defaultAcls(isSecure, path)
+   javaAcls.asScala
+ }
+
+ // Single-request convenience wrapper around retryRequestsUntilConnected: sends `request` as a
+ // one-element batch and returns the head of the resulting response sequence.
+ private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
+ retryRequestsUntilConnected(Seq(request)).head
+ }
+
+ // Sends the given requests and re-sends any that come back with CONNECTIONLOSS until every request
+ // has a response with a different result code. Before each re-send it blocks in
+ // zooKeeperClient.waitUntilConnected().
+ //
+ // Responses are accumulated batch by batch, so after a retry the returned sequence is not guaranteed
+ // to be in the same order as `requests`; callers in this class correlate responses through `ctx`.
+ private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
+ val remainingRequests = ArrayBuffer(requests: _*)
+ val responses = new ArrayBuffer[Req#Response]
+ while (remainingRequests.nonEmpty) {
+ val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
+
+ // Only execute slow path if we find a response with CONNECTIONLOSS
+ if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) {
+ // Pair requests with responses positionally.
+ // NOTE(review): assumes handleRequests returns responses in request order - confirm.
+ val requestResponsePairs = remainingRequests.zip(batchResponses)
+
+ // Rebuild the pending list with only the requests that lost their connection.
+ remainingRequests.clear()
+ requestResponsePairs.foreach { case (request, response) =>
+ if (response.resultCode == Code.CONNECTIONLOSS)
+ remainingRequests += request
+ else
+ responses += response
+ }
+
+ if (remainingRequests.nonEmpty)
+ zooKeeperClient.waitUntilConnected()
+ } else {
+ // Fast path: nothing to retry, so take the whole batch and end the loop.
+ remainingRequests.clear()
+ responses ++= batchResponses
+ }
+ }
+ responses
+ }
+
+ def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
+   val ephemeral = new CheckedEphemeral(path, data)
+   info(s"Creating $path (is it secure? $isSecure)")
+   val code = ephemeral.create()
+   info(s"Result of znode creation at $path is: $code")
+   // Anything other than OK is surfaced as the KeeperException built from that code.
+   if (code != Code.OK)
+     throw KeeperException.create(code)
+ }
+
+ /**
+  * Creates an ephemeral znode at `path`. If the znode already exists, it is accepted only when its
+  * ephemeral owner is the session of the enclosing client's ZooKeeperClient.
+  *
+  * `create()` and `get()` call each other: an existing node triggers a read, and a node that vanishes
+  * before the read triggers another create attempt.
+  */
+ private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+   /**
+    * Attempts to create the ephemeral znode.
+    * @return Code.OK on success (or when the existing node is owned by the current session),
+    *         otherwise the result code describing the failure.
+    */
+   def create(): Code = {
+     val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
+     val createResponse = retryRequestUntilConnected(createRequest)
+     val code = createResponse.resultCode
+     if (code == Code.OK) {
+       code
+     } else if (code == Code.NODEEXISTS) {
+       // Someone created the node already; check whether that someone is our own session.
+       get()
+     } else {
+       error(s"Error while creating ephemeral at $path with return code: $code")
+       code
+     }
+   }
+
+   /**
+    * Reads the existing znode and compares its ephemeral owner with the current session id.
+    * @return Code.OK if the current session owns the node, Code.NODEEXISTS if another owner does,
+    *         the outcome of a fresh create() if the node disappeared, or the read's result code otherwise.
+    */
+   private def get(): Code = {
+     val getDataRequest = GetDataRequest(path)
+     val getDataResponse = retryRequestUntilConnected(getDataRequest)
+     val code = getDataResponse.resultCode
+     if (code == Code.OK) {
+       val owner = getDataResponse.stat.getEphemeralOwner
+       val currentSession = zooKeeperClient.sessionId
+       if (owner != currentSession) {
+         // The read succeeded, so logging its return code (OK) would be misleading; report the
+         // ownership mismatch that actually caused the failure.
+         error(s"Error while creating ephemeral at $path, node already exists and owner " +
+           s"'$owner' does not match current session '$currentSession'")
+         Code.NODEEXISTS
+       } else {
+         code
+       }
+     } else if (code == Code.NONODE) {
+       info(s"The ephemeral node at $path went away while reading it")
+       create()
+     } else {
+       error(s"Error while creating ephemeral at $path with return code: $code")
+       code
+     }
+   }
+ }
+}
+
+object KafkaZkClient {
+
+ /**
+ * Per-partition outcome of `KafkaZkClient.updateLeaderAndIsr`. Each partition appears in exactly one
+ * of the three collections when the result is built from per-partition responses.
+ *
+ * @param successfulPartitions The successfully updated partition states with adjusted znode versions.
+ * @param partitionsToRetry The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts
+ * can occur if the partition leader updated partition state while the controller attempted to
+ * update partition state.
+ * @param failedPartitions Exceptions corresponding to failed partition state updates.
+ */
+ case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr],
+ partitionsToRetry: Seq[TopicAndPartition],
+ failedPartitions: Map[TopicAndPartition, Exception])
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
new file mode 100644
index 0000000..292523c
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.common.TopicAndPartition
+import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.utils.Json
+import org.apache.zookeeper.data.Stat
+
+import scala.collection.Seq
+
+// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
+
+/** Path and JSON codec for the `/controller` znode. */
+object ControllerZNode {
+  def path: String = "/controller"
+
+  /** Builds a version-1 JSON document holding the broker id and the timestamp (the latter as a string). */
+  def encode(brokerId: Int, timestamp: Long): Array[Byte] = {
+    val fields = Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)
+    Json.encodeAsBytes(fields)
+  }
+
+  /** Reads the `brokerid` field; the result is empty when `Json.parseBytes` yields nothing. */
+  def decode(bytes: Array[Byte]): Option[Int] =
+    for (js <- Json.parseBytes(bytes)) yield js.asJsonObject("brokerid").to[Int]
+}
+
+/** Path and codec for `/controller_epoch`; the epoch is stored as its decimal string encoded in UTF-8. */
+object ControllerEpochZNode {
+  def path: String = "/controller_epoch"
+
+  def encode(epoch: Int): Array[Byte] = String.valueOf(epoch).getBytes(UTF_8)
+
+  /** Throws NumberFormatException when the payload is not a decimal integer. */
+  def decode(bytes: Array[Byte]): Int = Integer.parseInt(new String(bytes, UTF_8))
+}
+
+/** The `/config` znode; `ConfigEntityTypeZNode` paths are built beneath it. */
+object ConfigZNode {
+  def path: String = "/config"
+}
+
+/** The `/brokers` znode; the broker-id and topic paths in this file are built beneath it. */
+object BrokersZNode {
+  def path: String = "/brokers"
+}
+
+/** The `ids` child of the brokers znode. */
+object BrokerIdsZNode {
+  def path: String = BrokersZNode.path + "/ids"
+
+  /** Always null, i.e. this znode carries no payload. */
+  def encode: Array[Byte] = null
+}
+
+/** Path and codec for a single broker's registration znode under the broker ids znode. */
+object BrokerIdZNode {
+  def path(id: Int): String = s"${BrokerIdsZNode.path}/$id"
+
+  /**
+   * Serializes the broker registration as UTF-8 JSON via `Broker.toJson`.
+   * Registration format version 4 is used from KAFKA_0_10_0_IV1 onwards, version 2 before that.
+   */
+  def encode(id: Int,
+             host: String,
+             port: Int,
+             advertisedEndpoints: Seq[EndPoint],
+             jmxPort: Int,
+             rack: Option[String],
+             apiVersion: ApiVersion): Array[Byte] = {
+    val version = if (apiVersion < KAFKA_0_10_0_IV1) 2 else 4
+    val json = Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack)
+    json.getBytes(UTF_8)
+  }
+
+  /** Decodes the payload as a UTF-8 string and delegates to `Broker.createBroker`. */
+  def decode(id: Int, bytes: Array[Byte]): Broker = {
+    val json = new String(bytes, UTF_8)
+    Broker.createBroker(id, json)
+  }
+}
+
+/** The `topics` child of the brokers znode; per-topic znodes are built beneath it. */
+object TopicsZNode {
+  def path: String = BrokersZNode.path + "/topics"
+}
+
+/** Path and JSON codec for a topic znode, whose payload is the partition -> replicas assignment. */
+object TopicZNode {
+  def path(topic: String): String = s"${TopicsZNode.path}/$topic"
+
+  /** Writes a version-1 document whose `partitions` object maps each partition id (as a string) to its replicas. */
+  def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+    val replicasByPartitionId = assignment.map { case (tp, replicas) => tp.partition.toString -> replicas }
+    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> replicasByPartitionId))
+  }
+
+  /** Returns an empty map when the payload yields no JSON value or has no `partitions` field. */
+  def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
+    val assignmentOpt = for {
+      js <- Json.parseBytes(bytes)
+      partitionsJs <- js.asJsonObject.get("partitions")
+    } yield {
+      partitionsJs.asJsonObject.iterator.map { case (partitionId, replicasJs) =>
+        TopicAndPartition(topic, partitionId.toInt) -> replicasJs.to[Seq[Int]]
+      }.toMap
+    }
+    assignmentOpt.getOrElse(Map.empty)
+  }
+}
+
+/** The `partitions` child of a topic znode. */
+object TopicPartitionsZNode {
+  def path(topic: String): String = TopicZNode.path(topic) + "/partitions"
+}
+
+/** The znode of one partition, named by its partition id under the topic's `partitions` znode. */
+object TopicPartitionZNode {
+  def path(partition: TopicAndPartition): String = {
+    val parent = TopicPartitionsZNode.path(partition.topic)
+    s"$parent/${partition.partition}"
+  }
+}
+
+/** Path and JSON codec for the `state` znode of a partition (leader, ISR and epochs). */
+object TopicPartitionStateZNode {
+  def path(partition: TopicAndPartition): String = TopicPartitionZNode.path(partition) + "/state"
+
+  /** Writes a version-1 document with the leader, leader epoch, controller epoch and ISR. */
+  def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
+    val state = leaderIsrAndControllerEpoch.leaderAndIsr
+    val fields = Map(
+      "version" -> 1,
+      "leader" -> state.leader,
+      "leader_epoch" -> state.leaderEpoch,
+      "controller_epoch" -> leaderIsrAndControllerEpoch.controllerEpoch,
+      "isr" -> state.isr)
+    Json.encodeAsBytes(fields)
+  }
+
+  /**
+   * Rebuilds the partition state from the JSON payload; the znode version from `stat` becomes the
+   * zkVersion of the resulting LeaderAndIsr. The result is empty when `Json.parseBytes` yields nothing.
+   */
+  def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] =
+    for (js <- Json.parseBytes(bytes)) yield {
+      val fields = js.asJsonObject
+      val leader = fields("leader").to[Int]
+      val leaderEpoch = fields("leader_epoch").to[Int]
+      val isr = fields("isr").to[List[Int]]
+      val controllerEpoch = fields("controller_epoch").to[Int]
+      val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, stat.getVersion)
+      LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    }
+}
+
+/** The znode of one config entity type, directly under the config znode. */
+object ConfigEntityTypeZNode {
+  def path(entityType: String): String = ConfigZNode.path + "/" + entityType
+}
+
+/** Path and JSON codec for the config znode of a single entity. */
+object ConfigEntityZNode {
+  def path(entityType: String, entityName: String): String =
+    ConfigEntityTypeZNode.path(entityType) + "/" + entityName
+
+  /** Writes a version-1 document whose `config` field holds the given properties. */
+  def encode(config: Properties): Array[Byte] = {
+    import scala.collection.JavaConverters._
+    val payload = Map("version" -> 1, "config" -> config.asScala)
+    Json.encodeAsBytes(payload)
+  }
+
+  /**
+   * Collects the entries of the `config` JSON object into a Properties instance. The Properties are left
+   * empty when the document is not an object, has no `config` field, or that field is not an object.
+   */
+  def decode(bytes: Array[Byte]): Option[Properties] =
+    Json.parseBytes(bytes).map { js =>
+      val configObjOpt = for {
+        root <- js.asJsonObjectOption
+        configJs <- root.get("config")
+        configObj <- configJs.asJsonObjectOption
+      } yield configObj
+      val props = new Properties()
+      configObjOpt.foreach { configObj =>
+        configObj.iterator.foreach { case (key, value) => props.setProperty(key, value.to[String]) }
+      }
+      props
+    }
+}
+
+/** The `/isr_change_notification` znode; sequence znodes are built beneath it. */
+object IsrChangeNotificationZNode {
+  def path: String = "/isr_change_notification"
+}
+
+/** Path helpers and JSON codec for the `isr_change_<sequence>` children of the ISR change notification znode. */
+object IsrChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "isr_change_"
+
+  def path(sequenceNumber: String): String =
+    IsrChangeNotificationZNode.path + "/" + SequenceNumberPrefix + sequenceNumber
+
+  /** Writes the partitions as a list of `{topic, partition}` objects, tagged with the listener's version. */
+  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+    val partitionsJson = partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))
+    val payload = Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson)
+    Json.encodeAsBytes(payload)
+  }
+
+  /** Returns an empty set when `Json.parseBytes` yields nothing. */
+  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = {
+    val partitionsOpt = Json.parseBytes(bytes).map { js =>
+      js.asJsonObject("partitions").asJsonArray.iterator.map { entryJs =>
+        val entry = entryJs.asJsonObject
+        val topic = entry("topic").to[String]
+        val partition = entry("partition").to[Int]
+        TopicAndPartition(topic, partition)
+      }.toSet
+    }
+    partitionsOpt.getOrElse(Set.empty)
+  }
+
+  /** Everything after the last occurrence of the sequence-number prefix in `path`. */
+  def sequenceNumber(path: String): String = {
+    val prefixStart = path.lastIndexOf(SequenceNumberPrefix)
+    path.substring(prefixStart + SequenceNumberPrefix.length)
+  }
+}
+
+/** The `/log_dir_event_notification` znode; sequence znodes are built beneath it. */
+object LogDirEventNotificationZNode {
+  def path: String = "/log_dir_event_notification"
+}
+
+/** Path helpers and JSON codec for the `log_dir_event_<sequence>` children of the log dir event notification znode. */
+object LogDirEventNotificationSequenceZNode {
+  val SequenceNumberPrefix = "log_dir_event_"
+  val LogDirFailureEvent = 1
+
+  def path(sequenceNumber: String): String =
+    LogDirEventNotificationZNode.path + "/" + SequenceNumberPrefix + sequenceNumber
+
+  /** Writes a version-1 document naming the broker; the event field is always `LogDirFailureEvent`. */
+  def encode(brokerId: Int): Array[Byte] = {
+    val payload = Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)
+    Json.encodeAsBytes(payload)
+  }
+
+  /** Reads the `broker` field; the result is empty when `Json.parseBytes` yields nothing. */
+  def decode(bytes: Array[Byte]): Option[Int] =
+    Json.parseBytes(bytes).map(_.asJsonObject("broker").to[Int])
+
+  /** Everything after the last occurrence of the sequence-number prefix in `path`. */
+  def sequenceNumber(path: String): String = {
+    val prefixStart = path.lastIndexOf(SequenceNumberPrefix)
+    path.substring(prefixStart + SequenceNumberPrefix.length)
+  }
+}
+
+/** The `/admin` znode; the delete-topics, reassignment and preferred-replica-election paths are built beneath it. */
+object AdminZNode {
+  def path: String = "/admin"
+}
+
+/** The `delete_topics` child of the admin znode. */
+object DeleteTopicsZNode {
+  def path: String = AdminZNode.path + "/delete_topics"
+}
+
+/** The znode of one topic under the `delete_topics` znode. */
+object DeleteTopicsTopicZNode {
+  def path(topic: String): String = DeleteTopicsZNode.path + "/" + topic
+}
+
+/** Path and JSON codec for the `reassign_partitions` znode under the admin znode. */
+object ReassignPartitionsZNode {
+  def path: String = AdminZNode.path + "/reassign_partitions"
+
+  /** Writes a version-1 document listing one `{topic, partition, replicas}` object per reassigned partition. */
+  def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+    val partitionsJson = reassignment.map { case (tp, replicas) =>
+      Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas)
+    }
+    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> partitionsJson))
+  }
+
+  /** Returns an empty map when the payload yields no JSON value or has no `partitions` field. */
+  def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
+    val reassignmentOpt = for {
+      js <- Json.parseBytes(bytes)
+      partitionsJs <- js.asJsonObject.get("partitions")
+    } yield {
+      partitionsJs.asJsonArray.iterator.map { entryJs =>
+        val entry = entryJs.asJsonObject
+        val topic = entry("topic").to[String]
+        val partition = entry("partition").to[Int]
+        val replicas = entry("replicas").to[Seq[Int]]
+        TopicAndPartition(topic, partition) -> replicas
+      }.toMap
+    }
+    reassignmentOpt.getOrElse(Map.empty)
+  }
+}
+
+/** Path and JSON codec for the `preferred_replica_election` znode under the admin znode. */
+object PreferredReplicaElectionZNode {
+  def path: String = AdminZNode.path + "/preferred_replica_election"
+
+  /** Writes a version-1 document listing one `{topic, partition}` object per partition. */
+  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+    val partitionsJson = partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))
+    val jsonMap = Map("version" -> 1, "partitions" -> partitionsJson)
+    Json.encodeAsBytes(jsonMap)
+  }
+
+  /** Returns an empty set when `Json.parseBytes` yields nothing. */
+  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = {
+    val partitionsOpt = Json.parseBytes(bytes).map { js =>
+      js.asJsonObject("partitions").asJsonArray.iterator.map { entryJs =>
+        val entry = entryJs.asJsonObject
+        val topic = entry("topic").to[String]
+        val partition = entry("partition").to[Int]
+        TopicAndPartition(topic, partition)
+      }.toSet
+    }
+    partitionsOpt.getOrElse(Set.empty)
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
new file mode 100644
index 0000000..0ff34c0
--- /dev/null
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zookeeper
+
+import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
+import kafka.utils.Logging
+import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.ZooKeeper.States
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper}
+
+import scala.collection.JavaConverters._
+
+/**
+ * A ZooKeeper client that encourages pipelined requests.
+ *
+ * @param connectString comma separated host:port pairs, each corresponding to a zk server
+ * @param sessionTimeoutMs session timeout in milliseconds
+ * @param connectionTimeoutMs connection timeout in milliseconds
+ * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
+ */
+class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
+ stateChangeHandler: StateChangeHandler) extends Logging {
+ this.logIdent = "[ZooKeeperClient] "
+ private val initializationLock = new ReentrantReadWriteLock()
+ private val isConnectedOrExpiredLock = new ReentrantLock()
+ private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
+ private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
+ private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
+
+ info(s"Initializing a new session to $connectString.")
+ @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
+ waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+
+ /**
+ * Send a request and wait for its response. See handleRequests(Seq[AsyncRequest]) for details.
+ *
+ * @param request a single request to send and wait on.
+ * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse).
+ */
+ def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = {
+   // A single request is a one-element pipeline; its only response is the result.
+   val responses = handleRequests(Seq(request))
+   responses.head
+ }
+
+ /**
+ * Send a pipelined sequence of requests and wait for all of their responses.
+ *
+ * The watch flag on each outgoing request will be set if we've already registered a handler for the
+ * path associated with the request.
+ *
+ * @param requests a sequence of requests to send and wait on.
+ * @return the responses for the requests. If all requests have the same type, the responses will have the respective
+ * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype
+ * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
+ */
+ def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) {
+   if (requests.nonEmpty) {
+     // One latch count and one queue slot per request.
+     val pending = new CountDownLatch(requests.size)
+     val received = new ArrayBlockingQueue[Req#Response](requests.size)
+
+     // Fire every request before waiting on any of them, so they are pipelined.
+     for (request <- requests) {
+       send(request) { response =>
+         received.add(response)
+         pending.countDown()
+       }
+     }
+     // Block until every callback has fired; responses are returned in the order the callbacks added them.
+     pending.await()
+     received.asScala.toBuffer
+   } else {
+     Seq.empty
+   }
+ }
+
+ private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
+ // Safe to cast as we always create a response of the right type: each case below builds the response class
+ // that is paired with the request class it matched.
+ def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
+
+ request match { // one case per AsyncRequest subtype; each issues the corresponding asynchronous ZooKeeper call
+ case ExistsRequest(path, ctx) =>
+ zooKeeper.exists(path, shouldWatch(request), new StatCallback { // watch only if a handler is registered for the path
+ override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+ callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat))
+ }, ctx.orNull) // an absent ctx is handed to ZooKeeper as null and comes back as None via Option(ctx)
+ case GetDataRequest(path, ctx) =>
+ zooKeeper.getData(path, shouldWatch(request), new DataCallback {
+ override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
+ callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat))
+ }, ctx.orNull)
+ case GetChildrenRequest(path, ctx) =>
+ zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
+ override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit =
+ callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
+ Option(children).map(_.asScala).getOrElse(Seq.empty), stat)) // a null children list becomes an empty Seq
+ }, ctx.orNull)
+ case CreateRequest(path, data, acl, createMode, ctx) =>
+ zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
+ override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
+ callback(CreateResponse(Code.get(rc), path, Option(ctx), name))
+ }, ctx.orNull)
+ case SetDataRequest(path, data, version, ctx) =>
+ zooKeeper.setData(path, data, version, new StatCallback {
+ override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+ callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat))
+ }, ctx.orNull)
+ case DeleteRequest(path, version, ctx) =>
+ zooKeeper.delete(path, version, new VoidCallback {
+ override def processResult(rc: Int, path: String, ctx: Any): Unit =
+ callback(DeleteResponse(Code.get(rc), path, Option(ctx)))
+ }, ctx.orNull)
+ case GetAclRequest(path, ctx) =>
+ zooKeeper.getACL(path, null, new ACLCallback { // null is passed for the second (Stat) argument
+ override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = {
+ callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty),
+ stat)) // a null ACL list becomes an empty Seq
+ }}, ctx.orNull)
+ case SetAclRequest(path, acl, version, ctx) =>
+ zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
+ override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+ callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat))
+ }, ctx.orNull)
+ }
+ }
+
+ /**
+ * Wait indefinitely until the underlying zookeeper client reaches the CONNECTED state.
+ * @throws ZooKeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
+ * @throws ZooKeeperClientExpiredException if the session expired either before or while waiting for connection.
+ */
+ def waitUntilConnected(): Unit = {
+   // The largest representable timeout stands in for "no deadline".
+   val unbounded = Long.MaxValue
+   inLock(isConnectedOrExpiredLock) {
+     waitUntilConnected(unbounded, TimeUnit.MILLISECONDS)
+   }
+ }
+
+ private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
+   info("Waiting until connected.")
+   var remainingNanos = timeUnit.toNanos(timeout)
+   inLock(isConnectedOrExpiredLock) {
+     var current = zooKeeper.getState
+     // Keep waiting while the session is still alive but not yet connected; the watcher signals the
+     // condition whenever a session-level event arrives.
+     while (current.isAlive && !current.isConnected) {
+       if (remainingNanos <= 0)
+         throw new ZooKeeperClientTimeoutException(s"Timed out waiting for connection while in state: $current")
+       remainingNanos = isConnectedOrExpiredCondition.awaitNanos(remainingNanos)
+       current = zooKeeper.getState
+     }
+     // The loop also ends when the session is no longer alive; translate those terminal states.
+     current match {
+       case States.AUTH_FAILED =>
+         throw new ZooKeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
+       case States.CLOSED =>
+         throw new ZooKeeperClientExpiredException("Session expired either before or while waiting for connection")
+       case _ =>
+     }
+   }
+   info("Connected.")
+ }
+
+ // If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler
+ // may need to be updated.
+ private def shouldWatch(request: AsyncRequest): Boolean = request match {
+   // Child watches and node watches are tracked in separate handler maps.
+   case GetChildrenRequest(watchedPath, _) => zNodeChildChangeHandlers.contains(watchedPath)
+   case ExistsRequest(watchedPath, _) => zNodeChangeHandlers.contains(watchedPath)
+   case GetDataRequest(watchedPath, _) => zNodeChangeHandlers.contains(watchedPath)
+   case other => throw new IllegalArgumentException(s"Request $other is not watchable")
+ }
+
+ /**
+ * Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher.
+ *
+ * The watcher is only registered once the user calls handleRequest(AsyncRequest) or handleRequests(Seq[AsyncRequest])
+ * with either a GetDataRequest or ExistsRequest.
+ *
+ * NOTE: zookeeper only allows registration to a nonexistent znode with ExistsRequest.
+ *
+ * @param zNodeChangeHandler the handler to register
+ */
+ def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+   // Handlers are keyed by znode path, so registering again for the same path replaces the earlier handler.
+   val watchedPath = zNodeChangeHandler.path
+   zNodeChangeHandlers.put(watchedPath, zNodeChangeHandler)
+ }
+
+ /**
+ * Unregister the handler from ZooKeeperClient. This is just a local operation.
+ * @param path the path of the handler to unregister
+ */
+ def unregisterZNodeChangeHandler(path: String): Unit =
+   zNodeChangeHandlers.remove(path) // removing a path with no handler leaves the map unchanged
+
+ /**
+ * Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher.
+ *
+ * The watcher is only registered once the user calls handleRequest(AsyncRequest) or handleRequests(Seq[AsyncRequest]) with a GetChildrenRequest.
+ *
+ * @param zNodeChildChangeHandler the handler to register
+ */
+ def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
+   // Handlers are keyed by znode path, so registering again for the same path replaces the earlier handler.
+   val watchedPath = zNodeChildChangeHandler.path
+   zNodeChildChangeHandlers.put(watchedPath, zNodeChildChangeHandler)
+ }
+
+ /**
+ * Unregister the handler from ZooKeeperClient. This is just a local operation.
+ * @param path the path of the handler to unregister
+ */
+ def unregisterZNodeChildChangeHandler(path: String): Unit =
+   zNodeChildChangeHandlers.remove(path) // removing a path with no handler leaves the map unchanged
+
+ def close(): Unit = inWriteLock(initializationLock) { // write lock: excludes handleRequests/sessionId, which take the read lock
+ info("Closing.")
+ zNodeChangeHandlers.clear() // drop all registered handlers before closing the session
+ zNodeChildChangeHandlers.clear()
+ zooKeeper.close()
+ info("Closed.")
+ }
+
+ def sessionId: Long = {
+   // Read under the read lock because initialize() replaces `zooKeeper` while the write lock is held.
+   inReadLock(initializationLock)(zooKeeper.getSessionId)
+ }
+
+ private def initialize(): Unit = {
+   // Only a session that is no longer alive is replaced; otherwise this is a no-op.
+   if (!zooKeeper.getState.isAlive) {
+     info(s"Initializing a new session to $connectString.")
+     var now = System.currentTimeMillis()
+     // All retries share one deadline of connectionTimeoutMs measured from the first attempt.
+     val threshold = now + connectionTimeoutMs
+     while (now < threshold) {
+       try {
+         zooKeeper.close()
+         zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
+         waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
+         return
+       } catch {
+         case e: Exception =>
+           // Log the failure instead of discarding it, so repeated reconnect errors are visible to operators.
+           info("Error when recreating ZooKeeper, retrying after a short sleep", e)
+           now = System.currentTimeMillis()
+           if (now < threshold) {
+             Thread.sleep(1000)
+             now = System.currentTimeMillis()
+           }
+       }
+     }
+     // Deadline passed without a successful connection: report it and notify the handler.
+     info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
+     stateChangeHandler.onReconnectionTimeout()
+   }
+ }
+
+ private object ZooKeeperClientWatcher extends Watcher {
+   /** Events without a path are session-level state changes; events with a path concern a single znode. */
+   override def process(event: WatchedEvent): Unit = {
+     debug("Received event: " + event)
+     val path = event.getPath
+     if (path == null)
+       onSessionEvent(event.getState)
+     else
+       onZNodeEvent(path, event.getType)
+   }
+
+   // Wakes up threads blocked in waitUntilConnected, then reacts to auth failure or session expiry.
+   private def onSessionEvent(state: KeeperState): Unit = {
+     inLock(isConnectedOrExpiredLock) {
+       isConnectedOrExpiredCondition.signalAll()
+     }
+     state match {
+       case KeeperState.AuthFailed =>
+         info("Auth failed.")
+         stateChangeHandler.onAuthFailure()
+       case KeeperState.Expired =>
+         // The write lock keeps request handling out while the session is being replaced.
+         inWriteLock(initializationLock) {
+           info("Session expired.")
+           stateChangeHandler.beforeInitializingSession()
+           initialize()
+           stateChangeHandler.afterInitializingSession()
+         }
+       case _ =>
+     }
+   }
+
+   // Forwards a znode event to the handler registered for its path, if there is one.
+   private def onZNodeEvent(path: String, eventType: EventType): Unit =
+     (eventType: @unchecked) match {
+       case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
+       case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
+       case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
+       case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
+     }
+ }
+}
+
+trait StateChangeHandler { // session-state callbacks invoked by ZooKeeperClient; every hook defaults to a no-op
+ def beforeInitializingSession(): Unit = {} // called on session expiry, before a new session is initialized
+ def afterInitializingSession(): Unit = {} // called on session expiry, after the initialization attempt returns
+ def onAuthFailure(): Unit = {} // called when an AuthFailed state event is received
+ def onReconnectionTimeout(): Unit = {} // called when session initialization fails to connect before its deadline
+}
+
+trait ZNodeChangeHandler { // per-znode callbacks; every hook defaults to a no-op
+ val path: String // the znode path this handler is registered under
+ def handleCreation(): Unit = {} // invoked for a NodeCreated event on `path`
+ def handleDeletion(): Unit = {} // invoked for a NodeDeleted event on `path`
+ def handleDataChange(): Unit = {} // invoked for a NodeDataChanged event on `path`
+}
+
+trait ZNodeChildChangeHandler { // callback for changes to a znode's children; defaults to a no-op
+ val path: String // the znode path this handler is registered under
+ def handleChildChange(): Unit = {} // invoked for a NodeChildrenChanged event on `path`
+}
+
+sealed trait AsyncRequest { // sealed: every request type is declared in this file
+ /**
+ * This type member allows us to define methods that take requests and return responses with the correct types.
+ * See ``ZooKeeperClient.handleRequests`` for example.
+ */
+ type Response <: AsyncResponse
+ def path: String // znode path the request operates on
+ def ctx: Option[Any] // optional caller context; passed to ZooKeeper as null when None and returned in the response
+}
+
+case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode,
+ ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = CreateResponse // ZooKeeperClient.send issues zooKeeper.create for this request
+}
+
+case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = DeleteResponse // ZooKeeperClient.send issues zooKeeper.delete for this request
+}
+
+case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = ExistsResponse // issued as zooKeeper.exists; watched when a ZNodeChangeHandler is registered for the path
+}
+
+case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = GetDataResponse // issued as zooKeeper.getData; watched when a ZNodeChangeHandler is registered for the path
+}
+
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = SetDataResponse // ZooKeeperClient.send issues zooKeeper.setData for this request
+}
+
+case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = GetAclResponse // ZooKeeperClient.send issues zooKeeper.getACL for this request
+}
+
+case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = SetAclResponse // ZooKeeperClient.send issues zooKeeper.setACL for this request
+}
+
+case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+ type Response = GetChildrenResponse // issued as zooKeeper.getChildren; watched when a ZNodeChildChangeHandler is registered for the path
+}
+
+sealed trait AsyncResponse {
+  def resultCode: Code
+  def path: String
+  def ctx: Option[Any]
+
+  /** Empty when the result code is OK; otherwise the KeeperException built from the code and the path. */
+  def resultException: Option[KeeperException] = resultCode match {
+    case Code.OK => None
+    case failure => Some(KeeperException.create(failure, path))
+  }
+}
+case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse // name: the name string handed to the create callback
+case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse
+case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
+case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat) extends AsyncResponse
+case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
+case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat) extends AsyncResponse // acl is empty when the callback received a null list
+case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
+case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse // children is empty when the callback received a null list
+
+class ZooKeeperClientException(message: String) extends RuntimeException(message) // base type of the exceptions below
+class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message) // thrown by waitUntilConnected when the client state is CLOSED
+class ZooKeeperClientAuthFailedException(message: String) extends ZooKeeperClientException(message) // thrown by waitUntilConnected when the client state is AUTH_FAILED
+class ZooKeeperClientTimeoutException(message: String) extends ZooKeeperClientException(message) // thrown by waitUntilConnected when the timeout elapses before connecting
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index ea306a8..99ddcc3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -47,7 +47,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
/**
* Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
- * when zookeeper.set.acl=false, even if Zookeeper is SASL-enabled.
+ * when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled.
*/
@Test
def testZkAclsDisabled() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 1ca5500..06ddd66 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -221,7 +221,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
} catch {
case _: UnknownTopicOrPartitionException => // expected exception
}
- // verify delete topic path for test2 is removed from zookeeper
+ // verify delete topic path for test2 is removed from ZooKeeper
TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
// verify that topic test is untouched
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index a646ced..7fc2436 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -53,7 +53,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
/*
* There is no easy way to test purging. Even if we mock kafka time with MockTime, the purging compares kafka time
- * with the time stored in zookeeper stat and the embedded zookeeper server does not provide a way to mock time.
+ * with the time stored in ZooKeeper stat and the embedded ZooKeeper server does not provide a way to mock time.
* So to test purging we would have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check
* Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However even that the assertion
* can fail as the second node can be deleted depending on how threads get scheduled.