Posted to commits@kafka.apache.org by ju...@apache.org on 2013/06/04 01:09:04 UTC
git commit: kafka-927; Integrate controlled shutdown into kafka shutdown hook; patched by Sriram Subramanian; reviewed by Neha Narkhede and Jun Rao
Updated Branches:
refs/heads/0.8 658427638 -> 4f387ae43
kafka-927; Integrate controlled shutdown into kafka shutdown hook; patched by Sriram Subramanian; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f387ae4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f387ae4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f387ae4
Branch: refs/heads/0.8
Commit: 4f387ae43544c422b1845b3da5ab09aee8e4acd0
Parents: 6584276
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Mon Jun 3 16:08:37 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jun 3 16:08:37 2013 -0700
----------------------------------------------------------------------
.../kafka/api/ControlledShutdownRequest.scala | 73 ++++++++
.../kafka/api/ControlledShutdownResponse.scala | 70 +++++++
core/src/main/scala/kafka/api/RequestKeys.scala | 4 +-
core/src/main/scala/kafka/cluster/Partition.scala | 11 +-
.../scala/kafka/controller/KafkaController.scala | 94 ++++------
core/src/main/scala/kafka/server/KafkaApis.scala | 13 ++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 14 ++-
core/src/main/scala/kafka/server/KafkaServer.scala | 112 +++++++++++-
.../main/scala/kafka/server/ReplicaManager.scala | 3 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 6 +-
.../unit/kafka/integration/RollingBounceTest.scala | 140 +++++++++++++++
.../server/HighwatermarkPersistenceTest.scala | 5 +-
.../unit/kafka/server/ISRExpirationTest.scala | 3 +-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 8 +-
14 files changed, 472 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
new file mode 100644
index 0000000..ad6a20d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import collection.mutable.ListBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
+
+object ControlledShutdownRequest extends Logging {
+ val CurrentVersion = 0.shortValue
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): ControlledShutdownRequest = {
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val brokerId = buffer.getInt
+ new ControlledShutdownRequest(versionId, correlationId, brokerId)
+ }
+}
+
+case class ControlledShutdownRequest(val versionId: Short,
+ override val correlationId: Int,
+ val brokerId: Int)
+ extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey), correlationId){
+
+ def this(correlationId: Int, brokerId: Int) =
+ this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId)
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ buffer.putInt(brokerId)
+ }
+
+ def sizeInBytes(): Int = {
+ 2 + /* version id */
+ 4 + /* correlation id */
+ 4 /* broker id */
+ }
+
+ override def toString(): String = {
+ val controlledShutdownRequest = new StringBuilder
+ controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName)
+ controlledShutdownRequest.append("; Version: " + versionId)
+ controlledShutdownRequest.append("; CorrelationId: " + correlationId)
+ controlledShutdownRequest.append("; BrokerId: " + brokerId)
+ controlledShutdownRequest.toString()
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
+}
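
For illustration only (not part of this patch; the wrapper object below is invented), the new request's wire format round-trips through a buffer sized by sizeInBytes():

    import java.nio.ByteBuffer
    import kafka.api.ControlledShutdownRequest

    object ControlledShutdownRequestRoundTrip {
      def main(args: Array[String]) {
        // 2 (version id) + 4 (correlation id) + 4 (broker id) = 10 bytes
        val request = new ControlledShutdownRequest(1, 2) // correlationId = 1, brokerId = 2
        val buffer = ByteBuffer.allocate(request.sizeInBytes())
        request.writeTo(buffer)
        buffer.rewind()
        assert(ControlledShutdownRequest.readFrom(buffer) == request)
      }
    }
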
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
new file mode 100644
index 0000000..b7c8448
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import collection.mutable.HashMap
+import collection.immutable.Map
+import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.api.ApiUtils._
+
+
+object ControlledShutdownResponse {
+ def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {
+ val correlationId = buffer.getInt
+ val errorCode = buffer.getShort
+ val numEntries = buffer.getInt
+
+ var partitionsRemaining = Set[TopicAndPartition]()
+ for (i <- 0 until numEntries) {
+ val topic = readShortString(buffer)
+ val partition = buffer.getInt
+ partitionsRemaining += new TopicAndPartition(topic, partition)
+ }
+ new ControlledShutdownResponse(correlationId, errorCode, partitionsRemaining)
+ }
+}
+
+
+case class ControlledShutdownResponse(override val correlationId: Int,
+ val errorCode: Short = ErrorMapping.NoError,
+ val partitionsRemaining: Set[TopicAndPartition])
+ extends RequestOrResponse(correlationId = correlationId) {
+ def sizeInBytes(): Int = {
+ var size =
+ 4 /* correlation id */ +
+ 2 /* error code */ +
+ 4 /* number of responses */
+ for (topicAndPartition <- partitionsRemaining) {
+ size +=
+ 2 + topicAndPartition.topic.length /* topic */ +
+ 4 /* partition */
+ }
+ size
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(correlationId)
+ buffer.putShort(errorCode)
+ buffer.putInt(partitionsRemaining.size)
+ for (topicAndPartition: TopicAndPartition <- partitionsRemaining) {
+ writeShortString(buffer, topicAndPartition.topic)
+ buffer.putInt(topicAndPartition.partition)
+ }
+ }
+}
\ No newline at end of file
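
A matching sketch for the response side (again illustration only, with an invented wrapper object): the set of partitions the shutting-down broker still leads survives a serialization round trip.

    import java.nio.ByteBuffer
    import kafka.api.ControlledShutdownResponse
    import kafka.common.{ErrorMapping, TopicAndPartition}

    object ControlledShutdownResponseRoundTrip {
      def main(args: Array[String]) {
        val remaining = Set(TopicAndPartition("topic1", 0), TopicAndPartition("topic2", 3))
        val response = ControlledShutdownResponse(1, ErrorMapping.NoError, remaining)
        val buffer = ByteBuffer.allocate(response.sizeInBytes())
        response.writeTo(buffer)
        buffer.rewind()
        assert(ControlledShutdownResponse.readFrom(buffer).partitionsRemaining == remaining)
      }
    }
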
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 541cf84..e2ce9bd 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -28,6 +28,7 @@ object RequestKeys {
val LeaderAndIsrKey: Short = 4
val StopReplicaKey: Short = 5
val UpdateMetadataKey: Short = 6
+ val ControlledShutdownKey: Short = 7
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -36,7 +37,8 @@ object RequestKeys {
MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
- UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom))
+ UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
+ ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom))
def nameForKey(key: Short): String = {
keyToNameAndDeserializerMap.get(key) match {
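
The map above is what routes an incoming request with key 7 to the right parser. A hedged sketch of the lookup (wrapper object invented for illustration):

    import java.nio.ByteBuffer
    import kafka.api.{ControlledShutdownRequest, RequestKeys}

    object ControlledShutdownDispatchSketch {
      def main(args: Array[String]) {
        val request = new ControlledShutdownRequest(1, 2)
        val buffer = ByteBuffer.allocate(request.sizeInBytes())
        request.writeTo(buffer)
        buffer.rewind()
        // same lookup the request handler performs for a key read off the wire
        val (name, deserializer) = RequestKeys.keyToNameAndDeserializerMap(RequestKeys.ControlledShutdownKey)
        assert(name == "ControlledShutdown")
        val parsed = deserializer(buffer).asInstanceOf[ControlledShutdownRequest]
        assert(parsed.brokerId == 2)
      }
    }
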
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 02d2c44..88fc8dd 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -195,8 +195,15 @@ class Partition(val topic: String,
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some(newLeaderBrokerId)
- // start fetcher thread to current leader
- replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+ if (!replicaManager.isShuttingDown.get()) {
+ // start fetcher thread to current leader if we are not shutting down
+ replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+ }
+ else {
+ stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d from " +
+ " controller %d epoch %d since it is shutting down"
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch))
+ }
case None => // leader went down
stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
" controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation"
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a4e96cc..5ac38fd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -70,7 +70,7 @@ class ControllerContext(val zkClient: ZkClient,
}
trait KafkaControllerMBean {
- def shutdownBroker(id: Int): Int
+ def shutdownBroker(id: Int): Set[TopicAndPartition]
}
object KafkaController {
@@ -118,17 +118,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
/**
- * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
- * the controller first determines the partitions that the shutting down
- * broker leads, and moves leadership of those partitions to another broker
- * that is in that partition's ISR. When all partitions have been moved, the
- * broker process can be stopped normally (i.e., by sending it a SIGTERM or
- * SIGINT) and no data loss should be observed.
+ * On clean shutdown, the controller first determines the partitions that the
+ * shutting down broker leads, and moves leadership of those partitions to another broker
+ * that is in that partition's ISR.
*
* @param id Id of the broker to shut down.
* @return The set of partitions that the broker still leads.
*/
- def shutdownBroker(id: Int) = {
+ def shutdownBroker(id: Int): Set[TopicAndPartition] = {
+
+ if (!isActive()) {
+ throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+ }
controllerContext.brokerShutdownLock synchronized {
info("Shutting down broker " + id)
@@ -151,68 +152,40 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
- def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
- trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
- controllerContext.partitionLeadershipInfo.filter {
- case (topicAndPartition, leaderIsrAndControllerEpoch) =>
- leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
- }.map(_._1)
- }
-
- val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
-
- debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
-
- partitionsToMove.foreach { topicAndPartition =>
- val (topic, partition) = topicAndPartition.asTuple
- // move leadership serially to relinquish lock.
+ allPartitionsAndReplicationFactorOnBroker.foreach {
+ case(topicAndPartition, replicationFactor) =>
+ // Move leadership serially to relinquish lock.
controllerContext.controllerLock synchronized {
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+ // If the broker leads the topic partition, transition leadership and update the ISR. This updates ZK
+ // and notifies all affected brokers
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
controlledShutdownPartitionLeaderSelector)
- val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
-
- // mark replica offline only if leadership was moved successfully
- if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
- replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
- } else
- debug("Partition %s moved from leader %d to new leader %d during shutdown."
- .format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader))
+ }
+ else {
+ // Stop the replica first. The state change below initiates ZK updates, which should take some time;
+ // in most cases the stop replica request will have completed before then
+ brokerRequestBatch.newBatch()
+ brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
+ brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+
+ // If the broker is a follower, updates the isr in ZK and notifies the current leader
+ replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+ topicAndPartition.partition, id)), OfflineReplica)
+ }
}
}
}
- val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
-
- /*
- * Force the shutting down broker out of the ISR of partitions that it
- * follows, and shutdown the corresponding replica fetcher threads.
- * This is really an optimization, so no need to register any callback
- * to wait until completion.
- */
- if (partitionsRemaining.size == 0) {
- brokerRequestBatch.newBatch()
- allPartitionsAndReplicationFactorOnBroker foreach {
- case(topicAndPartition, replicationFactor) =>
- val (topic, partition) = topicAndPartition.asTuple
- if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
- brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
- removeReplicaFromIsr(topic, partition, id) match {
- case Some(updatedLeaderIsrAndControllerEpoch) =>
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
- Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
- updatedLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition))
- case None =>
- // ignore
- }
- }
- }
- brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+ def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
+ trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+ controllerContext.partitionLeadershipInfo.filter {
+ case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+ leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+ }.map(_._1)
}
-
- debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
- partitionsRemaining.size
+ replicatedPartitionsBrokerLeads().toSet
}
}
@@ -487,6 +460,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+ controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
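
Since shutdownBroker() now returns the set of partitions the broker still leads (and throws ControllerMovedException when invoked on a broker that is no longer the controller), a caller can poll it until the broker is drained. A sketch with invented names, mirroring the retry loop KafkaServer uses below:

    import kafka.controller.KafkaController

    object DrainBrokerSketch {
      // Poll shutdownBroker() until the broker leads nothing or retries run out.
      def drain(controller: KafkaController, brokerId: Int, retries: Int, backoffMs: Long) = {
        var remaining = controller.shutdownBroker(brokerId)
        var attempts = retries - 1
        while (remaining.size > 0 && attempts > 0) {
          Thread.sleep(backoffMs) // give leader moves and ISR updates time to settle
          remaining = controller.shutdownBroker(brokerId)
          attempts -= 1
        }
        remaining // empty => safe to stop the broker process
      }
    }
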
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dd88ccd..208e3ef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,6 +30,7 @@ import kafka.common._
import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
import kafka.network.RequestChannel.Response
import kafka.cluster.Broker
+import kafka.controller.KafkaController
/**
@@ -38,7 +39,8 @@ import kafka.cluster.Broker
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
- brokerId: Int) extends Logging {
+ brokerId: Int,
+ val controller: KafkaController) extends Logging {
private val producerRequestPurgatory =
new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
@@ -68,6 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
+ case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
}
} catch {
@@ -126,6 +129,14 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
}
+ def handleControlledShutdownRequest(request: RequestChannel.Request) {
+ val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
+ val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
+ val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
+ ErrorMapping.NoError, partitionsRemaining)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
+ }
+
/**
* Check if a partitionData from a produce request can unblock any
* DelayedFetch requests.
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 549b4b0..b774431 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -169,4 +169,16 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the purge interval (in number of requests) of the producer request purgatory */
val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
- }
+ /*********** Controlled shutdown configuration ***********/
+
+ /** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such a failure happens */
+ val controlledShutdownMaxRetries = props.getInt("controlled.shutdown.max.retries", 3)
+
+ /** Before each retry, the system needs time to recover from the state that caused the previous failure (controller
+ * failover, replica lag, etc.). This config determines the amount of time to wait before retrying. */
+ val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms", 5000)
+
+ /* enable controlled shutdown of the server */
+ val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", false)
+
+}
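
A sketch of enabling the feature programmatically, mirroring what RollingBounceTest below does (broker.id and zookeeper.connect are the usual required settings; the values shown are the defaults except for the enable flag):

    import java.util.Properties
    import kafka.server.KafkaConfig

    object ControlledShutdownConfigSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("broker.id", "0")
        props.put("zookeeper.connect", "localhost:2181")
        props.put("controlled.shutdown.enable", "true")           // default: false
        props.put("controlled.shutdown.max.retries", "3")         // default: 3
        props.put("controlled.shutdown.retry.backoff.ms", "5000") // default: 5000
        val config = new KafkaConfig(props)
        assert(config.controlledShutdownEnable)
      }
    }
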
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b0348bb..a26de88 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,13 +17,15 @@
package kafka.server
-import kafka.network.SocketServer
+import kafka.network.{Receive, BlockingChannel, SocketServer}
import kafka.log.LogManager
import kafka.utils._
import java.util.concurrent._
-import atomic.AtomicBoolean
-import org.I0Itec.zkclient.ZkClient
+import atomic.{AtomicInteger, AtomicBoolean}
import kafka.controller.{ControllerStats, KafkaController}
+import kafka.cluster.Broker
+import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
+import kafka.common.ErrorMapping
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -33,6 +35,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
this.logIdent = "[Kafka Server " + config.brokerId + "], "
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
+ private var startupComplete = new AtomicBoolean(false)
+ val correlationId: AtomicInteger = new AtomicInteger(0)
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null
@@ -41,14 +45,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var apis: KafkaApis = null
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(4)
- var zkClient: ZkClient = null
+
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
- info("starting")
+ info("Starting")
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
@@ -79,10 +83,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
info("Connecting to ZK: " + config.zkConnect)
- replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
+ replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager, isShuttingDown)
kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
+ apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
Mx4jLoader.maybeLoad
@@ -92,7 +96,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
kafkaController.startup()
// register metrics beans
registerStats()
- info("started")
+ startupComplete.set(true)
+ info("Started")
}
/**
@@ -105,13 +110,99 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
}
/**
+ * Performs controlled shutdown
+ */
+ private def controlledShutdown() {
+ if (startupComplete.get() && config.controlledShutdownEnable) {
+ // We request the controller to do a controlled shutdown. On failure, we back off for a configured period
+ // of time and try again for a configured number of retries. If all the attempts fail, we simply force
+ // the shutdown.
+ var remainingRetries = config.controlledShutdownMaxRetries
+ info("Starting controlled shutdown")
+ var channel: BlockingChannel = null
+ var prevController: Broker = null
+ var shutdownSucceeded: Boolean = false
+ try {
+ while (!shutdownSucceeded && remainingRetries > 0) {
+ remainingRetries = remainingRetries - 1
+
+ // 1. Find the controller and establish a connection to it.
+
+ // Get the current controller info. This is to ensure we use the most recent info to issue the
+ // controlled shutdown request
+ val controllerId = ZkUtils.getController(kafkaZookeeper.getZookeeperClient)
+ ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, controllerId) match {
+ case Some(broker) =>
+ if (channel == null || prevController == null || !prevController.equals(broker)) {
+ // if this is the first attempt or if the controller has changed, create a channel to the most recent
+ // controller
+ if (channel != null) {
+ channel.disconnect()
+ }
+ channel = new BlockingChannel(broker.host, broker.port,
+ BlockingChannel.UseDefaultBufferSize,
+ BlockingChannel.UseDefaultBufferSize,
+ config.controllerSocketTimeoutMs)
+ channel.connect()
+ prevController = broker
+ }
+ case None =>
+ // ignore and try again
+ }
+
+ // 2. issue a controlled shutdown to the controller
+ if (channel != null) {
+ var response: Receive = null
+ try {
+ // send the controlled shutdown request
+ val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
+ channel.send(request)
+ response = channel.receive()
+ val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
+ if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
+ shutdownResponse.partitionsRemaining.size == 0) {
+ shutdownSucceeded = true
+ info("Controlled shutdown succeeded")
+ }
+ else {
+ info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
+ info("Error code from controller: %d".format(shutdownResponse.errorCode))
+ }
+ }
+ catch {
+ case ioe: java.io.IOException =>
+ channel.disconnect()
+ channel = null
+ // ignore and try again
+ }
+ }
+ if (!shutdownSucceeded) {
+ Thread.sleep(config.controlledShutdownRetryBackoffMs)
+ warn("Retrying controlled shutdown after the previous attempt failed...")
+ }
+ }
+ }
+ finally {
+ if (channel != null) {
+ channel.disconnect()
+ channel = null
+ }
+ }
+ if (!shutdownSucceeded) {
+ warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
+ }
+ }
+ }
+
+ /**
* Shutdown API for shutting down a single instance of the Kafka server.
* Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
*/
def shutdown() {
- info("shutting down")
+ info("Shutting down")
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
+ Utils.swallow(controlledShutdown())
if(kafkaZookeeper != null)
Utils.swallow(kafkaZookeeper.shutdown())
if(socketServer != null)
@@ -130,7 +221,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
Utils.swallow(kafkaController.shutdown())
shutdownLatch.countDown()
- info("shut down completed")
+ startupComplete.set(false)
+ info("Shut down completed")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8e49b83..9d41e82 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -40,7 +40,8 @@ class ReplicaManager(val config: KafkaConfig,
time: Time,
val zkClient: ZkClient,
kafkaScheduler: KafkaScheduler,
- val logManager: LogManager) extends Logging with KafkaMetricsGroup {
+ val logManager: LogManager,
+ val isShuttingDown: AtomicBoolean) extends Logging with KafkaMetricsGroup {
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
private val localBrokerId = config.brokerId
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 95e7218..0d8b70f 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -345,7 +345,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
// wait for the update metadata request to trickle to the brokers
assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
- assertEquals(0, partitionsRemaining)
+ assertEquals(0, partitionsRemaining.size)
var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
assertEquals(0, leaderAfterShutdown)
@@ -353,7 +353,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
partitionsRemaining = controller.shutdownBroker(1)
- assertEquals(0, partitionsRemaining)
+ assertEquals(0, partitionsRemaining.size)
activeServers = servers.filter(s => s.config.brokerId == 0)
partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
@@ -361,7 +361,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
partitionsRemaining = controller.shutdownBroker(0)
- assertEquals(1, partitionsRemaining)
+ assertEquals(1, partitionsRemaining.size)
// leader doesn't change since all the replicas are shut down
assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
new file mode 100644
index 0000000..26e9bd6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.CreateTopicCommand
+import kafka.utils.TestUtils._
+import junit.framework.Assert._
+import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager}
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+import kafka.api._
+
+class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
+ val brokerId1 = 0
+ val brokerId2 = 1
+ val brokerId3 = 2
+ val brokerId4 = 3
+
+ val port1 = TestUtils.choosePort()
+ val port2 = TestUtils.choosePort()
+ val port3 = TestUtils.choosePort()
+ val port4 = TestUtils.choosePort()
+
+ val enableShutdown = true
+ val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ configProps1.put("controlled.shutdown.enable", "true")
+ val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ configProps2.put("controlled.shutdown.enable", "true")
+ val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
+ configProps3.put("controlled.shutdown.enable", "true")
+ val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+ configProps4.put("controlled.shutdown.enable", "true")
+ configProps4.put("controlled.shutdown.retry.backoff.ms", "100")
+
+ var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
+ val partitionId = 0
+
+ override def setUp() {
+ super.setUp()
+ // start all the servers
+ val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+ val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+ val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
+ val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
+
+ servers ++= List(server1, server2, server3, server4)
+ }
+
+ override def tearDown() {
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDirs))
+ super.tearDown()
+ }
+
+ def testRollingBounce {
+ // start all the brokers
+ val topic1 = "new-topic1"
+ val topic2 = "new-topic2"
+ val topic3 = "new-topic3"
+ val topic4 = "new-topic4"
+
+ // create topics with 1 partition, 2 replicas, one on each broker
+ CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
+ CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2")
+ CreateTopicCommand.createTopic(zkClient, topic3, 1, 2, "2:3")
+ CreateTopicCommand.createTopic(zkClient, topic4, 1, 2, "0:3")
+
+
+ // wait until leader is elected
+ var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500)
+ var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500)
+ var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500)
+ var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500)
+
+ debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1)))
+ debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1)))
+ debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+ debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1)))
+
+ assertTrue("Leader should get elected", leader1.isDefined)
+ assertTrue("Leader should get elected", leader2.isDefined)
+ assertTrue("Leader should get elected", leader3.isDefined)
+ assertTrue("Leader should get elected", leader4.isDefined)
+
+ assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+ assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2))
+ assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3))
+ assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3))
+
+ // Do a rolling bounce and check if leader transitions happen correctly
+
+ // Bring down the leader for the first topic
+ bounceServer(topic1, 0)
+
+ // Bring down the leader for the second topic
+ bounceServer(topic2, 1)
+
+ // Bring down the leader for the third topic
+ bounceServer(topic3, 2)
+
+ // Bring down the leader for the fourth topic
+ bounceServer(topic4, 3)
+ }
+
+ private def bounceServer(topic: String, startIndex: Int) {
+ var prevLeader = 0
+ if (isLeaderLocalOnBroker(topic, partitionId, servers(startIndex))) {
+ servers(startIndex).shutdown()
+ prevLeader = startIndex
+ }
+ else {
+ servers((startIndex + 1) % 4).shutdown()
+ prevLeader = (startIndex + 1) % 4
+ }
+ val newLeader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500)
+ // Ensure the new leader is different from the old
+ assertTrue("Leader transition did not happen for " + topic, newLeader.getOrElse(-1) != -1 && (newLeader.getOrElse(-1) != prevLeader))
+ // Start the server back up again
+ servers(prevLeader).startup()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index f30b097..2719055 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -25,6 +25,7 @@ import org.junit.Assert._
import kafka.common.KafkaException
import kafka.cluster.Replica
import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
+import java.util.concurrent.atomic.AtomicBoolean
class HighwatermarkPersistenceTest extends JUnit3Suite {
@@ -47,7 +48,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
val scheduler = new KafkaScheduler(2)
scheduler.startup
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
replicaManager.startup()
replicaManager.checkpointHighWatermarks()
var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
@@ -86,7 +87,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
val scheduler = new KafkaScheduler(2)
scheduler.startup
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
replicaManager.startup()
replicaManager.checkpointHighWatermarks()
var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 6184f42..7026432 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -24,6 +24,7 @@ import org.easymock.EasyMock
import kafka.log.Log
import org.junit.Assert._
import kafka.utils._
+import java.util.concurrent.atomic.AtomicBoolean
class IsrExpirationTest extends JUnit3Suite {
@@ -80,7 +81,7 @@ class IsrExpirationTest extends JUnit3Suite {
private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
localLog: Log): Partition = {
val leaderId=config.brokerId
- val replicaManager = new ReplicaManager(config, time, null, null, null)
+ val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false))
val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index c7dd8a7..23a8cb5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -87,10 +87,12 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
EasyMock.replay(replicaManager)
+ val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
+
// start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
// don't provide replica or leader callbacks since they will not be tested here
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+ val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, controller)
// This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
val goodFetch = new FetchRequestBuilder()
@@ -184,8 +186,10 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
EasyMock.replay(replicaManager)
+ val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
+
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+ val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, controller)
/**
* This fetch, coming from a replica, requests all data at offset "15". Because the request is coming