You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:00 UTC

[28/37] git commit: KAFKA-765 Corrupted messages in produce request could shutdown the broker; reviewed by Jun Rao and Sriram Subramanian

KAFKA-765 Corrupted messages in produce request could shutdown the broker; reviewed by Jun Rao and Sriram Subramanian


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db1cb5bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db1cb5bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db1cb5bb

Branch: refs/heads/trunk
Commit: db1cb5bbd9dbd46da67cb3bf859829ffa1d69ad8
Parents: d925b15
Author: Neha Narkhede <ne...@gmail.com>
Authored: Mon Feb 25 00:13:45 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Feb 25 00:13:45 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala            |    6 +++++-
 .../scala/kafka/message/ByteBufferMessageSet.scala |   10 ++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/db1cb5bb/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9a5f053..b2a7170 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -282,7 +282,11 @@ private[kafka] class Log(val dir: File,
             if(assignOffsets) {
               val offsetCounter = new AtomicLong(nextOffset.get)
               val firstOffset = offsetCounter.get
-              validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
+              try {
+                validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
+              } catch {
+                case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+              }
               val lastOffset = offsetCounter.get - 1
               (firstOffset, lastOffset)
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/db1cb5bb/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 873699f..80d5fe3 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -62,11 +62,13 @@ object ByteBufferMessageSet {
     val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
     val intermediateBuffer = new Array[Byte](1024)
     val compressed = CompressionFactory(message.compressionCodec, inputStream)
-    Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
-      outputStream.write(intermediateBuffer, 0, dataRead)
+    try {
+      Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+        outputStream.write(intermediateBuffer, 0, dataRead)
+      }
+    } finally {
+      compressed.close()
     }
-    compressed.close()
-
     val outputBuffer = ByteBuffer.allocate(outputStream.size)
     outputBuffer.put(outputStream.toByteArray)
     outputBuffer.rewind