You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/28 20:42:00 UTC

svn commit: r1378264 - /incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala

Author: junrao
Date: Tue Aug 28 18:42:00 2012
New Revision: 1378264

URL: http://svn.apache.org/viewvc?rev=1378264&view=rev
Log:
Add Message constructors that take a byte array together with an offset and a length; patched by Graham Sanderson; reviewed by Jun Rao; KAFKA-393

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala?rev=1378264&r1=1378263&r2=1378264&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala Tue Aug 28 18:42:00 2012
@@ -84,10 +84,10 @@ object Message {
 class Message(val buffer: ByteBuffer) {
   
   import kafka.message.Message._
-    
-  
-  private def this(checksum: Long, bytes: Array[Byte], compressionCodec: CompressionCodec) = {
-    this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length))
+
+
+  private def this(checksum: Long, bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = {
+    this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + size))
     buffer.put(CurrentMagicValue)
     var attributes:Byte = 0
     if (compressionCodec.codec > 0) {
@@ -95,18 +95,22 @@ class Message(val buffer: ByteBuffer) {
     }
     buffer.put(attributes)
     Utils.putUnsignedInt(buffer, checksum)
-    buffer.put(bytes)
+    buffer.put(bytes, offset, size)
     buffer.rewind()
   }
 
-  def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, NoCompressionCodec)
-  
-  def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = {
+  def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, 0, bytes.length, NoCompressionCodec)
+
+  def this(bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = {
     //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there
-    this(Utils.crc32(bytes), bytes, compressionCodec)
+    this(Utils.crc32(bytes, offset, size), bytes, offset, size, compressionCodec)
   }
 
-  def this(bytes: Array[Byte]) = this(bytes, NoCompressionCodec)
+  def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = this(bytes, 0, bytes.length, compressionCodec)
+
+  def this(bytes: Array[Byte], offset: Int, size: Int) = this(bytes, offset, size, NoCompressionCodec)
+
+  def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length, NoCompressionCodec)
   
   def size: Int = buffer.limit