You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/02 18:33:46 UTC

svn commit: r1239740 - in /incubator/kafka/trunk/core/src/main/scala/kafka: log/Log.scala server/KafkaRequestHandlers.scala

Author: junrao
Date: Thu Feb  2 17:33:46 2012
New Revision: 1239740

URL: http://svn.apache.org/viewvc?rev=1239740&view=rev
Log:
Corrupted request shuts down the broker; patched by Jun Rao; reviewed by Jay Kreps and Neha Narkhede; KAFKA-261

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1239740&r1=1239739&r2=1239740&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Thu Feb  2 17:33:46 2012
@@ -211,10 +211,18 @@ private[log] class Log(val dir: File, va
     
     // they are valid, insert them in the log
     lock synchronized {
-      val segment = segments.view.last
-      segment.messageSet.append(messages)
-      maybeFlush(numberOfMessages)
-      maybeRoll(segment)
+      try {
+        val segment = segments.view.last
+        segment.messageSet.append(messages)
+        maybeFlush(numberOfMessages)
+        maybeRoll(segment)
+      }
+      catch {
+        case e: IOException =>
+          fatal("Halting due to unrecoverable I/O error while handling producer request", e)
+          Runtime.getRuntime.halt(1)
+        case e2 => throw e2
+      }
     }
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1239740&r1=1239739&r2=1239740&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Thu Feb  2 17:33:46 2012
@@ -25,7 +25,6 @@ import kafka.api._
 import kafka.common.ErrorMapping
 import kafka.utils.SystemTime
 import kafka.utils.Logging
-import java.io.IOException
 
 /**
  * Logic to handle the various Kafka requests
@@ -74,15 +73,8 @@ private[kafka] class KafkaRequestHandler
     catch {
       case e =>
         error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
-        e match {
-          case _: IOException =>
-            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-            Runtime.getRuntime.halt(1)
-          case _ =>
-        }
         throw e
     }
-    None
   }
 
   def handleFetchRequest(request: Receive): Option[Send] = {