Posted to commits@kafka.apache.org by gu...@apache.org on 2015/02/10 00:14:20 UTC
kafka git commit: KAFKA-1333; Add the consumer coordinator to server; reviewed by Onur Karaman and Jay Kreps
Repository: kafka
Updated Branches:
refs/heads/trunk 0839def4b -> 39cd48de3
KAFKA-1333; Add the consumer coordinator to server; reviewed by Onur Karaman and Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39cd48de
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39cd48de
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39cd48de
Branch: refs/heads/trunk
Commit: 39cd48de321585ac55c94ec407fede5858f38962
Parents: 0839def
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 9 15:13:10 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 9 15:13:10 2015 -0800
----------------------------------------------------------------------
.../main/scala/kafka/network/SocketServer.scala | 2 +-
.../kafka/server/DelayedOperationKey.scala | 19 +++
.../src/main/scala/kafka/server/KafkaApis.scala | 70 +++++----
.../main/scala/kafka/server/KafkaServer.scala | 144 ++++++++++++-------
.../main/scala/kafka/server/MetadataCache.scala | 17 ++-
.../main/scala/kafka/server/OffsetManager.scala | 2 +
.../main/scala/kafka/tools/MirrorMaker.scala | 6 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 2 +-
.../unit/kafka/server/ServerShutdownTest.scala | 13 +-
.../unit/kafka/server/ServerStartupTest.scala | 15 --
10 files changed, 179 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 39b1651..76ce41a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -107,7 +107,7 @@ class SocketServer(val brokerId: Int,
*/
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
- protected val selector = Selector.open();
+ protected val selector = Selector.open()
private val startupLatch = new CountDownLatch(1)
private val shutdownLatch = new CountDownLatch(1)
private val alive = new AtomicBoolean(true)
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index fb7e9ed..b673e43 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -30,9 +30,28 @@ object DelayedOperationKey {
val globalLabel = "All"
}
+/* used by delayed-produce and delayed-fetch operations */
case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
override def keyLabel = "%s-%d".format(topic, partition)
}
+
+/* used by bucketized delayed-heartbeat operations */
+case class TTimeMsKey(time: Long) extends DelayedOperationKey {
+
+ override def keyLabel = "%d".format(time)
+}
+
+/* used by delayed-join-group operations */
+case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey {
+
+ override def keyLabel = "%s-%s".format(groupId, consumerId)
+}
+
+/* used by delayed-rebalance operations */
+case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey {
+
+ override def keyLabel = groupId
+}
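
These keys name watch lists in the broker's delayed-operation purgatory: an
operation is parked under one or more keys, and an event tagged with a
matching key re-checks everything watching it. A minimal, self-contained
sketch of that pattern in Scala (the toy purgatory below is illustrative,
not Kafka's real DelayedOperationPurgatory API):

    import scala.collection.mutable

    trait OperationKey { def keyLabel: String }

    case class GroupMemberKey(groupId: String, consumerId: String) extends OperationKey {
      override def keyLabel = "%s-%s".format(groupId, consumerId)
    }

    // operations are predicates: they return true once the delayed work completes
    class ToyPurgatory[E] {
      private val watchers = mutable.Map[String, mutable.Buffer[E => Boolean]]()

      // park an operation until an event on one of its keys completes it
      def watch(op: E => Boolean, keys: Seq[OperationKey]): Unit =
        keys.foreach(k => watchers.getOrElseUpdate(k.keyLabel, mutable.Buffer()) += op)

      // on an event (e.g. a heartbeat arriving), retry everything watching the key
      def checkAndComplete(key: OperationKey, event: E): Int = {
        val done = watchers.getOrElse(key.keyLabel, mutable.Buffer[E => Boolean]()).filter(op => op(event))
        watchers.get(key.keyLabel).foreach(_ --= done)
        done.size
      }
    }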
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2b027b..6ee7d88 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,24 +17,20 @@
package kafka.server
-import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupResponse
import org.apache.kafka.common.requests.HeartbeatResponse
-import org.apache.kafka.common.requests.ResponseHeader
-import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.TopicPartition
import kafka.api._
+import kafka.admin.AdminUtils
import kafka.common._
+import kafka.controller.KafkaController
+import kafka.coordinator.ConsumerCoordinator
import kafka.log._
import kafka.network._
-import kafka.admin.AdminUtils
import kafka.network.RequestChannel.Response
-import kafka.controller.KafkaController
import kafka.utils.{SystemTime, Logging}
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
import scala.collection._
import org.I0Itec.zkclient.ZkClient
@@ -45,14 +41,14 @@ import org.I0Itec.zkclient.ZkClient
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val offsetManager: OffsetManager,
+ val coordinator: ConsumerCoordinator,
+ val controller: KafkaController,
val zkClient: ZkClient,
val brokerId: Int,
- val config: KafkaConfig,
- val controller: KafkaController) extends Logging {
+ val config: KafkaConfig) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
- val metadataCache = new MetadataCache
- private var consumerGroupGenerationId = 0
+ val metadataCache = new MetadataCache(brokerId)
/**
* Top-level method that handles all requests and multiplexes to the right api
@@ -137,7 +133,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
- // the callback for sending the response
+ // the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
commitStatus.foreach { case (topicAndPartition, errorCode) =>
// we only print warnings for known errors here; only replica manager could see an unknown
@@ -169,7 +165,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
- // the callback for sending the response
+ // the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
var errorInResponse = false
responseStatus.foreach { case (topicAndPartition, status) =>
@@ -224,7 +220,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- // the callback for sending the response
+ // the callback for sending a fetch response
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
responsePartitionData.foreach { case (topicAndPartition, data) =>
// we only print warnings for known errors here; if it is unknown, it will cause
@@ -456,20 +452,42 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleJoinGroupRequest(request: RequestChannel.Request) {
import JavaConversions._
- val joinGroupReq = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
- val topics = joinGroupReq.body.topics().toSet
- val partitions = this.replicaManager.logManager.allLogs.filter(log => topics.contains(log.topicAndPartition.topic))
- val partitionList = partitions.map(_.topicAndPartition).map(tp => new org.apache.kafka.common.TopicPartition(tp.topic, tp.partition)).toBuffer
- this.consumerGroupGenerationId += 1
- val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, partitionList)
- val send = new BoundedByteBufferSend(new JoinGroupResponseAndHeader(joinGroupReq.correlationId, response))
- requestChannel.sendResponse(new RequestChannel.Response(request, send))
+
+ val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
+
+ // the callback for sending a join-group response
+ def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) {
+ val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
+ val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList)
+ val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+ }
+
+ // let the coordinator handle the join-group request
+ coordinator.consumerJoinGroup(
+ joinGroupRequest.body.groupId(),
+ joinGroupRequest.body.consumerId(),
+ joinGroupRequest.body.topics().toList,
+ joinGroupRequest.body.sessionTimeout(),
+ joinGroupRequest.body.strategy(),
+ sendResponseCallback)
}
def handleHeartbeatRequest(request: RequestChannel.Request) {
- val hbReq = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
- val send = new BoundedByteBufferSend(new HeartbeatResponseAndHeader(hbReq.correlationId, new HeartbeatResponse(Errors.NONE.code)))
- requestChannel.sendResponse(new RequestChannel.Response(request, send))
+ val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
+
+ // the callback for sending a heartbeat response
+ def sendResponseCallback(errorCode: Short) {
+ val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+ }
+
+ // let the coordinator handle the heartbeat request
+ coordinator.consumerHeartbeat(
+ heartbeatRequest.body.groupId(),
+ heartbeatRequest.body.consumerId(),
+ heartbeatRequest.body.groupGenerationId(),
+ sendResponseCallback)
}
def close() {
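
The rewritten handlers show the callback-passing style the coordinator relies
on: KafkaApis no longer builds the response inline but hands the coordinator a
closure that can send the response whenever the operation completes, possibly
on another thread after a delayed operation fires. A minimal sketch of that
contract (the class is illustrative, not the real ConsumerCoordinator; error
code 22 is ILLEGAL_GENERATION in the Kafka protocol):

    class SketchCoordinator {
      def consumerHeartbeat(groupId: String,
                            consumerId: String,
                            generationId: Int,
                            responseCallback: Short => Unit): Unit = {
        // the real coordinator validates group membership and may defer the
        // response through a purgatory; this sketch completes immediately
        val errorCode: Short = (if (generationId < 0) 22 else 0).toShort
        responseCallback(errorCode)
      }
    }

    // usage: the caller decides how a completed operation becomes a response
    new SketchCoordinator().consumerHeartbeat("group1", "consumer1", 1,
      errorCode => println("send HeartbeatResponse with error " + errorCode))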
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 89200da..7e5ddcb 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -34,30 +34,43 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
import kafka.network.{Receive, BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
+import kafka.coordinator.ConsumerCoordinator
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
- private var isShuttingDown = new AtomicBoolean(false)
+ private val startupComplete = new AtomicBoolean(false)
+ private val isShuttingDown = new AtomicBoolean(false)
+ private val isStartingUp = new AtomicBoolean(false)
+
private var shutdownLatch = new CountDownLatch(1)
- private var startupComplete = new AtomicBoolean(false)
- private var brokerId: Int = -1
val brokerState: BrokerState = new BrokerState
- val correlationId: AtomicInteger = new AtomicInteger(0)
+
+ var apis: KafkaApis = null
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
+
var logManager: LogManager = null
+
var offsetManager: OffsetManager = null
- var kafkaHealthcheck: KafkaHealthcheck = null
- var topicConfigManager: TopicConfigManager = null
+
var replicaManager: ReplicaManager = null
- var apis: KafkaApis = null
+
+ var topicConfigManager: TopicConfigManager = null
+
+ var consumerCoordinator: ConsumerCoordinator = null
+
var kafkaController: KafkaController = null
+
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+
+ var kafkaHealthcheck: KafkaHealthcheck = null
+
var zkClient: ZkClient = null
+ val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
@@ -75,69 +88,87 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def startup() {
try {
info("starting")
- brokerState.newState(Starting)
- isShuttingDown = new AtomicBoolean(false)
- shutdownLatch = new CountDownLatch(1)
- /* start scheduler */
- kafkaScheduler.startup()
+ if(isShuttingDown.get)
+ throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
+
+ if(startupComplete.get)
+ return
+
+ val canStartup = isStartingUp.compareAndSet(false, true)
+ if (canStartup) {
+ brokerState.newState(Starting)
- /* setup zookeeper */
- zkClient = initZk()
+ /* start scheduler */
+ kafkaScheduler.startup()
- /* start log manager */
- logManager = createLogManager(zkClient, brokerState)
- logManager.startup()
+ /* setup zookeeper */
+ zkClient = initZk()
- /* generate brokerId */
- config.brokerId = getBrokerId
- this.logIdent = "[Kafka Server " + config.brokerId + "], "
+ /* start log manager */
+ logManager = createLogManager(zkClient, brokerState)
+ logManager.startup()
- socketServer = new SocketServer(config.brokerId,
- config.hostName,
- config.port,
- config.numNetworkThreads,
- config.queuedMaxRequests,
- config.socketSendBufferBytes,
- config.socketReceiveBufferBytes,
- config.socketRequestMaxBytes,
- config.maxConnectionsPerIp,
- config.connectionsMaxIdleMs,
- config.maxConnectionsPerIpOverrides)
- socketServer.startup()
+ /* generate brokerId */
+ config.brokerId = getBrokerId
+ this.logIdent = "[Kafka Server " + config.brokerId + "], "
- replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+ socketServer = new SocketServer(config.brokerId,
+ config.hostName,
+ config.port,
+ config.numNetworkThreads,
+ config.queuedMaxRequests,
+ config.socketSendBufferBytes,
+ config.socketReceiveBufferBytes,
+ config.socketRequestMaxBytes,
+ config.maxConnectionsPerIp,
+ config.connectionsMaxIdleMs,
+ config.maxConnectionsPerIpOverrides)
+ socketServer.startup()
- /* start offset manager */
- offsetManager = createOffsetManager()
+ /* start replica manager */
+ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+ replicaManager.startup()
- kafkaController = new KafkaController(config, zkClient, brokerState)
+ /* start offset manager */
+ offsetManager = createOffsetManager()
- /* start processing requests */
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
- requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
- brokerState.newState(RunningAsBroker)
+ /* start kafka controller */
+ kafkaController = new KafkaController(config, zkClient, brokerState)
+ kafkaController.startup()
- Mx4jLoader.maybeLoad()
+ /* start kafka coordinator */
+ consumerCoordinator = new ConsumerCoordinator(config, zkClient)
+ consumerCoordinator.startup()
- replicaManager.startup()
+ /* start processing requests */
+ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
+ requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+ brokerState.newState(RunningAsBroker)
- kafkaController.startup()
+ Mx4jLoader.maybeLoad()
- topicConfigManager = new TopicConfigManager(zkClient, logManager)
- topicConfigManager.startup()
+ /* start topic config manager */
+ topicConfigManager = new TopicConfigManager(zkClient, logManager)
+ topicConfigManager.startup()
- /* tell everyone we are alive */
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
- kafkaHealthcheck.startup()
+ /* tell everyone we are alive */
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+ kafkaHealthcheck.startup()
- registerStats()
- startupComplete.set(true)
- info("started")
+ /* register broker metrics */
+ registerStats()
+
+ shutdownLatch = new CountDownLatch(1)
+ startupComplete.set(true)
+ isStartingUp.set(false)
+ info("started")
+ }
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
+ isStartingUp.set(false)
shutdown()
throw e
}
@@ -271,6 +302,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def shutdown() {
try {
info("shutting down")
+
+ if(isStartingUp.get)
+ throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
+
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown) {
Utils.swallow(controlledShutdown())
@@ -290,20 +325,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Utils.swallow(replicaManager.shutdown())
if(logManager != null)
Utils.swallow(logManager.shutdown())
+ if(consumerCoordinator != null)
+ Utils.swallow(consumerCoordinator.shutdown())
if(kafkaController != null)
Utils.swallow(kafkaController.shutdown())
if(zkClient != null)
Utils.swallow(zkClient.close())
brokerState.newState(NotRunning)
- shutdownLatch.countDown()
+
startupComplete.set(false)
+ isShuttingDown.set(false)
+ shutdownLatch.countDown()
info("shut down completed")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
+ isShuttingDown.set(false)
throw e
}
}
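
The startup/shutdown changes amount to a small lifecycle state machine: a
compare-and-set on isStartingUp picks a single winner among concurrent
startup() callers, a completed startup makes further startup() calls no-ops,
and shutdown() refuses to run while startup is still in flight. A condensed
sketch of just the guard logic (field names match the diff; the class and the
finally-based cleanup are a simplification):

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicBoolean

    class LifecycleSketch {
      private val startupComplete = new AtomicBoolean(false)
      private val isShuttingDown = new AtomicBoolean(false)
      private val isStartingUp = new AtomicBoolean(false)
      private var shutdownLatch = new CountDownLatch(1)

      def startup(): Unit = {
        if (isShuttingDown.get)
          throw new IllegalStateException("still shutting down, cannot re-start")
        if (startupComplete.get)
          return                                      // idempotent once started
        if (isStartingUp.compareAndSet(false, true))  // single winner starts
          try {
            // ... start scheduler, zookeeper, log manager, socket server, etc. ...
            shutdownLatch = new CountDownLatch(1)
            startupComplete.set(true)
          } finally isStartingUp.set(false)
      }

      def shutdown(): Unit = {
        if (isStartingUp.get)
          throw new IllegalStateException("still starting up, cannot shut down")
        if (isShuttingDown.compareAndSet(false, true)) {
          // ... stop subsystems in reverse startup order ...
          startupComplete.set(false)
          isShuttingDown.set(false)
          shutdownLatch.countDown()                   // releases awaitShutdown()
        }
      }

      def awaitShutdown(): Unit = shutdownLatch.await()
    }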
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bf81a1a..4c70aa7 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,25 +17,28 @@
package kafka.server
-import scala.collection.{Seq, Set, mutable}
import kafka.api._
+import kafka.common._
import kafka.cluster.Broker
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.Utils._
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
-import kafka.common.TopicAndPartition
import kafka.controller.KafkaController.StateChangeLogger
+import scala.collection.{Seq, Set, mutable}
+import kafka.utils.Logging
+import kafka.utils.Utils._
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
*/
-private[server] class MetadataCache {
+private[server] class MetadataCache(brokerId: Int) extends Logging {
private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
private var aliveBrokers: Map[Int, Broker] = Map()
private val partitionMetadataLock = new ReentrantReadWriteLock()
+ this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
+
def getTopicMetadata(topics: Set[String]) = {
val isAllTopics = topics.isEmpty
val topicsRequested = if(isAllTopics) cache.keySet else topics
@@ -68,7 +71,7 @@ private[server] class MetadataCache {
new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
case e: Throwable =>
- debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+ debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString))
new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
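
The new brokerId parameter exists mainly so the cache can set a logIdent, the
kafka.utils.Logging convention of a mutable prefix prepended to every log
line, which keeps logs from multi-broker tests attributable (OffsetManager
below gains the same kind of prefix). A tiny sketch of the convention, with
an assumed toy trait standing in for the real Logging:

    trait ToyLogging {
      protected var logIdent: String = ""
      def info(msg: String): Unit = println(logIdent + msg)
    }

    class CacheSketch(brokerId: Int) extends ToyLogging {
      this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
      def update(): Unit = info("metadata updated")  // prints with the broker prefix
    }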
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0bdd42f..83d5264 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -96,6 +96,8 @@ class OffsetManager(val config: OffsetManagerConfig,
private val shuttingDown = new AtomicBoolean(false)
+ this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
+
scheduler.schedule(name = "offsets-cache-compactor",
fun = compact,
period = config.offsetsRetentionCheckIntervalMs,
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 81ae205..5374280 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -442,7 +442,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
val producer: MirrorMakerBaseProducer,
val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
private val threadName = "mirrormaker-producer-" + threadId
- private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+ private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
this.logIdent = "[%s] ".format(threadName)
setName(threadName)
@@ -466,7 +466,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
case t: Throwable =>
fatal("Producer thread failure due to ", t)
} finally {
- shutdownComplete.countDown()
+ shutdownLatch.countDown()
info("Producer thread stopped")
// if it exits accidentally, stop the entire mirror maker
if (!isShuttingdown.get()) {
@@ -490,7 +490,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
def awaitShutdown() {
try {
- shutdownComplete.await()
+ shutdownLatch.await()
producer.close()
info("Producer thread shutdown complete")
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 90c0b7a..11d6a97 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -272,7 +272,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
for (server <- servers) {
server.shutdown()
server.awaitShutdown()
- server.startup
+ server.startup()
Thread.sleep(2000)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index ba1e48e..82fa4cf 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -119,7 +119,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val newProps = TestUtils.createBrokerConfig(0, port)
newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
val newConfig = new KafkaConfig(newProps)
- var server = new KafkaServer(newConfig)
+ val server = new KafkaServer(newConfig)
try {
server.startup()
fail("Expected KafkaServer setup to fail, throw exception")
@@ -129,14 +129,15 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
// identify the correct exception, making sure the server was shutdown, and cleaning up if anything
// goes wrong so that awaitShutdown doesn't hang
case e: org.I0Itec.zkclient.exception.ZkException =>
- assertEquals(server.brokerState.currentState, NotRunning.state)
- if (server.brokerState.currentState != NotRunning.state)
- server.shutdown()
+ assertEquals(NotRunning.state, server.brokerState.currentState)
case e: Throwable =>
- fail("Expected KafkaServer setup to fail with connection exception but caught a different exception.")
+ fail("Expected ZkException during Kafka server starting up but caught a different exception %s".format(e.toString))
+ }
+ finally {
+ if (server.brokerState.currentState != NotRunning.state)
server.shutdown()
+ server.awaitShutdown()
}
- server.awaitShutdown()
Utils.rm(server.config.logDirs)
verifyNonDaemonThreadsStatus
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39cd48de/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 8fe7cd4..764655a 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -48,19 +48,4 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
assertTrue(pathExists)
}
-
- def testServerStartupConsecutively() {
- server.shutdown()
- try {
- intercept[IllegalStateException]{
- server.startup()
- server.startup()
- }
- }
- finally {
- server.shutdown()
- }
-
- }
-
}
\ No newline at end of file
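
With the lifecycle guards in place, a second startup() call is a silent no-op
rather than an error, which is presumably why testServerStartupConsecutively
and its expected IllegalStateException were dropped. Against the
LifecycleSketch shown after the KafkaServer diff above:

    val server = new LifecycleSketch
    server.startup()
    server.startup()   // returns immediately: startupComplete is already true
    server.shutdown()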