You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/29 16:52:39 UTC

svn commit: r1378590 - in /incubator/kafka/trunk/core/src/main/scala/kafka: log/Log.scala server/KafkaRequestHandlers.scala

Author: junrao
Date: Wed Aug 29 14:52:39 2012
New Revision: 1378590

URL: http://svn.apache.org/viewvc?rev=1378590&view=rev
Log:
Message size not checked at the server (patch v3); patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-469

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1378590&r1=1378589&r2=1378590&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Wed Aug 29 14:52:39 2012
@@ -255,6 +255,7 @@ private[log] class Log(val dir: File, va
     }
   }
 
+
   /**
    * Read from the log file at the given offset
    */

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1378590&r1=1378589&r2=1378590&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Wed Aug 29 14:52:39 2012
@@ -22,7 +22,7 @@ import kafka.log._
 import kafka.network._
 import kafka.message._
 import kafka.api._
-import kafka.common.ErrorMapping
+import kafka.common.{MessageSizeTooLargeException, ErrorMapping}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils._
 
@@ -73,11 +73,15 @@ private[kafka] class KafkaRequestHandler
       BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
     }
     catch {
-      case e =>
-        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+      case e: MessageSizeTooLargeException =>
+        warn(e.getMessage() + " on " + request.topic + ":" + partition)
+        BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
+        BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+      case t =>
+        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t)
         BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
         BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-        throw e
+        throw t
     }
   }