You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/08 03:20:47 UTC
git commit: Include controllerId in all requests sent by controller;
patched by Swapnil Ghike; reviewed by Jun Rao; kafka-793
Updated Branches:
refs/heads/0.8 771760ce2 -> 19ae1959b
Include controllerId in all requests sent by controller; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-793
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/19ae1959
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/19ae1959
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/19ae1959
Branch: refs/heads/0.8
Commit: 19ae1959b091df44475243e3b199d6121ddedc72
Parents: 771760c
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Thu Mar 7 18:20:33 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 7 18:20:33 2013 -0800
----------------------------------------------------------------------
.../PreferredReplicaLeaderElectionCommand.scala | 8 +++---
.../main/scala/kafka/api/StopReplicaRequest.scala | 16 +++++++++----
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../controller/ControllerChannelManager.scala | 18 +++++---------
.../scala/kafka/controller/KafkaController.scala | 1 +
.../kafka/controller/PartitionStateMachine.scala | 2 +-
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../api/RequestResponseSerializationTest.scala | 3 +-
9 files changed, 29 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index e59d5af..7405c5a 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -35,7 +35,6 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
.withRequiredArg
.describedAs("list of partitions for which preferred replica leader election needs to be triggered")
.ofType(classOf[String])
- .defaultsTo("")
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
"form host:port. Multiple URLS can be given to allow fail-over.")
.withRequiredArg
@@ -46,15 +45,16 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
- val jsonFile = options.valueOf(jsonFileOpt)
val zkConnect = options.valueOf(zkConnectOpt)
- val jsonString = if(jsonFile != "") Utils.readFileAsString(jsonFile) else ""
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
val partitionsForPreferredReplicaElection =
- if(jsonFile == "") ZkUtils.getAllPartitions(zkClient) else parsePreferredReplicaJsonData(jsonString)
+ if (!options.has(jsonFileOpt))
+ ZkUtils.getAllPartitions(zkClient)
+ else
+ parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 5107488..cd55db4 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -36,6 +36,7 @@ object StopReplicaRequest extends Logging {
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
+ val controllerId = buffer.getInt
val controllerEpoch = buffer.getInt
val deletePartitions = buffer.get match {
case 1 => true
@@ -48,7 +49,8 @@ object StopReplicaRequest extends Logging {
(1 to topicPartitionPairCount) foreach { _ =>
topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
}
- StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
+ StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch,
+ deletePartitions, topicPartitionPairSet.toSet)
}
}
@@ -56,14 +58,15 @@ case class StopReplicaRequest(versionId: Short,
override val correlationId: Int,
clientId: String,
ackTimeoutMs: Int,
+ controllerId: Int,
+ controllerEpoch: Int,
deletePartitions: Boolean,
- partitions: Set[(String, Int)],
- controllerEpoch: Int)
+ partitions: Set[(String, Int)])
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
- def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
+ def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
- deletePartitions, partitions, controllerEpoch)
+ controllerId, controllerEpoch, deletePartitions, partitions)
}
def writeTo(buffer: ByteBuffer) {
@@ -71,6 +74,7 @@ case class StopReplicaRequest(versionId: Short,
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
+ buffer.putInt(controllerId)
buffer.putInt(controllerEpoch)
buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
buffer.putInt(partitions.size)
@@ -86,6 +90,7 @@ case class StopReplicaRequest(versionId: Short,
4 + /* correlation id */
ApiUtils.shortStringLength(clientId) +
4 + /* ackTimeoutMs */
+ 4 + /* controller id*/
4 + /* controller epoch */
1 + /* deletePartitions */
4 /* partition count */
@@ -104,6 +109,7 @@ case class StopReplicaRequest(versionId: Short,
stopReplicaRequest.append("; ClientId: " + clientId)
stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
+ stopReplicaRequest.append("; ControllerId: " + controllerId)
stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
stopReplicaRequest.toString()
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 39266b5..824e394 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -53,7 +53,7 @@ class Partition(val topic: String,
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
- private val stateChangeLogger = Logger.getLogger("state.change.logger")
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 2e50b8d..6e563d2 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -26,17 +26,12 @@ import collection.mutable
import kafka.api._
import org.apache.log4j.Logger
-class ControllerChannelManager private (config: KafkaConfig) extends Logging {
- private var controllerContext: ControllerContext = null
+class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
- def this(controllerContext: ControllerContext, config : KafkaConfig) {
- this(config)
- this.controllerContext = controllerContext
- controllerContext.liveBrokers.foreach(addNewBroker(_))
- }
+ controllerContext.liveBrokers.foreach(addNewBroker(_))
def startup() = {
brokerLock synchronized {
@@ -114,7 +109,7 @@ class RequestSendThread(val controllerId: Int,
val channel: BlockingChannel)
extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
private val lock = new Object()
- private val stateChangeLogger = Logger.getLogger("state.change.logger")
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
override def doWork(): Unit = {
val queueItem = queue.take()
@@ -154,7 +149,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
- private val stateChangeLogger = Logger.getLogger("state.change.logger")
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
def newBatch() {
// raise error if the previous batch is not empty
@@ -212,8 +207,9 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
if (replicas.size > 0) {
debug("The stop replica request (delete = %s) sent to broker %d is %s"
.format(deletePartitions, broker, replicas.mkString(",")))
- sendRequest(broker, new StopReplicaRequest(deletePartitions,
- Set.empty[(String, Int)] ++ replicas, controllerEpoch, correlationId), null)
+ val stopReplicaRequest = new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas, controllerId,
+ controllerEpoch, correlationId)
+ sendRequest(broker, stopReplicaRequest, null)
}
}
m.clear()
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e18ab07..25a8cfe 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -74,6 +74,7 @@ trait KafkaControllerMBean {
object KafkaController {
val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
+ val stateChangeLogger = "state.change.logger"
val InitialControllerEpoch = 1
val InitialControllerEpochZkVersion = 1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ce4b9e8..b25e9f4 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -47,7 +47,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
private val isShuttingDown = new AtomicBoolean(false)
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
- private val stateChangeLogger = Logger.getLogger("state.change.logger")
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
/**
* Invoked on successful controller election. First registers a topic change listener since that triggers all
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 43d60cf..88058ec 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -45,7 +45,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
private val isShuttingDown = new AtomicBoolean(false)
this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
- private val stateChangeLogger = Logger.getLogger("state.change.logger")
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
/**
* Invoked on successful controller election. First registers a broker change listener since that triggers all
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c10cbde..e73e529 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -52,7 +52,7 @@ class ReplicaManager(val config: KafkaConfig,
val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
- private val stateChangeLogger = Logger.getLogger("state.change.logger")
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
newGauge(
"LeaderCount",
http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 4c209f1..02ff81f 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -99,7 +99,8 @@ object SerializationTestUtils{
}
def createTestStopReplicaRequest() : StopReplicaRequest = {
- new StopReplicaRequest(controllerEpoch = 1, correlationId = 0, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+ new StopReplicaRequest(controllerId = 0, controllerEpoch = 1, correlationId = 0, deletePartitions = true,
+ partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
}
def createTestStopReplicaResponse() : StopReplicaResponse = {