You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/29 16:52:39 UTC
svn commit: r1378590 - in /incubator/kafka/trunk/core/src/main/scala/kafka: log/Log.scala server/KafkaRequestHandlers.scala
Author: junrao
Date: Wed Aug 29 14:52:39 2012
New Revision: 1378590
URL: http://svn.apache.org/viewvc?rev=1378590&view=rev
Log:
Message size not checked at the server (patch v3); patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-469
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1378590&r1=1378589&r2=1378590&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Wed Aug 29 14:52:39 2012
@@ -255,6 +255,7 @@ private[log] class Log(val dir: File, va
}
}
+
/**
* Read from the log file at the given offset
*/
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1378590&r1=1378589&r2=1378590&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Wed Aug 29 14:52:39 2012
@@ -22,7 +22,7 @@ import kafka.log._
import kafka.network._
import kafka.message._
import kafka.api._
-import kafka.common.ErrorMapping
+import kafka.common.{MessageSizeTooLargeException, ErrorMapping}
import java.util.concurrent.atomic.AtomicLong
import kafka.utils._
@@ -73,11 +73,15 @@ private[kafka] class KafkaRequestHandler
BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
}
catch {
- case e =>
- error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+ case e: MessageSizeTooLargeException =>
+ warn(e.getMessage() + " on " + request.topic + ":" + partition)
+ BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
+ BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+ case t =>
+ error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t)
BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
- throw e
+ throw t
}
}