You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/02/10 00:14:20 UTC

kafka git commit: KAFKA-1333; Add the consumer coordinator to server; reviewed by Onur Karaman and Jay Kreps

Repository: kafka
Updated Branches:
  refs/heads/trunk 0839def4b -> 39cd48de3


KAFKA-1333; Add the consumer coordinator to server; reviewed by Onur Karaman and Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39cd48de
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39cd48de
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39cd48de

Branch: refs/heads/trunk
Commit: 39cd48de321585ac55c94ec407fede5858f38962
Parents: 0839def
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 9 15:13:10 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 9 15:13:10 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala |   2 +-
 .../kafka/server/DelayedOperationKey.scala      |  19 +++
 .../src/main/scala/kafka/server/KafkaApis.scala |  70 +++++----
 .../main/scala/kafka/server/KafkaServer.scala   | 144 ++++++++++++-------
 .../main/scala/kafka/server/MetadataCache.scala |  17 ++-
 .../main/scala/kafka/server/OffsetManager.scala |   2 +
 .../main/scala/kafka/tools/MirrorMaker.scala    |   6 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |   2 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |  13 +-
 .../unit/kafka/server/ServerStartupTest.scala   |  15 --
 10 files changed, 179 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 39b1651..76ce41a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -107,7 +107,7 @@ class SocketServer(val brokerId: Int,
  */
 private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
 
-  protected val selector = Selector.open();
+  protected val selector = Selector.open()
   private val startupLatch = new CountDownLatch(1)
   private val shutdownLatch = new CountDownLatch(1)
   private val alive = new AtomicBoolean(true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index fb7e9ed..b673e43 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -30,9 +30,28 @@ object DelayedOperationKey {
   val globalLabel = "All"
 }
 
+/* used by delayed-produce and delayed-fetch operations */
 case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
 
   def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
 
   override def keyLabel = "%s-%d".format(topic, partition)
 }
+
+/* used by bucketized delayed-heartbeat operations */
+case class TTimeMsKey(time: Long) extends DelayedOperationKey {
+
+  override def keyLabel = "%d".format(time)
+}
+
+/* used by delayed-join-group operations */
+case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey {
+
+  override def keyLabel = "%s-%s".format(groupId, consumerId)
+}
+
+/* used by delayed-rebalance operations */
+case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey {
+
+  override def keyLabel = groupId
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2b027b..6ee7d88 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,24 +17,20 @@
 
 package kafka.server
 
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupResponse
 import org.apache.kafka.common.requests.HeartbeatResponse
-import org.apache.kafka.common.requests.ResponseHeader
-import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.TopicPartition
 
 import kafka.api._
+import kafka.admin.AdminUtils
 import kafka.common._
+import kafka.controller.KafkaController
+import kafka.coordinator.ConsumerCoordinator
 import kafka.log._
 import kafka.network._
-import kafka.admin.AdminUtils
 import kafka.network.RequestChannel.Response
-import kafka.controller.KafkaController
 import kafka.utils.{SystemTime, Logging}
 
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
 import scala.collection._
 
 import org.I0Itec.zkclient.ZkClient
@@ -45,14 +41,14 @@ import org.I0Itec.zkclient.ZkClient
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val offsetManager: OffsetManager,
+                val coordinator: ConsumerCoordinator,
+                val controller: KafkaController,
                 val zkClient: ZkClient,
                 val brokerId: Int,
-                val config: KafkaConfig,
-                val controller: KafkaController) extends Logging {
+                val config: KafkaConfig) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  val metadataCache = new MetadataCache
-  private var consumerGroupGenerationId = 0
+  val metadataCache = new MetadataCache(brokerId)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -137,7 +133,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetCommitRequest(request: RequestChannel.Request) {
     val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
 
-    // the callback for sending the response
+    // the callback for sending an offset commit response
     def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
       commitStatus.foreach { case (topicAndPartition, errorCode) =>
         // we only print warnings for known errors here; only replica manager could see an unknown
@@ -169,7 +165,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleProducerRequest(request: RequestChannel.Request) {
     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
 
-    // the callback for sending the response
+    // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
       var errorInResponse = false
       responseStatus.foreach { case (topicAndPartition, status) =>
@@ -224,7 +220,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
 
-    // the callback for sending the response
+    // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
       responsePartitionData.foreach { case (topicAndPartition, data) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
@@ -456,20 +452,42 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
     import JavaConversions._
-    val joinGroupReq = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
-    val topics = joinGroupReq.body.topics().toSet
-    val partitions = this.replicaManager.logManager.allLogs.filter(log => topics.contains(log.topicAndPartition.topic))
-    val partitionList = partitions.map(_.topicAndPartition).map(tp => new org.apache.kafka.common.TopicPartition(tp.topic, tp.partition)).toBuffer
-    this.consumerGroupGenerationId += 1
-    val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, partitionList)
-    val send = new BoundedByteBufferSend(new JoinGroupResponseAndHeader(joinGroupReq.correlationId, response))
-    requestChannel.sendResponse(new RequestChannel.Response(request, send))
+
+    val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
+    
+    // the callback for sending a join-group response
+    def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) {
+      val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
+      val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList)
+      val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    }
+
+    // let the coordinator to handle join-group
+    coordinator.consumerJoinGroup(
+      joinGroupRequest.body.groupId(),
+      joinGroupRequest.body.consumerId(),
+      joinGroupRequest.body.topics().toList,
+      joinGroupRequest.body.sessionTimeout(),
+      joinGroupRequest.body.strategy(),
+      sendResponseCallback)
   }
   
   def handleHeartbeatRequest(request: RequestChannel.Request) {
-    val hbReq = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
-    val send = new BoundedByteBufferSend(new HeartbeatResponseAndHeader(hbReq.correlationId, new HeartbeatResponse(Errors.NONE.code)))
-    requestChannel.sendResponse(new RequestChannel.Response(request, send))
+    val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
+
+    // the callback for sending a heartbeat response
+    def sendResponseCallback(errorCode: Short) {
+      val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    }
+
+    // let the coordinator to handle heartbeat
+    coordinator.consumerHeartbeat(
+      heartbeatRequest.body.groupId(),
+      heartbeatRequest.body.consumerId(),
+      heartbeatRequest.body.groupGenerationId(),
+      sendResponseCallback)
   }
   
   def close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 89200da..7e5ddcb 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -34,30 +34,43 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
 import kafka.network.{Receive, BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
+import kafka.coordinator.ConsumerCoordinator
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
-  private var isShuttingDown = new AtomicBoolean(false)
+  private val startupComplete = new AtomicBoolean(false)
+  private val isShuttingDown = new AtomicBoolean(false)
+  private val isStartingUp = new AtomicBoolean(false)
+
   private var shutdownLatch = new CountDownLatch(1)
-  private var startupComplete = new AtomicBoolean(false)
-  private var brokerId: Int = -1
 
   val brokerState: BrokerState = new BrokerState
-  val correlationId: AtomicInteger = new AtomicInteger(0)
+
+  var apis: KafkaApis = null
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
+
   var logManager: LogManager = null
+
   var offsetManager: OffsetManager = null
-  var kafkaHealthcheck: KafkaHealthcheck = null
-  var topicConfigManager: TopicConfigManager = null
+
   var replicaManager: ReplicaManager = null
-  var apis: KafkaApis = null
+
+  var topicConfigManager: TopicConfigManager = null
+
+  var consumerCoordinator: ConsumerCoordinator = null
+
   var kafkaController: KafkaController = null
+
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+
+  var kafkaHealthcheck: KafkaHealthcheck = null
+
   var zkClient: ZkClient = null
+  val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
 
@@ -75,69 +88,87 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def startup() {
     try {
       info("starting")
-      brokerState.newState(Starting)
-      isShuttingDown = new AtomicBoolean(false)
-      shutdownLatch = new CountDownLatch(1)
 
-      /* start scheduler */
-      kafkaScheduler.startup()
+      if(isShuttingDown.get)
+        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
+
+      if(startupComplete.get)
+        return
+
+      val canStartup = isStartingUp.compareAndSet(false, true)
+      if (canStartup) {
+        brokerState.newState(Starting)
 
-      /* setup zookeeper */
-      zkClient = initZk()
+        /* start scheduler */
+        kafkaScheduler.startup()
 
-      /* start log manager */
-      logManager = createLogManager(zkClient, brokerState)
-      logManager.startup()
+        /* setup zookeeper */
+        zkClient = initZk()
 
-      /* generate brokerId */
-      config.brokerId =  getBrokerId
-      this.logIdent = "[Kafka Server " + config.brokerId + "], "
+        /* start log manager */
+        logManager = createLogManager(zkClient, brokerState)
+        logManager.startup()
 
-      socketServer = new SocketServer(config.brokerId,
-                                      config.hostName,
-                                      config.port,
-                                      config.numNetworkThreads,
-                                      config.queuedMaxRequests,
-                                      config.socketSendBufferBytes,
-                                      config.socketReceiveBufferBytes,
-                                      config.socketRequestMaxBytes,
-                                      config.maxConnectionsPerIp,
-                                      config.connectionsMaxIdleMs,
-                                      config.maxConnectionsPerIpOverrides)
-      socketServer.startup()
+        /* generate brokerId */
+        config.brokerId =  getBrokerId
+        this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
-      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+        socketServer = new SocketServer(config.brokerId,
+          config.hostName,
+          config.port,
+          config.numNetworkThreads,
+          config.queuedMaxRequests,
+          config.socketSendBufferBytes,
+          config.socketReceiveBufferBytes,
+          config.socketRequestMaxBytes,
+          config.maxConnectionsPerIp,
+          config.connectionsMaxIdleMs,
+          config.maxConnectionsPerIpOverrides)
+        socketServer.startup()
 
-      /* start offset manager */
-      offsetManager = createOffsetManager()
+        /* start replica manager */
+        replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+        replicaManager.startup()
 
-      kafkaController = new KafkaController(config, zkClient, brokerState)
+        /* start offset manager */
+        offsetManager = createOffsetManager()
 
-      /* start processing requests */
-      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
-      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
-      brokerState.newState(RunningAsBroker)
+        /* start kafka controller */
+        kafkaController = new KafkaController(config, zkClient, brokerState)
+        kafkaController.startup()
 
-      Mx4jLoader.maybeLoad()
+        /* start kafka coordinator */
+        consumerCoordinator = new ConsumerCoordinator(config, zkClient)
+        consumerCoordinator.startup()
 
-      replicaManager.startup()
+        /* start processing requests */
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
+        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+        brokerState.newState(RunningAsBroker)
 
-      kafkaController.startup()
+        Mx4jLoader.maybeLoad()
 
-      topicConfigManager = new TopicConfigManager(zkClient, logManager)
-      topicConfigManager.startup()
+        /* start topic config manager */
+        topicConfigManager = new TopicConfigManager(zkClient, logManager)
+        topicConfigManager.startup()
 
-      /* tell everyone we are alive */
-      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
-      kafkaHealthcheck.startup()
+        /* tell everyone we are alive */
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+        kafkaHealthcheck.startup()
 
-      registerStats()
-      startupComplete.set(true)
-      info("started")
+        /* register broker metrics */
+        registerStats()
+
+        shutdownLatch = new CountDownLatch(1)
+        startupComplete.set(true)
+        isStartingUp.set(false)
+        info("started")
+      }
     }
     catch {
       case e: Throwable =>
         fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
+        isStartingUp.set(false)
         shutdown()
         throw e
     }
@@ -271,6 +302,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def shutdown() {
     try {
       info("shutting down")
+
+      if(isStartingUp.get)
+        throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
+
       val canShutdown = isShuttingDown.compareAndSet(false, true)
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
@@ -290,20 +325,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           Utils.swallow(replicaManager.shutdown())
         if(logManager != null)
           Utils.swallow(logManager.shutdown())
+        if(consumerCoordinator != null)
+          Utils.swallow(consumerCoordinator.shutdown())
         if(kafkaController != null)
           Utils.swallow(kafkaController.shutdown())
         if(zkClient != null)
           Utils.swallow(zkClient.close())
 
         brokerState.newState(NotRunning)
-        shutdownLatch.countDown()
+
         startupComplete.set(false)
+        isShuttingDown.set(false)
+        shutdownLatch.countDown()
         info("shut down completed")
       }
     }
     catch {
       case e: Throwable =>
         fatal("Fatal error during KafkaServer shutdown.", e)
+        isShuttingDown.set(false)
         throw e
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bf81a1a..4c70aa7 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,25 +17,28 @@
 
 package kafka.server
 
-import scala.collection.{Seq, Set, mutable}
 import kafka.api._
+import kafka.common._
 import kafka.cluster.Broker
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.Utils._
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
-import kafka.common.TopicAndPartition
 import kafka.controller.KafkaController.StateChangeLogger
+import scala.collection.{Seq, Set, mutable}
+import kafka.utils.Logging
+import kafka.utils.Utils._
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-private[server] class MetadataCache {
+private[server] class MetadataCache(brokerId: Int) extends Logging {
   private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
     new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
   private var aliveBrokers: Map[Int, Broker] = Map()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
+  this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
+
   def getTopicMetadata(topics: Set[String]) = {
     val isAllTopics = topics.isEmpty
     val topicsRequested = if(isAllTopics) cache.keySet else topics
@@ -68,7 +71,7 @@ private[server] class MetadataCache {
                 new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
               } catch {
                 case e: Throwable =>
-                  debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+                  debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString))
                   new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
                     ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
               }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0bdd42f..83d5264 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -96,6 +96,8 @@ class OffsetManager(val config: OffsetManagerConfig,
 
   private val shuttingDown = new AtomicBoolean(false)
 
+  this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
+
   scheduler.schedule(name = "offsets-cache-compactor",
                      fun = compact,
                      period = config.offsetsRetentionCheckIntervalMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 81ae205..5374280 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -442,7 +442,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
                         val producer: MirrorMakerBaseProducer,
                         val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-producer-" + threadId
-    private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+    private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
     this.logIdent = "[%s] ".format(threadName)
 
     setName(threadName)
@@ -466,7 +466,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         case t: Throwable =>
           fatal("Producer thread failure due to ", t)
       } finally {
-        shutdownComplete.countDown()
+        shutdownLatch.countDown()
         info("Producer thread stopped")
         // if it exits accidentally, stop the entire mirror maker
         if (!isShuttingdown.get()) {
@@ -490,7 +490,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     def awaitShutdown() {
       try {
-        shutdownComplete.await()
+        shutdownLatch.await()
         producer.close()
         info("Producer thread shutdown complete")
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 90c0b7a..11d6a97 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -272,7 +272,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
       for (server <- servers) {
         server.shutdown()
         server.awaitShutdown()
-        server.startup
+        server.startup()
 
         Thread.sleep(2000)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index ba1e48e..82fa4cf 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -119,7 +119,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     val newProps = TestUtils.createBrokerConfig(0, port)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = new KafkaConfig(newProps)
-    var server = new KafkaServer(newConfig)
+    val server = new KafkaServer(newConfig)
     try {
       server.startup()
       fail("Expected KafkaServer setup to fail, throw exception")
@@ -129,14 +129,15 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
       // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
       // goes wrong so that awaitShutdown doesn't hang
       case e: org.I0Itec.zkclient.exception.ZkException =>
-        assertEquals(server.brokerState.currentState, NotRunning.state)
-        if (server.brokerState.currentState != NotRunning.state)
-          server.shutdown()
+        assertEquals(NotRunning.state, server.brokerState.currentState)
       case e: Throwable =>
-        fail("Expected KafkaServer setup to fail with connection exception but caught a different exception.")
+        fail("Expected ZkException during Kafka server starting up but caught a different exception %s".format(e.toString))
+    }
+    finally {
+      if (server.brokerState.currentState != NotRunning.state)
         server.shutdown()
+      server.awaitShutdown()
     }
-    server.awaitShutdown()
     Utils.rm(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 8fe7cd4..764655a 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -48,19 +48,4 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
     val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
     assertTrue(pathExists)
   }
-
-  def testServerStartupConsecutively() {
-    server.shutdown()
-    try {
-      intercept[IllegalStateException]{
-        server.startup()
-        server.startup()
-      }
-    }
-    finally {
-      server.shutdown()
-    }
-
-  }
-
 }
\ No newline at end of file