Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/08 03:20:47 UTC

git commit: Include controllerId in all requests sent by controller; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-793

Updated Branches:
  refs/heads/0.8 771760ce2 -> 19ae1959b


Include controllerId in all requests sent by controller; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-793


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/19ae1959
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/19ae1959
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/19ae1959

Branch: refs/heads/0.8
Commit: 19ae1959b091df44475243e3b199d6121ddedc72
Parents: 771760c
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Thu Mar 7 18:20:33 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 7 18:20:33 2013 -0800

----------------------------------------------------------------------
 .../PreferredReplicaLeaderElectionCommand.scala    |    8 +++---
 .../main/scala/kafka/api/StopReplicaRequest.scala  |   16 +++++++++----
 core/src/main/scala/kafka/cluster/Partition.scala  |    2 +-
 .../controller/ControllerChannelManager.scala      |   18 +++++---------
 .../scala/kafka/controller/KafkaController.scala   |    1 +
 .../kafka/controller/PartitionStateMachine.scala   |    2 +-
 .../kafka/controller/ReplicaStateMachine.scala     |    2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    2 +-
 .../api/RequestResponseSerializationTest.scala     |    3 +-
 9 files changed, 29 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index e59d5af..7405c5a 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -35,7 +35,6 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       .withRequiredArg
       .describedAs("list of partitions for which preferred replica leader election needs to be triggered")
       .ofType(classOf[String])
-      .defaultsTo("")
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
       "form host:port. Multiple URLS can be given to allow fail-over.")
       .withRequiredArg
@@ -46,15 +45,16 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
 
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
 
-    val jsonFile = options.valueOf(jsonFileOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
-    val jsonString = if(jsonFile != "") Utils.readFileAsString(jsonFile) else ""
     var zkClient: ZkClient = null
 
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
       val partitionsForPreferredReplicaElection =
-        if(jsonFile == "") ZkUtils.getAllPartitions(zkClient) else parsePreferredReplicaJsonData(jsonString)
+        if (!options.has(jsonFileOpt))
+          ZkUtils.getAllPartitions(zkClient)
+        else
+          parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
       val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
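
The hunk above drops the empty-string sentinel default for --path-to-json-file and instead asks the parsed OptionSet whether the option was supplied at all. A minimal sketch of that joptsimple pattern, with a hypothetical standalone object and option description that are not part of the patch:

  import joptsimple.OptionParser

  object OptionalArgSketch {
    def main(args: Array[String]) {
      val parser = new OptionParser
      // Optional argument: no defaultsTo(""), so absence is detected via options.has(...)
      val jsonFileOpt = parser.accepts("path-to-json-file", "optional JSON file")
        .withRequiredArg
        .ofType(classOf[String])
      val options = parser.parse(args: _*)
      val partitionsSource =
        if (!options.has(jsonFileOpt)) "all partitions read from ZooKeeper"   // flag omitted
        else "partitions parsed from " + options.valueOf(jsonFileOpt)         // flag supplied
      println(partitionsSource)
    }
  }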

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 5107488..cd55db4 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -36,6 +36,7 @@ object StopReplicaRequest extends Logging {
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val deletePartitions = buffer.get match {
       case 1 => true
@@ -48,7 +49,8 @@ object StopReplicaRequest extends Logging {
     (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     }
-    StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
+    StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch,
+                       deletePartitions, topicPartitionPairSet.toSet)
   }
 }
 
@@ -56,14 +58,15 @@ case class StopReplicaRequest(versionId: Short,
                               override val correlationId: Int,
                               clientId: String,
                               ackTimeoutMs: Int,
+                              controllerId: Int,
+                              controllerEpoch: Int,
                               deletePartitions: Boolean,
-                              partitions: Set[(String, Int)],
-                              controllerEpoch: Int)
+                              partitions: Set[(String, Int)])
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
 
-  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
     this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-         deletePartitions, partitions, controllerEpoch)
+         controllerId, controllerEpoch, deletePartitions, partitions)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -71,6 +74,7 @@ case class StopReplicaRequest(versionId: Short,
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerId)
     buffer.putInt(controllerEpoch)
     buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
@@ -86,6 +90,7 @@ case class StopReplicaRequest(versionId: Short,
       4 + /* correlation id */
       ApiUtils.shortStringLength(clientId) +
       4 + /* ackTimeoutMs */
+      4 + /* controller id*/
       4 + /* controller epoch */
       1 + /* deletePartitions */
       4 /* partition count */
@@ -104,6 +109,7 @@ case class StopReplicaRequest(versionId: Short,
     stopReplicaRequest.append("; ClientId: " + clientId)
     stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
     stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
+    stopReplicaRequest.append("; ControllerId: " + controllerId)
     stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
     stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
     stopReplicaRequest.toString()
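
For reference, a sketch of the header layout implied by the new writeTo above, written against java.nio.ByteBuffer only; the object and method names here are illustrative and not part of the patch. The controllerId int now sits between ackTimeoutMs and controllerEpoch:

  import java.nio.ByteBuffer

  object StopReplicaHeaderSketch {
    def encodeHeader(versionId: Short, correlationId: Int, clientId: String,
                     ackTimeoutMs: Int, controllerId: Int, controllerEpoch: Int,
                     deletePartitions: Boolean): ByteBuffer = {
      val clientIdBytes = clientId.getBytes("UTF-8")
      val buffer = ByteBuffer.allocate(2 + 4 + 2 + clientIdBytes.length + 4 + 4 + 4 + 1)
      buffer.putShort(versionId)                                // version id
      buffer.putInt(correlationId)                              // correlation id
      buffer.putShort(clientIdBytes.length.toShort)             // short-string length prefix
      buffer.put(clientIdBytes)                                 // client id bytes
      buffer.putInt(ackTimeoutMs)                               // ack timeout
      buffer.putInt(controllerId)                               // new field added by this patch
      buffer.putInt(controllerEpoch)                            // controller epoch
      buffer.put(if (deletePartitions) 1.toByte else 0.toByte)  // delete-partitions flag
      buffer.flip()
      buffer
    }
  }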

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 39266b5..824e394 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -53,7 +53,7 @@ class Partition(val topic: String,
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
-  private val stateChangeLogger = Logger.getLogger("state.change.logger")
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 2e50b8d..6e563d2 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -26,17 +26,12 @@ import collection.mutable
 import kafka.api._
 import org.apache.log4j.Logger
 
-class ControllerChannelManager private (config: KafkaConfig) extends Logging {
-  private var controllerContext: ControllerContext = null
+class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
-  def this(controllerContext: ControllerContext, config : KafkaConfig) {
-    this(config)
-    this.controllerContext = controllerContext
-    controllerContext.liveBrokers.foreach(addNewBroker(_))
-  }
+  controllerContext.liveBrokers.foreach(addNewBroker(_))
 
   def startup() = {
     brokerLock synchronized {
@@ -114,7 +109,7 @@ class RequestSendThread(val controllerId: Int,
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
   private val lock = new Object()
-  private val stateChangeLogger = Logger.getLogger("state.change.logger")
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   override def doWork(): Unit = {
     val queueItem = queue.take()
@@ -154,7 +149,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
   val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
-  private val stateChangeLogger = Logger.getLogger("state.change.logger")
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -212,8 +207,9 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
             if (replicas.size > 0) {
               debug("The stop replica request (delete = %s) sent to broker %d is %s"
                 .format(deletePartitions, broker, replicas.mkString(",")))
-              sendRequest(broker, new StopReplicaRequest(deletePartitions,
-                Set.empty[(String, Int)] ++ replicas, controllerEpoch, correlationId), null)
+              val stopReplicaRequest = new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas, controllerId,
+                                                              controllerEpoch, correlationId)
+              sendRequest(broker, stopReplicaRequest, null)
             }
         }
         m.clear()
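
The ControllerChannelManager change above replaces a private primary constructor plus an auxiliary constructor that mutated a var with a single primary constructor whose body performs the per-broker setup. A stripped-down illustration of that Scala idiom, with made-up class and member names:

  class ChannelManagerSketch(private val liveBrokers: Seq[Int], controllerId: Int) {
    private val brokerState = collection.mutable.HashMap[Int, String]()

    // Runs once at construction time, taking over the work of the old auxiliary constructor.
    liveBrokers.foreach(addNewBroker(_))

    private def addNewBroker(brokerId: Int) {
      brokerState.put(brokerId, "managed by controller " + controllerId)
    }
  }

Constructing the class now fully initializes it, so callers can no longer observe a half-built instance whose context is still null.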

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e18ab07..25a8cfe 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -74,6 +74,7 @@ trait KafkaControllerMBean {
 
 object KafkaController {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
+  val stateChangeLogger = "state.change.logger"
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
 }
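
The new constant above is what Partition, RequestSendThread, ControllerBrokerRequestBatch, PartitionStateMachine, ReplicaStateMachine and ReplicaManager now pass to Logger.getLogger, so the logger name is defined once. A minimal sketch of the shared-constant pattern, using stand-in names rather than the real classes:

  import org.apache.log4j.Logger

  object ControllerSketch {
    // Single source of truth for the state-change logger name.
    val stateChangeLogger = "state.change.logger"
  }

  class BrokerComponentSketch(brokerId: Int) {
    // Every component resolves the same underlying log4j logger through the constant.
    private val stateChangeLogger = Logger.getLogger(ControllerSketch.stateChangeLogger)

    def recordTransition(detail: String) {
      stateChangeLogger.trace("Broker " + brokerId + ": " + detail)
    }
  }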

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ce4b9e8..b25e9f4 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -47,7 +47,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val isShuttingDown = new AtomicBoolean(false)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
-  private val stateChangeLogger = Logger.getLogger("state.change.logger")
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 43d60cf..88058ec 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -45,7 +45,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
   private val isShuttingDown = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
-  private val stateChangeLogger = Logger.getLogger("state.change.logger")
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c10cbde..e73e529 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -52,7 +52,7 @@ class ReplicaManager(val config: KafkaConfig,
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
-  private val stateChangeLogger = Logger.getLogger("state.change.logger")
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   newGauge(
     "LeaderCount",

http://git-wip-us.apache.org/repos/asf/kafka/blob/19ae1959/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 4c209f1..02ff81f 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -99,7 +99,8 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(controllerEpoch = 1, correlationId = 0, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(controllerId = 0, controllerEpoch = 1, correlationId = 0, deletePartitions = true,
+                           partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
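
The updated factory above builds the request through the auxiliary constructor with named arguments, so the new controllerId is covered by the serialization test. As a hedged sketch of the round trip that RequestResponseSerializationTest exercises (a generously sized buffer is used here instead of calling the request's size method, and a plain assert stands in for the project's test framework):

  import java.nio.ByteBuffer
  import kafka.api.StopReplicaRequest

  object StopReplicaRoundTripSketch {
    def main(args: Array[String]) {
      val request = new StopReplicaRequest(deletePartitions = true,
                                           partitions = Set(("topic1", 0), ("topic2", 0)),
                                           controllerId = 0,
                                           controllerEpoch = 1,
                                           correlationId = 0)
      val buffer = ByteBuffer.allocate(1024)   // ample room for this small request
      request.writeTo(buffer)
      buffer.rewind()
      val deserialized = StopReplicaRequest.readFrom(buffer)
      // Case-class equality compares every field, including the new controllerId.
      assert(request == deserialized)
    }
  }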