You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:00 UTC
[28/37] git commit: KAFKA-765 Corrupted messages in produce request
could shutdown the broker; reviewed by Jun Rao and Sriram Subramanian
KAFKA-765 Corrupted messages in produce request could shutdown the broker; reviewed by Jun Rao and Sriram Subramanian
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db1cb5bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db1cb5bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db1cb5bb
Branch: refs/heads/trunk
Commit: db1cb5bbd9dbd46da67cb3bf859829ffa1d69ad8
Parents: d925b15
Author: Neha Narkhede <ne...@gmail.com>
Authored: Mon Feb 25 00:13:45 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Feb 25 00:13:45 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 6 +++++-
.../scala/kafka/message/ByteBufferMessageSet.scala | 10 ++++++----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/db1cb5bb/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9a5f053..b2a7170 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -282,7 +282,11 @@ private[kafka] class Log(val dir: File,
if(assignOffsets) {
val offsetCounter = new AtomicLong(nextOffset.get)
val firstOffset = offsetCounter.get
- validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
+ try {
+ validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
+ } catch {
+ case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+ }
val lastOffset = offsetCounter.get - 1
(firstOffset, lastOffset)
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/db1cb5bb/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 873699f..80d5fe3 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -62,11 +62,13 @@ object ByteBufferMessageSet {
val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
val intermediateBuffer = new Array[Byte](1024)
val compressed = CompressionFactory(message.compressionCodec, inputStream)
- Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
- outputStream.write(intermediateBuffer, 0, dataRead)
+ try {
+ Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+ outputStream.write(intermediateBuffer, 0, dataRead)
+ }
+ } finally {
+ compressed.close()
}
- compressed.close()
-
val outputBuffer = ByteBuffer.allocate(outputStream.size)
outputBuffer.put(outputStream.toByteArray)
outputBuffer.rewind