You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/28 20:42:00 UTC
svn commit: r1378264 -
/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
Author: junrao
Date: Tue Aug 28 18:42:00 2012
New Revision: 1378264
URL: http://svn.apache.org/viewvc?rev=1378264&view=rev
Log:
Add constructor for message which takes both byte array offset and length; patched by Graham Sanderson; reviewed by Jun Rao; KAFKA-393
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala?rev=1378264&r1=1378263&r2=1378264&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala Tue Aug 28 18:42:00 2012
@@ -84,10 +84,10 @@ object Message {
class Message(val buffer: ByteBuffer) {
import kafka.message.Message._
-
-
- private def this(checksum: Long, bytes: Array[Byte], compressionCodec: CompressionCodec) = {
- this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length))
+
+
+ private def this(checksum: Long, bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = {
+ this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + size))
buffer.put(CurrentMagicValue)
var attributes:Byte = 0
if (compressionCodec.codec > 0) {
@@ -95,18 +95,22 @@ class Message(val buffer: ByteBuffer) {
}
buffer.put(attributes)
Utils.putUnsignedInt(buffer, checksum)
- buffer.put(bytes)
+ buffer.put(bytes, offset, size)
buffer.rewind()
}
- def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, NoCompressionCodec)
-
- def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = {
+ def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, 0, bytes.length, NoCompressionCodec)
+
+ def this(bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = {
//Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there
- this(Utils.crc32(bytes), bytes, compressionCodec)
+ this(Utils.crc32(bytes, offset, size), bytes, offset, size, compressionCodec)
}
- def this(bytes: Array[Byte]) = this(bytes, NoCompressionCodec)
+ def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = this(bytes, 0, bytes.length, compressionCodec)
+
+ def this(bytes: Array[Byte], offset: Int, size: Int) = this(bytes, offset, size, NoCompressionCodec)
+
+ def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length, NoCompressionCodec)
def size: Int = buffer.limit