You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/03/23 17:46:03 UTC

svn commit: r1304473 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/ main/scala/kafka/server/ test/resources/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/log/ test/scala/unit/kafka/server/ test/scala/unit/kafka/zk/

Author: nehanarkhede
Date: Fri Mar 23 16:46:03 2012
New Revision: 1304473

URL: http://svn.apache.org/viewvc?rev=1304473&view=rev
Log:
KAFKA-307 Refactor server code to reduce interdependencies; patched by nehanarkhede; reviewed by junrao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Mar 23 16:46:03 2012
@@ -21,11 +21,9 @@ import java.io._
 import kafka.utils._
 import scala.collection._
 import java.util.concurrent.CountDownLatch
-import kafka.server.{KafkaConfig, KafkaZooKeeper}
+import kafka.server.KafkaConfig
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
-import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.{Partition, Replica}
 
 /**
  * The guy who creates and hands out logs
@@ -38,8 +36,6 @@ private[kafka] class LogManager(val conf
                                 needRecovery: Boolean) extends Logging {
   
   val logDir: File = new File(config.logDir)
-  var kafkaZookeeper = new KafkaZooKeeper(config, this)
-
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
   private val flushInterval = config.flushInterval
@@ -52,7 +48,6 @@ private[kafka] class LogManager(val conf
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
   private val logRetentionSize = config.logRetentionSize
   private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
-  private var replicas: Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -89,8 +84,6 @@ private[kafka] class LogManager(val conf
    *  Register this broker in ZK for the first time.
    */
   def startup() {
-    kafkaZookeeper.startup
-    kafkaZookeeper.registerBrokerInZk()
 
     /* Schedule the cleanup task to delete old logs */
     if(scheduler != null) {
@@ -112,6 +105,7 @@ private[kafka] class LogManager(val conf
     startupLatch.await
   }
 
+
   /**
    * Create a log for the given topic and the given partition
    */
@@ -123,8 +117,6 @@ private[kafka] class LogManager(val conf
     }
   }
 
-  def getReplicaForPartition(topic: String, partition: Int): Option[Replica] = replicas.get((topic, partition))
-
   /**
    * Return the Pool (partitions) for a specific log
    */
@@ -172,10 +164,6 @@ private[kafka] class LogManager(val conf
    * Create the log if it does not exist, if it exists just return it
    */
   def getOrCreateLog(topic: String, partition: Int): Log = {
-    // TODO: Change this later
-    if(!ZkUtils.isPartitionOnBroker(kafkaZookeeper.zkClient, topic, partition, config.brokerId))
-      throw new InvalidPartitionException("Broker %d does not host partition %d for topic %s".
-        format(config.brokerId, partition, topic))
     var hasNewTopic = false
     var parts = getLogPool(topic, partition)
     if (parts == null) {
@@ -198,40 +186,9 @@ private[kafka] class LogManager(val conf
         info("Created log for '" + topic + "'-" + partition)
     }
 
-    // add this log to the list of replicas hosted on this broker
-    addReplicaForPartition(topic, partition)
     log
   }
 
-  def addReplicaForPartition(topic: String, partitionId: Int): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    val log = getLog(topic, partitionId)
-    replica match {
-      case Some(r) =>
-        r.log match {
-          case None =>
-            val log = getLog(topic, partitionId)
-            r.log = log
-          case Some(l) => // nothing to do since log already exists
-        }
-      case None =>
-        val partition = new Partition(topic, partitionId)
-        log match {
-          case Some(l) =>
-            val replica = new Replica(config.brokerId, partition, topic, log, l.getHighwaterMark, l.maxSize, true)
-            replicas += (topic, partitionId) -> replica
-            info("Added replica for topic %s partition %s on broker %d"
-              .format(replica.topic, replica.partition.partId, replica.brokerId))
-          case None =>
-            val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
-            replicas += (topic, partitionId) -> replica
-            info("Added replica for topic %s partition %s on broker %d"
-              .format(replica.topic, replica.partition.partId, replica.brokerId))
-        }
-    }
-    replicas.get((topic, partitionId)).get
-  }
-
   /* Attemps to delete all provided segments from a log and returns how many it was able to */
   private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
     var total = 0
@@ -304,7 +261,6 @@ private[kafka] class LogManager(val conf
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
-    kafkaZookeeper.close
   }
   
   private def getLogIterator(): Iterator[Log] = {
@@ -360,5 +316,4 @@ private[kafka] class LogManager(val conf
   def getTopicPartitionsMap() = topicPartitionsMap
 
   def getServerConfig: KafkaConfig = config
-  def getZookeeperClient: ZkClient = kafkaZookeeper.zkClient
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Mar 23 16:46:03 2012
@@ -24,15 +24,15 @@ import kafka.api._
 import kafka.log._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection.mutable.ListBuffer
+import kafka.utils.{SystemTime, Logging}
 import kafka.common.{FetchRequestFormatException, ErrorMapping}
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val logManager: LogManager) extends Logging {
+class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
   
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -62,7 +62,7 @@ class KafkaApis(val logManager: LogManag
     val requestSize = request.getNumTopicPartitions
     val errors = new Array[Short](requestSize)
     val offsets = new Array[Long](requestSize)
-	
+
     var msgIndex = -1
     for( topicData <- request.data ) {
       for( partitionData <- topicData.partitionData ) {
@@ -70,6 +70,7 @@ class KafkaApis(val logManager: LogManag
         val partition = partitionData.getTranslatedPartition(topicData.topic, logManager.chooseRandomPartition)
         try {
           // TODO: need to handle ack's here!  Will probably move to another method.
+          kafkaZookeeper.ensurePartitionOnThisBroker(topicData.topic, partition)
           val log = logManager.getOrCreateLog(topicData.topic, partition)
           log.append(partitionData.messages)
           offsets(msgIndex) = log.nextAppendOffset
@@ -156,7 +157,7 @@ class KafkaApis(val logManager: LogManag
 
     val topicsMetadata = new ListBuffer[TopicMetadata]()
     val config = logManager.getServerConfig
-    val zkClient = logManager.getZookeeperClient
+    val zkClient = kafkaZookeeper.getZookeeperClient
     val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
 
     metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Mar 23 16:46:03 2012
@@ -23,6 +23,7 @@ import java.io.File
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
+import kafka.cluster.Replica
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -37,6 +38,8 @@ class KafkaServer(val config: KafkaConfi
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
   private var logManager: LogManager = null
+  var kafkaZookeeper: KafkaZooKeeper = null
+  val replicaManager = new ReplicaManager(config)
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -62,7 +65,11 @@ class KafkaServer(val config: KafkaConfi
                                     config.numQueuedRequests,
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
-    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, new KafkaApis(logManager).handle, config.numIoThreads)
+
+    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
+
+    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel,
+      new KafkaApis(logManager, kafkaZookeeper).handle, config.numIoThreads)
     socketServer.startup
 
     Mx4jLoader.maybeLoad
@@ -74,9 +81,7 @@ class KafkaServer(val config: KafkaConfi
     logManager.startup
 
     // starting relevant replicas and leader election for partitions assigned to this broker
-    // TODO: Some part of the broker startup logic is hidden inside KafkaZookeeper, but some of it has to be done here
-    // since it requires the log manager to come up. Ideally log manager should not hide KafkaZookeeper inside it
-    logManager.kafkaZookeeper.startReplicasForTopics(ZkUtils.getAllTopics(logManager.getZookeeperClient))
+    kafkaZookeeper.startup
     info("Server started.")
   }
   
@@ -88,6 +93,7 @@ class KafkaServer(val config: KafkaConfi
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("Shutting down Kafka server with id " + config.brokerId)
+      kafkaZookeeper.close
       if (socketServer != null)
         socketServer.shutdown()
       if(requestHandlerPool != null)
@@ -109,6 +115,14 @@ class KafkaServer(val config: KafkaConfi
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
+  def addReplica(topic: String, partition: Int): Replica = {
+    // get local log
+    val log = logManager.getOrCreateLog(topic, partition)
+    replicaManager.addLocalReplica(topic, partition, log)
+  }
+
+  def getReplica(topic: String, partition: Int): Option[Replica] = replicaManager.getReplica(topic, partition)
+
   def getLogManager(): LogManager = logManager
 
   def getStats(): SocketServerStats = socketServer.stats

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Fri Mar 23 16:46:03 2012
@@ -19,9 +19,8 @@ package kafka.server
 
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.log.LogManager
 import java.net.InetAddress
-import kafka.common.KafkaZookeeperClient
+import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
 
@@ -31,10 +30,12 @@ import org.I0Itec.zkclient.{IZkDataListe
  *   /brokers/[0...N] --> host:port
  *
  */
-class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Logging {
+class KafkaZooKeeper(config: KafkaConfig,
+                     addReplicaCbk: (String, Int) => Replica,
+                     getReplicaCbk: (String, Int) => Option[Replica]) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
-  var zkClient: ZkClient = null
+  private var zkClient: ZkClient = null
   var topics: List[String] = Nil
   val lock = new Object()
   var existingTopics: Set[String] = Set.empty[String]
@@ -48,10 +49,11 @@ class KafkaZooKeeper(config: KafkaConfig
     info("connecting to ZK: " + config.zkConnect)
     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
     zkClient.subscribeStateChanges(new SessionExpireListener)
-    subscribeToTopicAndPartitionsChanges
+    registerBrokerInZk()
+    subscribeToTopicAndPartitionsChanges(true)
   }
 
-  def registerBrokerInZk() {
+  private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
     val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
     val creatorId = hostName + "-" + System.currentTimeMillis
@@ -96,6 +98,14 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
+  def ensurePartitionOnThisBroker(topic: String, partition: Int) {
+    if(!ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId))
+      throw new InvalidPartitionException("Broker %d does not host partition %d for topic %s".
+        format(config.brokerId, partition, topic))
+  }
+
+  def getZookeeperClient = zkClient
+
   def handleNewTopics(topics: Seq[String]) {
     // get relevant partitions to this broker
     val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
@@ -122,7 +132,7 @@ class KafkaZooKeeper(config: KafkaConfig
     startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
   }
 
-  def subscribeToTopicAndPartitionsChanges {
+  def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
     info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
     zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
     val topics = ZkUtils.getAllTopics(zkClient)
@@ -136,8 +146,10 @@ class KafkaZooKeeper(config: KafkaConfig
       val partitions = tp._2.map(p => p.toInt)
       partitions.foreach { partition =>
           // register leader change listener
-          zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
+        zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
       }
+      if(startReplicas)
+        startReplicasForPartitions(topic, partitions)
     }
   }
 
@@ -149,16 +161,11 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
-  def startReplicasForTopics(topics: Seq[String]) {
-    val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
-    partitionsAssignedToThisBroker.foreach(tp => startReplicasForPartitions(tp._1, tp._2))
-  }
-
   private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
     partitions.foreach { partition =>
       val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
       if(assignedReplicas.contains(config.brokerId)) {
-        val replica = logManager.addReplicaForPartition(topic, partition)
+        val replica = addReplicaCbk(topic, partition)
         startReplica(replica)
       } else
         warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
@@ -246,7 +253,7 @@ class KafkaZooKeeper(config: KafkaConfig
           .format(topic, partitionId, config.brokerId))
         val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
         if(assignedReplicas.contains(config.brokerId)) {
-          val replica = logManager.getReplicaForPartition(topic, partitionId)
+          val replica = getReplicaCbk(topic, partitionId)
           replica match {
             case Some(r) => leaderElection(r)
             case None =>  error("No replica exists for topic %s partition %s on broker %d"

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1304473&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Mar 23 16:46:03 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import kafka.log.Log
+import kafka.cluster.{Partition, Replica}
+import kafka.utils.Logging
+import collection.mutable
+
+class ReplicaManager(config: KafkaConfig) extends Logging {
+
+  private var replicas: mutable.Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
+
+  def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
+    val replica = replicas.get((topic, partitionId))
+    replica match {
+      case Some(r) =>
+        r.log match {
+          case None =>
+            r.log = Some(log)
+          case Some(l) => // nothing to do since log already exists
+        }
+      case None =>
+        val partition = new Partition(topic, partitionId)
+        val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
+        replicas += (topic, partitionId) -> replica
+        info("Added local replica for topic %s partition %s on broker %d"
+          .format(replica.topic, replica.partition.partId, replica.brokerId))
+    }
+    replicas.get((topic, partitionId)).get
+  }
+
+  def addRemoteReplica(topic: String, partitionId: Int): Replica = {
+    val replica = replicas.get((topic, partitionId))
+    replica match {
+      case Some(r) =>
+      case None =>
+        val partition = new Partition(topic, partitionId)
+        val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
+        replicas += (topic, partitionId) -> replica
+        info("Added remote replica for topic %s partition %s on broker %d"
+          .format(replica.topic, replica.partition.partId, replica.brokerId))
+    }
+    replicas.get((topic, partitionId)).get
+  }
+
+  def getReplica(topic: String, partitionId: Int): Option[Replica] = {
+    replicas.get((topic, partitionId))
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Fri Mar 23 16:46:03 2012
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.C
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=INFO
+log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
-log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.I0Itec.zkclient.ZkClient=ERROR
+log4j.logger.org.apache.zookeeper=ERROR

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Fri Mar 23 16:46:03 2012
@@ -23,7 +23,6 @@ import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
 import kafka.log.LogManager
 import kafka.utils.TestUtils
-import kafka.server.{KafkaApis, KafkaConfig}
 import junit.framework.Assert._
 import org.I0Itec.zkclient.ZkClient
 import TestUtils._
@@ -31,6 +30,7 @@ import org.easymock.EasyMock
 import kafka.network.BoundedByteBufferReceive
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
+import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -83,9 +83,11 @@ class TopicMetadataTest extends JUnit3Su
   private def mockLogManagerAndTestTopic(topic: String) = {
     // topic metadata request only requires 2 APIs from the log manager
     val logManager = EasyMock.createMock(classOf[LogManager])
+    val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+    EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
     EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
-    EasyMock.expect(logManager.getZookeeperClient).andReturn(zkClient)
     EasyMock.replay(logManager)
+    EasyMock.replay(kafkaZookeeper)
 
     // create a topic metadata request
     val topicMetadataRequest = new TopicMetadataRequest(List(topic))
@@ -95,7 +97,7 @@ class TopicMetadataTest extends JUnit3Su
     serializedMetadataRequest.rewind()
 
     // create the kafka request handler
-    val kafkaRequestHandler = new KafkaApis(logManager)
+    val kafkaRequestHandler = new KafkaApis(logManager, kafkaZookeeper)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
@@ -123,6 +125,7 @@ class TopicMetadataTest extends JUnit3Su
 
     // verify the expected calls to log manager occurred in the right order
     EasyMock.verify(logManager)
+    EasyMock.verify(kafkaZookeeper)
     EasyMock.verify(receivedRequest)
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Mar 23 16:46:03 2012
@@ -19,13 +19,13 @@ package kafka.log
 
 import java.io._
 import junit.framework.Assert._
-import kafka.server.KafkaConfig
 import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
+import kafka.server.KafkaConfig
 
 class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -48,9 +48,10 @@ class LogManagerTest extends JUnit3Suite
     logManager.startup
     logDir = logManager.logDir
 
+    TestUtils.createBrokersInZk(zookeeper.client, List(config.brokerId))
+
     // setup brokers in zookeeper as owners of partitions for this test
     CreateTopicCommand.createTopic(zookeeper.client, name, 3, 1, "0,0,0")
-
   }
 
   override def tearDown() {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Fri Mar 23 16:46:03 2012
@@ -79,7 +79,7 @@ class LeaderElectionTest extends JUnit3S
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
-    Thread.sleep(500)
+    Thread.sleep(zookeeper.tickTime)
 
     // bring the preferred replica back
     servers.head.startup()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1304473&r1=1304472&r2=1304473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Fri Mar 23 16:46:03 2012
@@ -27,7 +27,8 @@ import kafka.utils.{Utils, ZKStringSeria
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
-  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 3000)
+  val tickTime = 2000
+  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)