You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/20 11:02:28 UTC

svn commit: r1291168 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-broker/src/test/scala/...

Author: chirino
Date: Mon Feb 20 10:02:27 2012
New Revision: 1291168

URL: http://svn.apache.org/viewvc?rev=1291168&view=rev
Log:
Message size and expiration do not need to be tracked in the message record since they are tracked in the queue entry records.  Support holding a compressed encoding of the message and changed the leveldb store to compress earlier in the process to avoid bottlenecking the thread that does the IO.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Mon Feb 20 10:02:27 2012
@@ -25,9 +25,8 @@ option java_multiple_files = true;
 message MessagePB {
   required int64 messageKey=1;
   required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
-  required int32 size = 3;
   optional bytes value = 4;
-  optional sint64 expiration = 5;
+  optional int32 compression = 5;
   
   optional bytes direct_data = 10;
   optional bytes direct_file = 12;

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Feb 20 10:02:27 2012
@@ -242,7 +242,6 @@ class Delivery {
 
   def createMessageRecord() = {
     val record = message.protocol.encode(message)
-    record.size = size
     record.locator = storeLocator
     record
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Feb 20 10:02:27 2012
@@ -1776,8 +1776,8 @@ class QueueEntry(val queue:Queue, val se
 
         val delivery = new Delivery()
         delivery.seq = seq
+        delivery.size = size
         delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
-        delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
         delivery.storeLocator = messageRecord.locator
         delivery.redeliveries = redelivery_count

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Mon Feb 20 10:02:27 2012
@@ -214,6 +214,7 @@ trait DelayingStoreSupport extends Store
       val action = new MessageAction
       action.msg = record.key
       action.message_record = record
+      on_store_requested(record)
       this.synchronized {
         actions += record.key -> action
         pending_stores.put(action.message_record.key, action)
@@ -301,6 +302,8 @@ trait DelayingStoreSupport extends Store
     rc
   }
 
+  def on_store_requested(mr:MessageRecord) = {}
+
   var metric_canceled_message_counter:Long = 0
   var metric_canceled_enqueue_counter:Long = 0
   var metric_flushed_message_counter:Long = 0

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Mon Feb 20 10:02:27 2012
@@ -28,10 +28,9 @@ class MessageRecord {
 
   var key = -1L
   var protocol: AsciiBuffer = _
-  var size = 0
   var buffer: Buffer = _
+  var compressed: Buffer = _
   var direct_buffer: DirectBuffer = _
-  var expiration = 0L
   var locator:AtomicReference[Object] = _
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Mon Feb 20 10:02:27 2012
@@ -31,10 +31,7 @@ object PBSupport {
     val pb = new MessagePB.Bean
     pb.setMessageKey(v.key)
     pb.setProtocol(v.protocol)
-    pb.setSize(v.size)
     pb.setValue(v.buffer)
-    if(v.expiration!=0)
-      pb.setExpiration(v.expiration)
     pb
   }
 
@@ -42,9 +39,7 @@ object PBSupport {
     val rc = new MessageRecord
     rc.key = pb.getMessageKey
     rc.protocol = pb.getProtocol
-    rc.size = pb.getSize
     rc.buffer = pb.getValue
-    rc.expiration = pb.getExpiration
     rc
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Mon Feb 20 10:02:27 2012
@@ -93,7 +93,6 @@ abstract class StoreFunSuiteSupport exte
     var message = new MessageRecord
     message.protocol = ascii("test-protocol")
     message.buffer = ascii(content).buffer
-    message.size = message.buffer.length
     message.locator = new AtomicReference[Object]()
     val key = batch.store(message)
     (key, message.locator)

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Mon Feb 20 10:02:27 2012
@@ -39,8 +39,8 @@ import org.iq80.leveldb._
 import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
 import org.apache.activemq.apollo.broker.store.PBSupport
 import java.util.concurrent.atomic.AtomicReference
-import org.fusesource.hawtbuf.{AsciiBuffer, Buffer, AbstractVarIntSupport}
 import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait.encode_key
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, AsciiBuffer, Buffer, AbstractVarIntSupport}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -71,9 +71,6 @@ object LevelDBClient extends Log {
   final val LOG_REMOVE_QUEUE_ENTRY = 6.toByte
   final val LOG_MAP_ENTRY = 7.toByte
 
-  final val LOG_ADD_MESSAGE_SNAPPY = (LOG_ADD_MESSAGE + 100).toByte
-  final val LOG_MAP_ENTRY_SNAPPY = (LOG_MAP_ENTRY + 100).toByte
-
   final val LOG_SUFFIX = ".log"
   final val INDEX_SUFFIX = ".index"
 
@@ -450,7 +447,7 @@ class LevelDBClient(store: LevelDBStore)
                         true
                     }
 
-                  case LOG_MAP_ENTRY | LOG_MAP_ENTRY_SNAPPY =>
+                  case LOG_MAP_ENTRY =>
                     replay_operations += 1
                     val entry = MapEntryPB.FACTORY.parseUnframed(data)
                     if (entry.getValue == null) {
@@ -846,23 +843,17 @@ class LevelDBClient(store: LevelDBStore)
 
                         val pb = new MessagePB.Bean
                         pb.setProtocol(message_record.protocol)
-                        pb.setSize(message_record.size)
-                        pb.setValue(message_record.buffer)
-                        var message_data = pb.freeze().toUnframedBuffer
-
-                        val p = if (snappy_compress_logs) {
-                          val compressed = Snappy.compress(message_data)
-                          if (compressed.length < message_data.length) {
-                            message_data = compressed
-                            appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
-                          } else {
-                            appender.append(LOG_ADD_MESSAGE, message_data)
-                          }
+
+                        val body = if(message_record.compressed!=null) {
+                          pb.setCompression(1)
+                          message_record.compressed
                         } else {
-                          appender.append(LOG_ADD_MESSAGE, message_data)
+                          message_record.buffer
                         }
-                        locator = (p._1, message_data.length)
-                        log_info = p._2
+                        var header = pb.freeze().toFramedBuffer
+
+                        val (pos, log_info) = appender.append(LOG_ADD_MESSAGE, header, body)
+                        locator = (pos, header.length + body.length)
                         message_record.locator.set(locator);
                       }
 
@@ -949,17 +940,16 @@ class LevelDBClient(store: LevelDBStore)
               val (_, locator, callback) = x
               val record = metric_load_from_index_counter.time {
                 val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
-                log.read(pos, len).map {
-                  case (kind, data) =>
-
-                    val msg_data = kind match {
-                      case LOG_ADD_MESSAGE => data
-                      case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
-                    }
-                    val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
-                    rc.locator = locator
-                    assert(rc.protocol != null)
-                    rc
+                log.read(pos, len).map { data =>
+                  val is = new DataByteArrayInputStream(data)
+                  val pb = MessagePB.FACTORY.parseFramed(is)
+                  val rc = PBSupport.from_pb(pb)
+                  rc.buffer = is.readBuffer(is.available())
+                  rc.locator = locator
+                  if(pb.getCompression == 1) {
+                    rc.buffer = Snappy.uncompress(rc.buffer)
+                  }
+                  rc
                 }
               }
               if (record.isDefined) {
@@ -986,16 +976,16 @@ class LevelDBClient(store: LevelDBStore)
               val (_, locator, callback) = x
               val record: Option[MessageRecord] = metric_load_from_index_counter.time {
                 val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
-                log.read(pos, len).map {
-                  case (kind, data) =>
-                    val msg_data = kind match {
-                      case LOG_ADD_MESSAGE => data
-                      case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
-                    }
-                    val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
-                    rc.locator = locator
-                    assert(rc.protocol != null)
-                    rc
+                log.read(pos, len).map { data=>
+                  val is = new DataByteArrayInputStream(data)
+                  val pb = MessagePB.FACTORY.parseFramed(is)
+                  val rc = PBSupport.from_pb(pb)
+                  rc.buffer = is.readBuffer(is.available())
+                  rc.locator = locator
+                  if(pb.getCompression == 1) {
+                    rc.buffer = Snappy.uncompress(rc.buffer)
+                  }
+                  rc
                 }
               }
               callback(record)
@@ -1267,15 +1257,16 @@ class LevelDBClient(store: LevelDBStore)
               (key, value) =>
                 val (_, pos) = decode_long_key(key)
                 val len = decode_vlong(value).toInt
-                log.read(pos, len).foreach {
-                  case (kind, data) =>
-                    val msg_data = kind match {
-                      case LOG_ADD_MESSAGE => data
-                      case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
-                    }
-                    val record = MessagePB.FACTORY.parseUnframed(msg_data).copy()
-                    record.setMessageKey(pos)
-                    manager.store_message(record)
+                log.read(pos, len).foreach { data =>
+                  val is = new DataByteArrayInputStream(data)
+                  val record = MessagePB.FACTORY.parseFramed(is).copy()
+                  var buffer = is.readBuffer(is.available())
+                  if(record.getCompression == 1) {
+                    buffer = Snappy.uncompress(buffer)
+                  }
+                  record.setMessageKey(pos)
+                  record.setValue(buffer)
+                  manager.store_message(record)
                 }
                 true
             }
@@ -1347,19 +1338,22 @@ class LevelDBClient(store: LevelDBStore)
             while (manager.getNext match {
 
               case record: MessagePB.Buffer =>
-                var message_data = record.toUnframedBuffer
-                val (pos, _) = if (snappy_compress_logs) {
-                  val compressed = Snappy.compress(message_data)
-                  if (compressed.length < message_data.length) {
-                    message_data = compressed
-                    appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+                val pb = new MessagePB.Bean
+                pb.setProtocol(record.getProtocol)
+                val body = if(snappy_compress_logs) {
+                  val compressed = Snappy.compress(record.getValue)
+                  if (compressed.length < record.getValue.length) {
+                    pb.setCompression(1)
+                    compressed
                   } else {
-                    appender.append(LOG_ADD_MESSAGE, message_data)
+                    record.getValue
                   }
                 } else {
-                  appender.append(LOG_ADD_MESSAGE, message_data)
+                  record.getValue
                 }
-                index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
+                var header = pb.freeze().toFramedBuffer
+                val (pos, log_info) = appender.append(LOG_ADD_MESSAGE, header, body)
+                index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, header.length+body.length))
                 true
 
               case record: QueueEntryPB.Buffer =>

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala Mon Feb 20 10:02:27 2012
@@ -64,6 +64,16 @@ class LevelDBStore(val config: LevelDBSt
 
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
+
+  override def on_store_requested(mr: MessageRecord) = {
+    if( client.snappy_compress_logs && mr.compressed==null ) {
+      val compressed = Snappy.compress(mr.buffer)
+      if (compressed.length < mr.buffer.length) {
+        mr.compressed = compressed
+      }
+    }
+  }
+
   protected def store(uows: Seq[DelayableUOW])(callback: => Unit) = {
     write_executor {
       client.store(uows, ^ {

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala Mon Feb 20 10:02:27 2012
@@ -99,9 +99,11 @@ case class RecordLog(directory: File, lo
     file.delete()
   }
 
-  def checksum(data: Buffer): Int = {
+  def checksum(data: Buffer*): Int = {
     val checksum = new CRC32
-    checksum.update(data.data, data.offset, data.length)
+    data.foreach { data =>
+      checksum.update(data.data, data.offset, data.length)
+    }
     (checksum.getValue & 0xFFFFFFFF).toInt
   }
 
@@ -145,47 +147,51 @@ case class RecordLog(directory: File, lo
     /**
      * returns the offset position of the data record.
      */
-    def append(id: Byte, data: Buffer) = this.synchronized {
+    def append(id: Byte, data: Buffer*) = this.synchronized {
       val record_position = append_position
-      val data_length = data.length
+      var data_length = 0
+      data.foreach(data_length += _.length)
       val total_length = LOG_HEADER_SIZE + data_length
 
       if (write_buffer.position() + total_length > BUFFER_SIZE) {
         flush
       }
 
-      val cs: Int = checksum(data)
+      val cs: Int = checksum(data:_*)
       //      trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs)
 
-      if (false && total_length > BYPASS_BUFFER_SIZE) {
-
-        // Write the header and flush..
+//      if (false && total_length > BYPASS_BUFFER_SIZE) {
+//
+//        // Write the header and flush..
+//        write_buffer.writeByte(LOG_HEADER_PREFIX)
+//        write_buffer.writeByte(id)
+//        write_buffer.writeInt(cs)
+//        write_buffer.writeInt(data_length)
+//
+//        append_offset += LOG_HEADER_SIZE
+//        flush
+//
+//        // Directly write the data to the channel since it's large.
+//        val buffer = data.toByteBuffer
+//        val pos = append_offset + LOG_HEADER_SIZE
+//        flushed_offset.addAndGet(buffer.remaining)
+//        channel.write(buffer, pos)
+//        if (buffer.hasRemaining) {
+//          throw new IOException("Short write")
+//        }
+//        append_offset += data_length
+//
+//      } else {
         write_buffer.writeByte(LOG_HEADER_PREFIX)
         write_buffer.writeByte(id)
         write_buffer.writeInt(cs)
         write_buffer.writeInt(data_length)
-
-        append_offset += LOG_HEADER_SIZE
-        flush
-
-        // Directly write the data to the channel since it's large.
-        val buffer = data.toByteBuffer
-        val pos = append_offset + LOG_HEADER_SIZE
-        flushed_offset.addAndGet(buffer.remaining)
-        channel.write(buffer, pos)
-        if (buffer.hasRemaining) {
-          throw new IOException("Short write")
+        data.foreach { data=>
+          write_buffer.write(data.data, data.offset, data.length)
         }
-        append_offset += data_length
 
-      } else {
-        write_buffer.writeByte(LOG_HEADER_PREFIX)
-        write_buffer.writeByte(id)
-        write_buffer.writeInt(cs)
-        write_buffer.writeInt(data_length)
-        write_buffer.write(data.data, data.offset, data_length)
         append_offset += total_length
-      }
+//      }
       (record_position, info)
     }
 
@@ -231,13 +237,13 @@ case class RecordLog(directory: File, lo
 
       check_read_flush(offset + LOG_HEADER_SIZE + length)
 
-      val record = new Buffer(LOG_HEADER_SIZE + length)
-      if (channel.read(record.toByteBuffer, offset) != record.length) {
-        throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
-      }
-
       if (verify_checksums) {
 
+        val record = new Buffer(LOG_HEADER_SIZE + length)
+        if (channel.read(record.toByteBuffer, offset) != record.length) {
+          throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
+        }
+
         def record_is_not_changing = {
           using(open) {
             fd =>
@@ -271,11 +277,13 @@ case class RecordLog(directory: File, lo
           }
         }
 
-        (kind, data)
+        data
       } else {
-        val kind = record.get(1)
-        record.moveHead(LOG_HEADER_SIZE)
-        (kind, record)
+        val record = new Buffer(length)
+        if (channel.read(record.toByteBuffer, offset+LOG_HEADER_SIZE) != record.length) {
+          throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
+        }
+        record
       }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Mon Feb 20 10:02:27 2012
@@ -45,7 +45,6 @@ object OpenwireCodec extends Sizer[Comma
   def encode(message: Message):MessageRecord = {
     val rc = new MessageRecord
     rc.protocol = PROTOCOL
-    rc.expiration = message.expiration
 
     val msg = message.asInstanceOf[OpenwireMessage];
     rc.buffer = msg.message.getCachedEncoding match {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Mon Feb 20 10:02:27 2012
@@ -42,7 +42,6 @@ object StompCodec extends Log {
 
     val rc = new MessageRecord
     rc.protocol = PROTOCOL
-    rc.expiration = message.expiration
 
     if( frame.content.isInstanceOf[ZeroCopyContent] ) {
       rc.direct_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer