You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/03 20:29:59 UTC

git commit: KAFKA-1187 Controller should retry connecting to brokers to send state change requests; reviewed by Jun Rao and Guozhang Wang

Updated Branches:
  refs/heads/trunk a119f532c -> 3f88be631


KAFKA-1187 Controller should retry connecting to brokers to send state change requests; reviewed by Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f88be63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f88be63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f88be63

Branch: refs/heads/trunk
Commit: 3f88be6318670864b372deccb2af705e8c84382a
Parents: a119f53
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Jan 3 11:29:53 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 3 11:29:53 2014 -0800

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 49 ++++++++++++++++----
 .../scala/kafka/producer/SyncProducer.scala     |  5 --
 2 files changed, 39 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f88be63/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7991e42..33a84fb 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
 package kafka.controller
 
 import kafka.network.{Receive, BlockingChannel}
-import kafka.utils.{Logging, ShutdownableThread}
+import kafka.utils.{Utils, Logging, ShutdownableThread}
 import collection.mutable.HashMap
 import kafka.cluster.Broker
 import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -81,8 +81,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
       BlockingChannel.UseDefaultBufferSize,
       BlockingChannel.UseDefaultBufferSize,
       config.controllerSocketTimeoutMs)
-    channel.connect()
-    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker.id, messageQueue, channel)
+    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, channel)
     requestThread.setDaemon(false)
     brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
   }
@@ -107,12 +106,13 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
-                        val toBrokerId: Int,
+                        val toBroker: Broker,
                         val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                         val channel: BlockingChannel)
-  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
+  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
   private val lock = new Object()
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  connectToBroker(toBroker, channel)
 
   override def doWork(): Unit = {
     val queueItem = queue.take()
@@ -123,8 +123,25 @@ class RequestSendThread(val controllerId: Int,
 
     try{
       lock synchronized {
-        channel.connect() // establish a socket connection if needed
-        channel.send(request)
+        var isSendSuccessful = false
+        while(isRunning.get() && !isSendSuccessful) {
+          // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
+          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
+          try {
+            channel.send(request)
+            isSendSuccessful = true
+          } catch {
+            case e => // if the send was not successful, reconnect to broker and resend the message
+              error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " +
+                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
+                RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e)
+              channel.disconnect()
+              connectToBroker(toBroker, channel)
+              isSendSuccessful = false
+              // backoff before retrying the connection and send
+              Utils.swallow(Thread.sleep(300))
+          }
+        }
         receive = channel.receive()
         var response: RequestOrResponse = null
         request.requestId.get match {
@@ -135,8 +152,8 @@ class RequestSendThread(val controllerId: Int,
           case RequestKeys.UpdateMetadataKey =>
             response = UpdateMetadataResponse.readFrom(receive.buffer)
         }
-        stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d"
-                                  .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId))
+        stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s"
+                                  .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString()))
 
         if(callback != null){
           callback(response)
@@ -144,11 +161,23 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e: Throwable =>
-        warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
+        warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
         // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
         channel.disconnect()
     }
   }
+
+  /** Attempts to connect `channel` to `broker`; any failure is logged and the channel disconnected. */
+  private def connectToBroker(broker: Broker, channel: BlockingChannel) {
+    try {
+      channel.connect()
+      info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString()))
+    } catch {
+      case e: Throwable =>
+        channel.disconnect()
+        error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e)
+    }
+  }
 }
 
 class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f88be63/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 419156e..041cfa5 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -120,11 +120,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     }
   }
 
-  private def reconnect() {
-    disconnect()
-    connect()
-  }
-
   /**
    * Disconnect from current channel, closing connection.
    * Side effect: channel field is set to null on successful disconnect