You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/01 01:06:56 UTC
kafka git commit: KAFKA-6072;
Use ZookeeperClient in GroupCoordinator and TransactionCoordinator
Repository: kafka
Updated Branches:
refs/heads/trunk 517870271 -> f88fdbd31
KAFKA-6072; Use ZookeeperClient in GroupCoordinator and TransactionCoordinator
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Ted Yu <yu...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #4126 from omkreddy/KAFKA-6072-ZK-IN-GRoupCoordinator
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f88fdbd3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f88fdbd3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f88fdbd3
Branch: refs/heads/trunk
Commit: f88fdbd3115cdb0f1bd26817513f3d33359512b1
Parents: 5178702
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Tue Oct 31 18:06:51 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Oct 31 18:06:51 2017 -0700
----------------------------------------------------------------------
.../coordinator/group/GroupCoordinator.scala | 11 +--
.../group/GroupMetadataManager.scala | 5 +-
.../transaction/ProducerIdManager.scala | 11 +--
.../transaction/TransactionCoordinator.scala | 9 ++-
.../transaction/TransactionStateManager.scala | 5 +-
.../main/scala/kafka/server/KafkaServer.scala | 4 +-
.../src/main/scala/kafka/zk/KafkaZkClient.scala | 82 +++++++++++++++++++-
.../group/GroupCoordinatorTest.scala | 11 +--
.../group/GroupMetadataManagerTest.scala | 14 ++--
.../transaction/ProducerIdManagerTest.scala | 24 +++---
.../TransactionStateManagerTest.scala | 13 ++--
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 74 +++++++++++++++++-
12 files changed, 210 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index ed13a08..129eae4 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -23,7 +23,8 @@ import kafka.common.OffsetAndMetadata
import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
import kafka.server._
-import kafka.utils._
+import kafka.utils.Logging
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
@@ -819,12 +820,12 @@ object GroupCoordinator {
val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
def apply(config: KafkaConfig,
- zkUtils: ZkUtils,
+ zkClient: KafkaZkClient,
replicaManager: ReplicaManager,
time: Time): GroupCoordinator = {
val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
- apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+ apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time)
}
private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
@@ -841,7 +842,7 @@ object GroupCoordinator {
)
def apply(config: KafkaConfig,
- zkUtils: ZkUtils,
+ zkClient: KafkaZkClient,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
@@ -852,7 +853,7 @@ object GroupCoordinator {
groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
- offsetConfig, replicaManager, zkUtils, time)
+ offsetConfig, replicaManager, zkClient, time)
new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 67a048d..7e6a643 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
import kafka.utils._
+import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
@@ -50,7 +51,7 @@ class GroupMetadataManager(brokerId: Int,
interBrokerProtocolVersion: ApiVersion,
config: OffsetConfig,
replicaManager: ReplicaManager,
- zkUtils: ZkUtils,
+ zkClient: KafkaZkClient,
time: Time) extends Logging with KafkaMetricsGroup {
private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
@@ -842,7 +843,7 @@ class GroupMetadataManager(brokerId: Int,
* If the topic does not exist, the configured partition count is returned.
*/
private def getGroupMetadataTopicPartitionCount: Int = {
- zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
+ zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index f7bde96..6be3c6b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -18,6 +18,7 @@ package kafka.coordinator.transaction
import kafka.common.KafkaException
import kafka.utils.{Json, Logging, ZkUtils}
+import kafka.zk.KafkaZkClient
/**
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
@@ -65,7 +66,7 @@ case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long)
}
}
-class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging {
+class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends Logging {
this.logIdent = "[ProducerId Manager " + brokerId + "]: "
@@ -82,7 +83,7 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
var zkWriteComplete = false
while (!zkWriteComplete) {
// refresh current producerId block from zookeeper again
- val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
+ val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
// generate the new producerId block
currentProducerIdBlock = dataOpt match {
@@ -105,7 +106,7 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
// try to write the new producerId block into zookeeper
- val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.ProducerIdBlockPath,
+ val (succeeded, version) = zkClient.conditionalUpdatePath(ZkUtils.ProducerIdBlockPath,
newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
zkWriteComplete = succeeded
@@ -114,10 +115,10 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
}
}
- private def checkProducerIdBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+ private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: String): (Boolean, Int) = {
try {
val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
- val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
+ val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
dataOpt match {
case Some(data) =>
val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 6ad1f40..2cc719d 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -20,7 +20,8 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
-import kafka.utils.{Logging, Scheduler, ZkUtils}
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
@@ -34,7 +35,7 @@ object TransactionCoordinator {
def apply(config: KafkaConfig,
replicaManager: ReplicaManager,
scheduler: Scheduler,
- zkUtils: ZkUtils,
+ zkClient: KafkaZkClient,
metrics: Metrics,
metadataCache: MetadataCache,
time: Time): TransactionCoordinator = {
@@ -50,11 +51,11 @@ object TransactionCoordinator {
config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
config.requestTimeoutMs)
- val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
+ val producerIdManager = new ProducerIdManager(config.brokerId, zkClient)
// we do not need to turn on reaper thread since no tasks will be expired and there are no completed tasks to be purged
val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId,
reaperEnabled = false, timerEnabled = false)
- val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
+ val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time)
val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index f2e25c4..b962e82 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -29,6 +29,7 @@ import kafka.server.Defaults
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
@@ -68,7 +69,7 @@ object TransactionStateManager {
* </ul>
*/
class TransactionStateManager(brokerId: Int,
- zkUtils: ZkUtils,
+ zkClient: KafkaZkClient,
scheduler: Scheduler,
replicaManager: ReplicaManager,
config: TransactionConfig,
@@ -282,7 +283,7 @@ class TransactionStateManager(brokerId: Int,
* If the topic does not exist, the default partition count is returned.
*/
private def getTransactionTopicPartitionCount: Int = {
- zkUtils.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions)
+ zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions)
}
private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): Pool[String, TransactionMetadata] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e870ce4..f8111ff 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -259,12 +259,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
- groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
+ groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
groupCoordinator.startup()
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
- transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
+ transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 925a6f6..026dc9d 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -16,6 +16,7 @@
*/
package kafka.zk
+import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
import kafka.api.LeaderAndIsr
@@ -26,13 +27,13 @@ import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
import kafka.zookeeper._
+import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.{CreateMode, KeeperException}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.kafka.common.TopicPartition
/**
* Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -311,6 +312,85 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Gets the partition count for a given topic
+ * @param topic The topic to get partition count for.
+ * @return optional integer that is Some if the topic exists and None otherwise.
+ */
+ def getTopicPartitionCount(topic: String): Option[Int] = {
+ val topicData = getReplicaAssignmentForTopics(Set(topic))
+ if (topicData.nonEmpty)
+ Some(topicData.size)
+ else
+ None
+ }
+
+ /**
+ * Gets the data and version at the given zk path
+ * @param path zk node path
+ * @return A tuple of 2 elements, where first element is zk node data as string
+ * and second element is zk node version.
+ * returns (None, -1) if the node doesn't exist, and throws an exception for any other error
+ */
+ def getDataAndVersion(path: String): (Option[String], Int) = {
+ val getDataRequest = GetDataRequest(path)
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+
+ if (getDataResponse.resultCode == Code.OK) {
+ if (getDataResponse.data == null)
+ (None, getDataResponse.stat.getVersion)
+ else {
+ val data = new String(getDataResponse.data, UTF_8)
+ (Some(data), getDataResponse.stat.getVersion)
+ }
+ } else if (getDataResponse.resultCode == Code.NONODE)
+ (None, -1)
+ else
+ throw getDataResponse.resultException.get
+ }
+
+ /**
+ * Conditionally update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
+ * exist, the current version is not the expected version, etc.) return (false, -1)
+ *
+ * When there is a ConnectionLossException during the conditional update, ZookeeperClient will retry the update and may fail
+ * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one).
+ * In this case, we will run the optionalChecker to further check if the previous write did indeed succeed.
+ */
+ def conditionalUpdatePath(path: String, data: String, expectVersion: Int,
+ optionalChecker:Option[(KafkaZkClient, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
+
+ val setDataRequest = SetDataRequest(path, data.getBytes(UTF_8), expectVersion)
+ val setDataResponse = retryRequestUntilConnected(setDataRequest)
+
+ setDataResponse.resultCode match {
+ case Code.OK =>
+ debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
+ .format(path, data, expectVersion, setDataResponse.stat.getVersion))
+ (true, setDataResponse.stat.getVersion)
+
+ case Code.BADVERSION =>
+ optionalChecker match {
+ case Some(checker) => checker(this, path, data)
+ case _ =>
+ debug("Checker method is not passed skipping zkData match")
+ debug("Conditional update of path %s with data %s and expected version %d failed due to %s"
+ .format(path, data, expectVersion, setDataResponse.resultException.get.getMessage))
+ (false, -1)
+ }
+
+ case Code.NONODE =>
+ debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+ expectVersion, setDataResponse.resultException.get.getMessage))
+ (false, -1)
+
+ case _ =>
+ debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+ expectVersion, setDataResponse.resultException.get.getMessage))
+ throw setDataResponse.resultException.get
+ }
+ }
+
+ /**
* Get all topics marked for deletion.
* @return sequence of topics marked for deletion.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 22efb33..1c770a4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.internals.Topic
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
@@ -61,7 +62,7 @@ class GroupCoordinatorTest extends JUnitSuite {
var groupCoordinator: GroupCoordinator = null
var replicaManager: ReplicaManager = null
var scheduler: KafkaScheduler = null
- var zkUtils: ZkUtils = null
+ var zkClient: KafkaZkClient = null
private val groupId = "groupId"
private val protocolType = "consumer"
@@ -85,10 +86,10 @@ class GroupCoordinatorTest extends JUnitSuite {
replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
- zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+ zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator
- EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
- EasyMock.replay(zkUtils)
+ EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
+ EasyMock.replay(zkClient)
timer = new MockTimer
@@ -97,7 +98,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
- groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
+ groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
groupCoordinator.startup(false)
// add the partition into the owned partition list
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b437405..abedcb8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -23,7 +23,7 @@ import kafka.common.OffsetAndMetadata
import kafka.log.{Log, LogAppendInfo}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
import kafka.utils.TestUtils.fail
-import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
+import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
@@ -42,13 +42,15 @@ import scala.collection.JavaConverters._
import scala.collection._
import java.util.concurrent.locks.ReentrantLock
+import kafka.zk.KafkaZkClient
+
class GroupMetadataManagerTest {
var time: MockTime = null
var replicaManager: ReplicaManager = null
var groupMetadataManager: GroupMetadataManager = null
var scheduler: KafkaScheduler = null
- var zkUtils: ZkUtils = null
+ var zkClient: KafkaZkClient = null
var partition: Partition = null
val groupId = "foo"
@@ -74,13 +76,13 @@ class GroupMetadataManagerTest {
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator
- zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
- EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
- EasyMock.replay(zkUtils)
+ zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
+ EasyMock.replay(zkClient)
time = new MockTime
replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
- groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkUtils, time)
+ groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time)
partition = EasyMock.niceMock(classOf[Partition])
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 39353b8..c5b42d4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -17,34 +17,34 @@
package kafka.coordinator.transaction
import kafka.common.KafkaException
-import kafka.utils.ZkUtils
+import kafka.zk.KafkaZkClient
import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.{After, Test}
import org.junit.Assert._
class ProducerIdManagerTest {
- private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+ private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
@After
def tearDown(): Unit = {
- EasyMock.reset(zkUtils)
+ EasyMock.reset(zkClient)
}
@Test
def testGetProducerId() {
var zkVersion: Option[Int] = None
var data: String = null
- EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
+ EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
override def answer(): (Option[String], Int) = zkVersion.map(Some(data) -> _).getOrElse(None, 0)
}).anyTimes()
val capturedVersion: Capture[Int] = EasyMock.newCapture()
val capturedData: Capture[String] = EasyMock.newCapture()
- EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
+ EasyMock.expect(zkClient.conditionalUpdatePath(EasyMock.anyString(),
EasyMock.capture(capturedData),
EasyMock.capture(capturedVersion),
- EasyMock.anyObject[Option[(ZkUtils, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] {
+ EasyMock.anyObject[Option[(KafkaZkClient, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] {
override def answer(): (Boolean, Int) = {
val newZkVersion = capturedVersion.getValue + 1
zkVersion = Some(newZkVersion)
@@ -53,10 +53,10 @@ class ProducerIdManagerTest {
}
}).anyTimes()
- EasyMock.replay(zkUtils)
+ EasyMock.replay(zkClient)
- val manager1 = new ProducerIdManager(0, zkUtils)
- val manager2 = new ProducerIdManager(1, zkUtils)
+ val manager1 = new ProducerIdManager(0, zkClient)
+ val manager2 = new ProducerIdManager(1, zkClient)
val pid1 = manager1.generateProducerId()
val pid2 = manager2.generateProducerId()
@@ -76,15 +76,15 @@ class ProducerIdManagerTest {
@Test(expected = classOf[KafkaException])
def testExceedProducerIdLimit() {
- EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
+ EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
override def answer(): (Option[String], Int) = {
val json = ProducerIdManager.generateProducerIdBlockJson(
ProducerIdBlock(0, Long.MaxValue - ProducerIdManager.PidBlockSize, Long.MaxValue))
(Some(json), 0)
}
}).anyTimes()
- EasyMock.replay(zkUtils)
- new ProducerIdManager(0, zkUtils)
+ EasyMock.replay(zkClient)
+ new ProducerIdManager(0, zkClient)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 7973b9a..20dfaa6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -21,8 +21,9 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.log.Log
import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
-import kafka.utils.{MockScheduler, Pool, ZkUtils}
+import kafka.utils.{MockScheduler, Pool}
import kafka.utils.TestUtils.fail
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
import org.apache.kafka.common.protocol.Errors
@@ -51,17 +52,17 @@ class TransactionStateManagerTest {
val time = new MockTime()
val scheduler = new MockScheduler(time)
- val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+ val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
- EasyMock.expect(zkUtils.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
+ EasyMock.expect(zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
.andReturn(Some(numPartitions))
.anyTimes()
- EasyMock.replay(zkUtils)
+ EasyMock.replay(zkClient)
val txnConfig = TransactionConfig()
- val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkUtils, scheduler, replicaManager, txnConfig, time)
+ val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time)
val transactionalId1: String = "one"
val transactionalId2: String = "two"
@@ -82,7 +83,7 @@ class TransactionStateManagerTest {
@After
def tearDown() {
- EasyMock.reset(zkUtils, replicaManager)
+ EasyMock.reset(zkClient, replicaManager)
transactionManager.shutdown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 00c0a02..f2d95c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,11 +16,14 @@
*/
package kafka.zk
+import kafka.common.TopicAndPartition
+import kafka.utils.ZkUtils
import kafka.zookeeper.ZooKeeperClient
-
-import org.junit.{After, Before, Test}
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.apache.kafka.common.TopicPartition
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable
class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -89,4 +92,69 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
intercept[IllegalArgumentException](zkClient.createRecursive("create-invalid-path"))
}
+ @Test
+ def testGetTopicPartitionCount() {
+ val topic = "mytest"
+
+ // test with non-existing topic
+ assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty)
+
+ // create a topic path
+ zkClient.createRecursive(ZkUtils.getTopicPath(topic))
+
+ val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]()
+ assignment.put(new TopicAndPartition(topic, 0), Seq(0,1))
+ assignment.put(new TopicAndPartition(topic, 1), Seq(0,1))
+ zkClient.setTopicAssignmentRaw(topic, assignment.toMap)
+
+ assertEquals(2, zkClient.getTopicPartitionCount(topic).get)
+ }
+
+
+ @Test
+ def testGetDataAndVersion() {
+ val path = "/testpath"
+
+ // test with non-existing path
+ var dataAndVersion = zkClient.getDataAndVersion(path)
+ assertTrue(dataAndVersion._1.isEmpty)
+ assertEquals(-1, dataAndVersion._2)
+
+ // create a test path
+ zkClient.createRecursive(path)
+ zkClient.conditionalUpdatePath(path, "version1", 0)
+
+ // test with existing path
+ dataAndVersion = zkClient.getDataAndVersion(path)
+ assertEquals("version1", dataAndVersion._1.get)
+ assertEquals(1, dataAndVersion._2)
+
+ zkClient.conditionalUpdatePath(path, "version2", 1)
+ dataAndVersion = zkClient.getDataAndVersion(path)
+ assertEquals("version2", dataAndVersion._1.get)
+ assertEquals(2, dataAndVersion._2)
+ }
+
+ @Test
+ def testConditionalUpdatePath() {
+ val path = "/testconditionalpath"
+
+ // test with non-existing path
+ var statusAndVersion = zkClient.conditionalUpdatePath(path, "version0", 0)
+ assertFalse(statusAndVersion._1)
+ assertEquals(-1, statusAndVersion._2)
+
+ // create path
+ zkClient.createRecursive(path)
+
+ // test with valid expected version
+ statusAndVersion = zkClient.conditionalUpdatePath(path, "version1", 0)
+ assertTrue(statusAndVersion._1)
+ assertEquals(1, statusAndVersion._2)
+
+ // test with invalid expected version
+ statusAndVersion = zkClient.conditionalUpdatePath(path, "version2", 2)
+ assertFalse(statusAndVersion._1)
+ assertEquals(-1, statusAndVersion._2)
+ }
}
\ No newline at end of file