You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:00 UTC

[29/37] git commit: Minor commit post KAFKA-765

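Minor commit post KAFKA-765: close the compression output stream in a finally block so it is closed even if writing a message throws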


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76a4fbfb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76a4fbfb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76a4fbfb

Branch: refs/heads/trunk
Commit: 76a4fbfb13a3541960c8c9312defdb69fa4c3f32
Parents: db1cb5b
Author: Neha Narkhede <ne...@gmail.com>
Authored: Mon Feb 25 09:53:40 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Feb 25 09:53:40 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/message/ByteBufferMessageSet.scala |   16 ++++++++------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/76a4fbfb/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 80d5fe3..03590ad 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -40,14 +40,16 @@ object ByteBufferMessageSet {
       val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages))
       val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream))
       var offset = -1L
-      for(message <- messages) {
-        offset = offsetCounter.getAndIncrement
-        output.writeLong(offset)
-        output.writeInt(message.size)
-        output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+      try {
+        for(message <- messages) {
+          offset = offsetCounter.getAndIncrement
+          output.writeLong(offset)
+          output.writeInt(message.size)
+          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+        }
+      } finally {
+        output.close()
       }
-      output.close()
-      
       val bytes = byteArrayStream.toByteArray
       val message = new Message(bytes, compressionCodec)
       val buffer = ByteBuffer.allocate(message.size + MessageSet.LogOverhead)