You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/15 17:07:42 UTC

svn commit: r1373448 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: log/Log.scala server/KafkaController.scala

Author: junrao
Date: Wed Aug 15 15:07:41 2012
New Revision: 1373448

URL: http://svn.apache.org/viewvc?rev=1373448&view=rev
Log:
KafkaController.RequestSendThread can throw exception on broker socket; patched by Yang Ye; reviewed by Jun Rao; KAFKA-459, KAFKA-460

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1373448&r1=1373447&r2=1373448&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Aug 15 15:07:41 2012
@@ -438,8 +438,8 @@ private[kafka] class Log( val dir: File,
         segment.truncateTo(targetOffset)
         info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
       case None =>
-        assert(targetOffset <= segments.view.last.absoluteEndOffset, "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
-        error("Cannot truncate log to %d since the log start offset is %d and end offset is %d".format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
+        if(targetOffset > segments.view.last.absoluteEndOffset)
+         error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
     }
 
     val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1373448&r1=1373447&r2=1373448&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Wed Aug 15 15:07:41 2012
@@ -61,25 +61,24 @@ class RequestSendThread(val controllerId
           lock synchronized {
             channel.send(request)
             receive = channel.receive()
+            var response: RequestOrResponse = null
+            request.requestId.get match {
+              case RequestKeys.LeaderAndISRRequest =>
+                response = LeaderAndISRResponse.readFrom(receive.buffer)
+              case RequestKeys.StopReplicaRequest =>
+                response = StopReplicaResponse.readFrom(receive.buffer)
+            }
+            trace("got a response %s".format(controllerId, response, toBrokerId))
+
+            if(callback != null){
+              callback(response)
+            }
           }
         } catch {
           case e =>
             // log it and let it go. Let controller shut it down.
             debug("Exception occurs", e)
         }
-
-        var response: RequestOrResponse = null
-        request.requestId.get match {
-          case RequestKeys.LeaderAndISRRequest =>
-            response = LeaderAndISRResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaRequest =>
-            response = StopReplicaResponse.readFrom(receive.buffer)
-        }
-        trace("got a response %s".format(controllerId, response, toBrokerId))
-
-        if(callback != null){
-          callback(response)
-        }
       }
     } catch{
       case e: InterruptedException => warn("intterrupted. Shutting down")
@@ -94,6 +93,7 @@ class ControllerChannelManager(allBroker
   private val messageChannels = new HashMap[Int, BlockingChannel]
   private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
   private val messageThreads = new HashMap[Int, RequestSendThread]
+  private val lock = new Object()
   this.logIdent = "Channel manager on controller " + config.brokerId + ", "
   for(broker <- allBrokers){
     brokers.put(broker.id, broker)
@@ -117,8 +117,10 @@ class ControllerChannelManager(allBroker
   }
 
   def shutDown() = {
-    for((brokerId, broker) <- brokers){
-      removeBroker(brokerId)
+    lock synchronized {
+      for((brokerId, broker) <- brokers){
+        removeBroker(brokerId)
+      }
     }
   }
 
@@ -127,30 +129,34 @@ class ControllerChannelManager(allBroker
   }
 
   def addBroker(broker: Broker){
-    brokers.put(broker.id, broker)
-    messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
-    val channel = new BlockingChannel(broker.host, broker.port,
-                                      BlockingChannel.UseDefaultBufferSize,
-                                      BlockingChannel.UseDefaultBufferSize,
-                                      config.controllerSocketTimeoutMs)
-    channel.connect()
-    messageChannels.put(broker.id, channel)
-    val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
-    thread.setDaemon(false)
-    thread.start()
-    messageThreads.put(broker.id, thread)
+    lock synchronized {
+      brokers.put(broker.id, broker)
+      messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+      val channel = new BlockingChannel(broker.host, broker.port,
+                                        BlockingChannel.UseDefaultBufferSize,
+                                        BlockingChannel.UseDefaultBufferSize,
+                                        config.controllerSocketTimeoutMs)
+      channel.connect()
+      messageChannels.put(broker.id, channel)
+      val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
+      thread.setDaemon(false)
+      thread.start()
+      messageThreads.put(broker.id, thread)
+    }
   }
 
   def removeBroker(brokerId: Int){
-    brokers.remove(brokerId)
-    try {
-      messageChannels(brokerId).disconnect()
-      messageChannels.remove(brokerId)
-      messageQueues.remove(brokerId)
-      messageThreads(brokerId).shutDown()
-      messageThreads.remove(brokerId)
-    }catch {
-      case e => error("Error while removing broker by the controller", e)
+    lock synchronized {
+      brokers.remove(brokerId)
+      try {
+        messageChannels(brokerId).disconnect()
+        messageChannels.remove(brokerId)
+        messageQueues.remove(brokerId)
+        messageThreads(brokerId).shutDown()
+        messageThreads.remove(brokerId)
+      }catch {
+        case e => error("Error while removing broker by the controller", e)
+      }
     }
   }
 }