You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/02 18:33:46 UTC
svn commit: r1239740 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
log/Log.scala server/KafkaRequestHandlers.scala
Author: junrao
Date: Thu Feb 2 17:33:46 2012
New Revision: 1239740
URL: http://svn.apache.org/viewvc?rev=1239740&view=rev
Log:
Corrupted request shuts down the broker; patched by Jun Rao; reviewed by Jay Kreps and Neha Narkhede; KAFKA-261
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1239740&r1=1239739&r2=1239740&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Thu Feb 2 17:33:46 2012
@@ -211,10 +211,18 @@ private[log] class Log(val dir: File, va
// they are valid, insert them in the log
lock synchronized {
- val segment = segments.view.last
- segment.messageSet.append(messages)
- maybeFlush(numberOfMessages)
- maybeRoll(segment)
+ try {
+ val segment = segments.view.last
+ segment.messageSet.append(messages)
+ maybeFlush(numberOfMessages)
+ maybeRoll(segment)
+ }
+ catch {
+ case e: IOException =>
+ fatal("Halting due to unrecoverable I/O error while handling producer request", e)
+ Runtime.getRuntime.halt(1)
+ case e2 => throw e2
+ }
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1239740&r1=1239739&r2=1239740&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Thu Feb 2 17:33:46 2012
@@ -25,7 +25,6 @@ import kafka.api._
import kafka.common.ErrorMapping
import kafka.utils.SystemTime
import kafka.utils.Logging
-import java.io.IOException
/**
* Logic to handle the various Kafka requests
@@ -74,15 +73,8 @@ private[kafka] class KafkaRequestHandler
catch {
case e =>
error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
- e match {
- case _: IOException =>
- fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
- Runtime.getRuntime.halt(1)
- case _ =>
- }
throw e
}
- None
}
def handleFetchRequest(request: Receive): Option[Send] = {