You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:00 UTC
[29/37] git commit: Minor commit post KAFKA-765
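Minor commit post KAFKA-765: wrap the message-writing loop in ByteBufferMessageSet in try/finally so the output stream is closed even if a write fails.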
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76a4fbfb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76a4fbfb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76a4fbfb
Branch: refs/heads/trunk
Commit: 76a4fbfb13a3541960c8c9312defdb69fa4c3f32
Parents: db1cb5b
Author: Neha Narkhede <ne...@gmail.com>
Authored: Mon Feb 25 09:53:40 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Feb 25 09:53:40 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/message/ByteBufferMessageSet.scala | 16 ++++++++------
1 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/76a4fbfb/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 80d5fe3..03590ad 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -40,14 +40,16 @@ object ByteBufferMessageSet {
val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages))
val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream))
var offset = -1L
- for(message <- messages) {
- offset = offsetCounter.getAndIncrement
- output.writeLong(offset)
- output.writeInt(message.size)
- output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+ try {
+ for(message <- messages) {
+ offset = offsetCounter.getAndIncrement
+ output.writeLong(offset)
+ output.writeInt(message.size)
+ output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+ }
+ } finally {
+ output.close()
}
- output.close()
-
val bytes = byteArrayStream.toByteArray
val message = new Message(bytes, compressionCodec)
val buffer = ByteBuffer.allocate(message.size + MessageSet.LogOverhead)