You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/15 17:07:42 UTC
svn commit: r1373448 - in
/incubator/kafka/branches/0.8/core/src/main/scala/kafka: log/Log.scala
server/KafkaController.scala
Author: junrao
Date: Wed Aug 15 15:07:41 2012
New Revision: 1373448
URL: http://svn.apache.org/viewvc?rev=1373448&view=rev
Log:
KafkaController.RequestSendThread can throw exception on broker socket; patched by Yang Ye; reviewed by Jun Rao; KAFKA-459, KAFKA-460
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1373448&r1=1373447&r2=1373448&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Aug 15 15:07:41 2012
@@ -438,8 +438,8 @@ private[kafka] class Log( val dir: File,
segment.truncateTo(targetOffset)
info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
case None =>
- assert(targetOffset <= segments.view.last.absoluteEndOffset, "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
- error("Cannot truncate log to %d since the log start offset is %d and end offset is %d".format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
+ if(targetOffset > segments.view.last.absoluteEndOffset)
+ error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
}
val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1373448&r1=1373447&r2=1373448&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Wed Aug 15 15:07:41 2012
@@ -61,25 +61,24 @@ class RequestSendThread(val controllerId
lock synchronized {
channel.send(request)
receive = channel.receive()
+ var response: RequestOrResponse = null
+ request.requestId.get match {
+ case RequestKeys.LeaderAndISRRequest =>
+ response = LeaderAndISRResponse.readFrom(receive.buffer)
+ case RequestKeys.StopReplicaRequest =>
+ response = StopReplicaResponse.readFrom(receive.buffer)
+ }
+ trace("got a response %s".format(controllerId, response, toBrokerId))
+
+ if(callback != null){
+ callback(response)
+ }
}
} catch {
case e =>
// log it and let it go. Let controller shut it down.
debug("Exception occurs", e)
}
-
- var response: RequestOrResponse = null
- request.requestId.get match {
- case RequestKeys.LeaderAndISRRequest =>
- response = LeaderAndISRResponse.readFrom(receive.buffer)
- case RequestKeys.StopReplicaRequest =>
- response = StopReplicaResponse.readFrom(receive.buffer)
- }
- trace("got a response %s".format(controllerId, response, toBrokerId))
-
- if(callback != null){
- callback(response)
- }
}
} catch{
case e: InterruptedException => warn("intterrupted. Shutting down")
@@ -94,6 +93,7 @@ class ControllerChannelManager(allBroker
private val messageChannels = new HashMap[Int, BlockingChannel]
private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
private val messageThreads = new HashMap[Int, RequestSendThread]
+ private val lock = new Object()
this.logIdent = "Channel manager on controller " + config.brokerId + ", "
for(broker <- allBrokers){
brokers.put(broker.id, broker)
@@ -117,8 +117,10 @@ class ControllerChannelManager(allBroker
}
def shutDown() = {
- for((brokerId, broker) <- brokers){
- removeBroker(brokerId)
+ lock synchronized {
+ for((brokerId, broker) <- brokers){
+ removeBroker(brokerId)
+ }
}
}
@@ -127,30 +129,34 @@ class ControllerChannelManager(allBroker
}
def addBroker(broker: Broker){
- brokers.put(broker.id, broker)
- messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
- val channel = new BlockingChannel(broker.host, broker.port,
- BlockingChannel.UseDefaultBufferSize,
- BlockingChannel.UseDefaultBufferSize,
- config.controllerSocketTimeoutMs)
- channel.connect()
- messageChannels.put(broker.id, channel)
- val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
- thread.setDaemon(false)
- thread.start()
- messageThreads.put(broker.id, thread)
+ lock synchronized {
+ brokers.put(broker.id, broker)
+ messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+ val channel = new BlockingChannel(broker.host, broker.port,
+ BlockingChannel.UseDefaultBufferSize,
+ BlockingChannel.UseDefaultBufferSize,
+ config.controllerSocketTimeoutMs)
+ channel.connect()
+ messageChannels.put(broker.id, channel)
+ val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
+ thread.setDaemon(false)
+ thread.start()
+ messageThreads.put(broker.id, thread)
+ }
}
def removeBroker(brokerId: Int){
- brokers.remove(brokerId)
- try {
- messageChannels(brokerId).disconnect()
- messageChannels.remove(brokerId)
- messageQueues.remove(brokerId)
- messageThreads(brokerId).shutDown()
- messageThreads.remove(brokerId)
- }catch {
- case e => error("Error while removing broker by the controller", e)
+ lock synchronized {
+ brokers.remove(brokerId)
+ try {
+ messageChannels(brokerId).disconnect()
+ messageChannels.remove(brokerId)
+ messageQueues.remove(brokerId)
+ messageThreads(brokerId).shutDown()
+ messageThreads.remove(brokerId)
+ }catch {
+ case e => error("Error while removing broker by the controller", e)
+ }
}
}
}