You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/03/23 17:46:03 UTC
svn commit: r1304473 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/log/ main/scala/kafka/server/ test/resources/
test/scala/unit/kafka/integration/ test/scala/unit/kafka/log/
test/scala/unit/kafka/server/ test/scala/unit/kafka/zk/
Author: nehanarkhede
Date: Fri Mar 23 16:46:03 2012
New Revision: 1304473
URL: http://svn.apache.org/viewvc?rev=1304473&view=rev
Log:
KAFKA-307 Refactor server code to reduce interdependencies; patched by nehanarkhede; reviewed by junrao
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Mar 23 16:46:03 2012
@@ -21,11 +21,9 @@ import java.io._
import kafka.utils._
import scala.collection._
import java.util.concurrent.CountDownLatch
-import kafka.server.{KafkaConfig, KafkaZooKeeper}
+import kafka.server.KafkaConfig
import kafka.common.{InvalidTopicException, InvalidPartitionException}
import kafka.api.OffsetRequest
-import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.{Partition, Replica}
/**
* The guy who creates and hands out logs
@@ -38,8 +36,6 @@ private[kafka] class LogManager(val conf
needRecovery: Boolean) extends Logging {
val logDir: File = new File(config.logDir)
- var kafkaZookeeper = new KafkaZooKeeper(config, this)
-
private val numPartitions = config.numPartitions
private val maxSize: Long = config.logFileSize
private val flushInterval = config.flushInterval
@@ -52,7 +48,6 @@ private[kafka] class LogManager(val conf
private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
private val logRetentionSize = config.logRetentionSize
private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
- private var replicas: Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@@ -89,8 +84,6 @@ private[kafka] class LogManager(val conf
* Register this broker in ZK for the first time.
*/
def startup() {
- kafkaZookeeper.startup
- kafkaZookeeper.registerBrokerInZk()
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
@@ -112,6 +105,7 @@ private[kafka] class LogManager(val conf
startupLatch.await
}
+
/**
* Create a log for the given topic and the given partition
*/
@@ -123,8 +117,6 @@ private[kafka] class LogManager(val conf
}
}
- def getReplicaForPartition(topic: String, partition: Int): Option[Replica] = replicas.get((topic, partition))
-
/**
* Return the Pool (partitions) for a specific log
*/
@@ -172,10 +164,6 @@ private[kafka] class LogManager(val conf
* Create the log if it does not exist, if it exists just return it
*/
def getOrCreateLog(topic: String, partition: Int): Log = {
- // TODO: Change this later
- if(!ZkUtils.isPartitionOnBroker(kafkaZookeeper.zkClient, topic, partition, config.brokerId))
- throw new InvalidPartitionException("Broker %d does not host partition %d for topic %s".
- format(config.brokerId, partition, topic))
var hasNewTopic = false
var parts = getLogPool(topic, partition)
if (parts == null) {
@@ -198,40 +186,9 @@ private[kafka] class LogManager(val conf
info("Created log for '" + topic + "'-" + partition)
}
- // add this log to the list of replicas hosted on this broker
- addReplicaForPartition(topic, partition)
log
}
- def addReplicaForPartition(topic: String, partitionId: Int): Replica = {
- val replica = replicas.get((topic, partitionId))
- val log = getLog(topic, partitionId)
- replica match {
- case Some(r) =>
- r.log match {
- case None =>
- val log = getLog(topic, partitionId)
- r.log = log
- case Some(l) => // nothing to do since log already exists
- }
- case None =>
- val partition = new Partition(topic, partitionId)
- log match {
- case Some(l) =>
- val replica = new Replica(config.brokerId, partition, topic, log, l.getHighwaterMark, l.maxSize, true)
- replicas += (topic, partitionId) -> replica
- info("Added replica for topic %s partition %s on broker %d"
- .format(replica.topic, replica.partition.partId, replica.brokerId))
- case None =>
- val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
- replicas += (topic, partitionId) -> replica
- info("Added replica for topic %s partition %s on broker %d"
- .format(replica.topic, replica.partition.partId, replica.brokerId))
- }
- }
- replicas.get((topic, partitionId)).get
- }
-
/* Attemps to delete all provided segments from a log and returns how many it was able to */
private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
var total = 0
@@ -304,7 +261,6 @@ private[kafka] class LogManager(val conf
val iter = getLogIterator
while(iter.hasNext)
iter.next.close()
- kafkaZookeeper.close
}
private def getLogIterator(): Iterator[Log] = {
@@ -360,5 +316,4 @@ private[kafka] class LogManager(val conf
def getTopicPartitionsMap() = topicPartitionsMap
def getServerConfig: KafkaConfig = config
- def getZookeeperClient: ZkClient = kafkaZookeeper.zkClient
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Mar 23 16:46:03 2012
@@ -24,15 +24,15 @@ import kafka.api._
import kafka.log._
import kafka.message._
import kafka.network._
-import kafka.utils.{SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection.mutable.ListBuffer
+import kafka.utils.{SystemTime, Logging}
import kafka.common.{FetchRequestFormatException, ErrorMapping}
/**
* Logic to handle the various Kafka requests
*/
-class KafkaApis(val logManager: LogManager) extends Logging {
+class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -62,7 +62,7 @@ class KafkaApis(val logManager: LogManag
val requestSize = request.getNumTopicPartitions
val errors = new Array[Short](requestSize)
val offsets = new Array[Long](requestSize)
-
+
var msgIndex = -1
for( topicData <- request.data ) {
for( partitionData <- topicData.partitionData ) {
@@ -70,6 +70,7 @@ class KafkaApis(val logManager: LogManag
val partition = partitionData.getTranslatedPartition(topicData.topic, logManager.chooseRandomPartition)
try {
// TODO: need to handle ack's here! Will probably move to another method.
+ kafkaZookeeper.ensurePartitionOnThisBroker(topicData.topic, partition)
val log = logManager.getOrCreateLog(topicData.topic, partition)
log.append(partitionData.messages)
offsets(msgIndex) = log.nextAppendOffset
@@ -156,7 +157,7 @@ class KafkaApis(val logManager: LogManag
val topicsMetadata = new ListBuffer[TopicMetadata]()
val config = logManager.getServerConfig
- val zkClient = logManager.getZookeeperClient
+ val zkClient = kafkaZookeeper.getZookeeperClient
val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Mar 23 16:46:03 2012
@@ -23,6 +23,7 @@ import java.io.File
import kafka.network.{SocketServerStats, SocketServer}
import kafka.log.LogManager
import kafka.utils._
+import kafka.cluster.Replica
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -37,6 +38,8 @@ class KafkaServer(val config: KafkaConfi
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
private var logManager: LogManager = null
+ var kafkaZookeeper: KafkaZooKeeper = null
+ val replicaManager = new ReplicaManager(config)
/**
* Start up API for bringing up a single instance of the Kafka server.
@@ -62,7 +65,11 @@ class KafkaServer(val config: KafkaConfi
config.numQueuedRequests,
config.maxSocketRequestSize)
Utils.registerMBean(socketServer.stats, statsMBeanName)
- requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, new KafkaApis(logManager).handle, config.numIoThreads)
+
+ kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
+
+ requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel,
+ new KafkaApis(logManager, kafkaZookeeper).handle, config.numIoThreads)
socketServer.startup
Mx4jLoader.maybeLoad
@@ -74,9 +81,7 @@ class KafkaServer(val config: KafkaConfi
logManager.startup
// starting relevant replicas and leader election for partitions assigned to this broker
- // TODO: Some part of the broker startup logic is hidden inside KafkaZookeeper, but some of it has to be done here
- // since it requires the log manager to come up. Ideally log manager should not hide KafkaZookeeper inside it
- logManager.kafkaZookeeper.startReplicasForTopics(ZkUtils.getAllTopics(logManager.getZookeeperClient))
+ kafkaZookeeper.startup
info("Server started.")
}
@@ -88,6 +93,7 @@ class KafkaServer(val config: KafkaConfi
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
info("Shutting down Kafka server with id " + config.brokerId)
+ kafkaZookeeper.close
if (socketServer != null)
socketServer.shutdown()
if(requestHandlerPool != null)
@@ -109,6 +115,14 @@ class KafkaServer(val config: KafkaConfi
*/
def awaitShutdown(): Unit = shutdownLatch.await()
+ def addReplica(topic: String, partition: Int): Replica = {
+ // get local log
+ val log = logManager.getOrCreateLog(topic, partition)
+ replicaManager.addLocalReplica(topic, partition, log)
+ }
+
+ def getReplica(topic: String, partition: Int): Option[Replica] = replicaManager.getReplica(topic, partition)
+
def getLogManager(): LogManager = logManager
def getStats(): SocketServerStats = socketServer.stats
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Fri Mar 23 16:46:03 2012
@@ -19,9 +19,8 @@ package kafka.server
import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.log.LogManager
import java.net.InetAddress
-import kafka.common.KafkaZookeeperClient
+import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
import kafka.cluster.Replica
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
@@ -31,10 +30,12 @@ import org.I0Itec.zkclient.{IZkDataListe
* /brokers/[0...N] --> host:port
*
*/
-class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Logging {
+class KafkaZooKeeper(config: KafkaConfig,
+ addReplicaCbk: (String, Int) => Replica,
+ getReplicaCbk: (String, Int) => Option[Replica]) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
- var zkClient: ZkClient = null
+ private var zkClient: ZkClient = null
var topics: List[String] = Nil
val lock = new Object()
var existingTopics: Set[String] = Set.empty[String]
@@ -48,10 +49,11 @@ class KafkaZooKeeper(config: KafkaConfig
info("connecting to ZK: " + config.zkConnect)
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
zkClient.subscribeStateChanges(new SessionExpireListener)
- subscribeToTopicAndPartitionsChanges
+ registerBrokerInZk()
+ subscribeToTopicAndPartitionsChanges(true)
}
- def registerBrokerInZk() {
+ private def registerBrokerInZk() {
info("Registering broker " + brokerIdPath)
val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
val creatorId = hostName + "-" + System.currentTimeMillis
@@ -96,6 +98,14 @@ class KafkaZooKeeper(config: KafkaConfig
}
}
+ def ensurePartitionOnThisBroker(topic: String, partition: Int) {
+ if(!ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId))
+ throw new InvalidPartitionException("Broker %d does not host partition %d for topic %s".
+ format(config.brokerId, partition, topic))
+ }
+
+ def getZookeeperClient = zkClient
+
def handleNewTopics(topics: Seq[String]) {
// get relevant partitions to this broker
val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
@@ -122,7 +132,7 @@ class KafkaZooKeeper(config: KafkaConfig
startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
}
- def subscribeToTopicAndPartitionsChanges {
+ def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
val topics = ZkUtils.getAllTopics(zkClient)
@@ -136,8 +146,10 @@ class KafkaZooKeeper(config: KafkaConfig
val partitions = tp._2.map(p => p.toInt)
partitions.foreach { partition =>
// register leader change listener
- zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
+ zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
}
+ if(startReplicas)
+ startReplicasForPartitions(topic, partitions)
}
}
@@ -149,16 +161,11 @@ class KafkaZooKeeper(config: KafkaConfig
}
}
- def startReplicasForTopics(topics: Seq[String]) {
- val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
- partitionsAssignedToThisBroker.foreach(tp => startReplicasForPartitions(tp._1, tp._2))
- }
-
private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
partitions.foreach { partition =>
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
if(assignedReplicas.contains(config.brokerId)) {
- val replica = logManager.addReplicaForPartition(topic, partition)
+ val replica = addReplicaCbk(topic, partition)
startReplica(replica)
} else
warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
@@ -246,7 +253,7 @@ class KafkaZooKeeper(config: KafkaConfig
.format(topic, partitionId, config.brokerId))
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
if(assignedReplicas.contains(config.brokerId)) {
- val replica = logManager.getReplicaForPartition(topic, partitionId)
+ val replica = getReplicaCbk(topic, partitionId)
replica match {
case Some(r) => leaderElection(r)
case None => error("No replica exists for topic %s partition %s on broker %d"
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1304473&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Mar 23 16:46:03 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import kafka.log.Log
+import kafka.cluster.{Partition, Replica}
+import kafka.utils.Logging
+import collection.mutable
+
+class ReplicaManager(config: KafkaConfig) extends Logging {
+
+ private var replicas: mutable.Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
+
+ def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
+ val replica = replicas.get((topic, partitionId))
+ replica match {
+ case Some(r) =>
+ r.log match {
+ case None =>
+ r.log = Some(log)
+ case Some(l) => // nothing to do since log already exists
+ }
+ case None =>
+ val partition = new Partition(topic, partitionId)
+ val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
+ replicas += (topic, partitionId) -> replica
+ info("Added local replica for topic %s partition %s on broker %d"
+ .format(replica.topic, replica.partition.partId, replica.brokerId))
+ }
+ replicas.get((topic, partitionId)).get
+ }
+
+ def addRemoteReplica(topic: String, partitionId: Int): Replica = {
+ val replica = replicas.get((topic, partitionId))
+ replica match {
+ case Some(r) =>
+ case None =>
+ val partition = new Partition(topic, partitionId)
+ val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
+ replicas += (topic, partitionId) -> replica
+ info("Added remote replica for topic %s partition %s on broker %d"
+ .format(replica.topic, replica.partition.partId, replica.brokerId))
+ }
+ replicas.get((topic, partitionId)).get
+ }
+
+ def getReplica(topic: String, partitionId: Int): Option[Replica] = {
+ replicas.get((topic, partitionId))
+ }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Fri Mar 23 16:46:03 2012
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.C
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-log4j.logger.kafka=INFO
+log4j.logger.kafka=ERROR
# zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
-log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.I0Itec.zkclient.ZkClient=ERROR
+log4j.logger.org.apache.zookeeper=ERROR
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Fri Mar 23 16:46:03 2012
@@ -23,7 +23,6 @@ import kafka.admin.CreateTopicCommand
import java.nio.ByteBuffer
import kafka.log.LogManager
import kafka.utils.TestUtils
-import kafka.server.{KafkaApis, KafkaConfig}
import junit.framework.Assert._
import org.I0Itec.zkclient.ZkClient
import TestUtils._
@@ -31,6 +30,7 @@ import org.easymock.EasyMock
import kafka.network.BoundedByteBufferReceive
import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
import kafka.cluster.Broker
+import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
@@ -83,9 +83,11 @@ class TopicMetadataTest extends JUnit3Su
private def mockLogManagerAndTestTopic(topic: String) = {
// topic metadata request only requires 2 APIs from the log manager
val logManager = EasyMock.createMock(classOf[LogManager])
+ val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+ EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
- EasyMock.expect(logManager.getZookeeperClient).andReturn(zkClient)
EasyMock.replay(logManager)
+ EasyMock.replay(kafkaZookeeper)
// create a topic metadata request
val topicMetadataRequest = new TopicMetadataRequest(List(topic))
@@ -95,7 +97,7 @@ class TopicMetadataTest extends JUnit3Su
serializedMetadataRequest.rewind()
// create the kafka request handler
- val kafkaRequestHandler = new KafkaApis(logManager)
+ val kafkaRequestHandler = new KafkaApis(logManager, kafkaZookeeper)
// mock the receive API to return the request buffer as created above
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
@@ -123,6 +125,7 @@ class TopicMetadataTest extends JUnit3Su
// verify the expected calls to log manager occurred in the right order
EasyMock.verify(logManager)
+ EasyMock.verify(kafkaZookeeper)
EasyMock.verify(receivedRequest)
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Mar 23 16:46:03 2012
@@ -19,13 +19,13 @@ package kafka.log
import java.io._
import junit.framework.Assert._
-import kafka.server.KafkaConfig
import org.junit.Test
import kafka.common.OffsetOutOfRangeException
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
+import kafka.server.KafkaConfig
class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -48,9 +48,10 @@ class LogManagerTest extends JUnit3Suite
logManager.startup
logDir = logManager.logDir
+ TestUtils.createBrokersInZk(zookeeper.client, List(config.brokerId))
+
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zookeeper.client, name, 3, 1, "0,0,0")
-
}
override def tearDown() {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Fri Mar 23 16:46:03 2012
@@ -79,7 +79,7 @@ class LeaderElectionTest extends JUnit3S
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
- Thread.sleep(500)
+ Thread.sleep(zookeeper.tickTime)
// bring the preferred replica back
servers.head.startup()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Fri Mar 23 16:46:03 2012
@@ -27,7 +27,8 @@ import kafka.utils.{Utils, ZKStringSeria
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
- val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 3000)
+ val tickTime = 2000
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val port = connectString.split(":")(1).toInt
val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
factory.startup(zookeeper)