You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/01/03 20:29:59 UTC
git commit: KAFKA-1187 Controller should retry connecting to brokers to send state change requests; reviewed by Jun Rao and Guozhang Wang
Updated Branches:
refs/heads/trunk a119f532c -> 3f88be631
KAFKA-1187 Controller should retry connecting to brokers to send state change requests; reviewed by Jun Rao and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f88be63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f88be63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f88be63
Branch: refs/heads/trunk
Commit: 3f88be6318670864b372deccb2af705e8c84382a
Parents: a119f53
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Jan 3 11:29:53 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 3 11:29:53 2014 -0800
----------------------------------------------------------------------
.../controller/ControllerChannelManager.scala | 49 ++++++++++++++++----
.../scala/kafka/producer/SyncProducer.scala | 5 --
2 files changed, 39 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f88be63/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7991e42..33a84fb 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
package kafka.controller
import kafka.network.{Receive, BlockingChannel}
-import kafka.utils.{Logging, ShutdownableThread}
+import kafka.utils.{Utils, Logging, ShutdownableThread}
import collection.mutable.HashMap
import kafka.cluster.Broker
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -81,8 +81,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
- channel.connect()
- val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker.id, messageQueue, channel)
+ val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, channel)
requestThread.setDaemon(false)
brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
}
@@ -107,12 +106,13 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
- val toBrokerId: Int,
+ val toBroker: Broker,
val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
val channel: BlockingChannel)
- extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
+ extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
private val lock = new Object()
private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+ connectToBroker(toBroker, channel)
override def doWork(): Unit = {
val queueItem = queue.take()
@@ -123,8 +123,25 @@ class RequestSendThread(val controllerId: Int,
try{
lock synchronized {
- channel.connect() // establish a socket connection if needed
- channel.send(request)
+ var isSendSuccessful = false
+ while(isRunning.get() && !isSendSuccessful) {
+ // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
+ // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
+ try {
+ channel.send(request)
+ isSendSuccessful = true
+ } catch {
+ case e => // if the send was not successful, reconnect to broker and resend the message
+ error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " +
+ "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
+ RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e)
+ channel.disconnect()
+ connectToBroker(toBroker, channel)
+ isSendSuccessful = false
+ // backoff before retrying the connection and send
+ Utils.swallow(Thread.sleep(300))
+ }
+ }
receive = channel.receive()
var response: RequestOrResponse = null
request.requestId.get match {
@@ -135,8 +152,8 @@ class RequestSendThread(val controllerId: Int,
case RequestKeys.UpdateMetadataKey =>
response = UpdateMetadataResponse.readFrom(receive.buffer)
}
- stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d"
- .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId))
+ stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s"
+ .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString()))
if(callback != null){
callback(response)
@@ -144,11 +161,23 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e: Throwable =>
- warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
+ warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
// If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
channel.disconnect()
}
}
+
+ private def connectToBroker(broker: Broker, channel: BlockingChannel) {
+ try {
+ channel.connect()
+ info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString()))
+ } catch {
+ case e => {
+ channel.disconnect()
+ error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e)
+ }
+ }
+ }
}
class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f88be63/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 419156e..041cfa5 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -120,11 +120,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
}
}
- private def reconnect() {
- disconnect()
- connect()
- }
-
/**
* Disconnect from current channel, closing connection.
* Side effect: channel field is set to null on successful disconnect