You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/20 11:02:28 UTC
svn commit: r1291168 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/proto/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-broker/src/test/scala/...
Author: chirino
Date: Mon Feb 20 10:02:27 2012
New Revision: 1291168
URL: http://svn.apache.org/viewvc?rev=1291168&view=rev
Log:
Message size and expiration do not need to be tracked in the message record since they are tracked in the queue entry records. Added support for holding a compressed encoding of the message, and changed the leveldb store to compress earlier in the process to avoid bottlenecking the thread that does the IO.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Mon Feb 20 10:02:27 2012
@@ -25,9 +25,8 @@ option java_multiple_files = true;
message MessagePB {
required int64 messageKey=1;
required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
- required int32 size = 3;
optional bytes value = 4;
- optional sint64 expiration = 5;
+ optional int32 compression = 5;
optional bytes direct_data = 10;
optional bytes direct_file = 12;
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Feb 20 10:02:27 2012
@@ -242,7 +242,6 @@ class Delivery {
def createMessageRecord() = {
val record = message.protocol.encode(message)
- record.size = size
record.locator = storeLocator
record
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Feb 20 10:02:27 2012
@@ -1776,8 +1776,8 @@ class QueueEntry(val queue:Queue, val se
val delivery = new Delivery()
delivery.seq = seq
+ delivery.size = size
delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
- delivery.size = messageRecord.size
delivery.storeKey = messageRecord.key
delivery.storeLocator = messageRecord.locator
delivery.redeliveries = redelivery_count
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Mon Feb 20 10:02:27 2012
@@ -214,6 +214,7 @@ trait DelayingStoreSupport extends Store
val action = new MessageAction
action.msg = record.key
action.message_record = record
+ on_store_requested(record)
this.synchronized {
actions += record.key -> action
pending_stores.put(action.message_record.key, action)
@@ -301,6 +302,8 @@ trait DelayingStoreSupport extends Store
rc
}
+ def on_store_requested(mr:MessageRecord) = {}
+
var metric_canceled_message_counter:Long = 0
var metric_canceled_enqueue_counter:Long = 0
var metric_flushed_message_counter:Long = 0
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Mon Feb 20 10:02:27 2012
@@ -28,10 +28,9 @@ class MessageRecord {
var key = -1L
var protocol: AsciiBuffer = _
- var size = 0
var buffer: Buffer = _
+ var compressed: Buffer = _
var direct_buffer: DirectBuffer = _
- var expiration = 0L
var locator:AtomicReference[Object] = _
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Mon Feb 20 10:02:27 2012
@@ -31,10 +31,7 @@ object PBSupport {
val pb = new MessagePB.Bean
pb.setMessageKey(v.key)
pb.setProtocol(v.protocol)
- pb.setSize(v.size)
pb.setValue(v.buffer)
- if(v.expiration!=0)
- pb.setExpiration(v.expiration)
pb
}
@@ -42,9 +39,7 @@ object PBSupport {
val rc = new MessageRecord
rc.key = pb.getMessageKey
rc.protocol = pb.getProtocol
- rc.size = pb.getSize
rc.buffer = pb.getValue
- rc.expiration = pb.getExpiration
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Mon Feb 20 10:02:27 2012
@@ -93,7 +93,6 @@ abstract class StoreFunSuiteSupport exte
var message = new MessageRecord
message.protocol = ascii("test-protocol")
message.buffer = ascii(content).buffer
- message.size = message.buffer.length
message.locator = new AtomicReference[Object]()
val key = batch.store(message)
(key, message.locator)
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Mon Feb 20 10:02:27 2012
@@ -39,8 +39,8 @@ import org.iq80.leveldb._
import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
import org.apache.activemq.apollo.broker.store.PBSupport
import java.util.concurrent.atomic.AtomicReference
-import org.fusesource.hawtbuf.{AsciiBuffer, Buffer, AbstractVarIntSupport}
import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait.encode_key
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, AsciiBuffer, Buffer, AbstractVarIntSupport}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -71,9 +71,6 @@ object LevelDBClient extends Log {
final val LOG_REMOVE_QUEUE_ENTRY = 6.toByte
final val LOG_MAP_ENTRY = 7.toByte
- final val LOG_ADD_MESSAGE_SNAPPY = (LOG_ADD_MESSAGE + 100).toByte
- final val LOG_MAP_ENTRY_SNAPPY = (LOG_MAP_ENTRY + 100).toByte
-
final val LOG_SUFFIX = ".log"
final val INDEX_SUFFIX = ".index"
@@ -450,7 +447,7 @@ class LevelDBClient(store: LevelDBStore)
true
}
- case LOG_MAP_ENTRY | LOG_MAP_ENTRY_SNAPPY =>
+ case LOG_MAP_ENTRY =>
replay_operations += 1
val entry = MapEntryPB.FACTORY.parseUnframed(data)
if (entry.getValue == null) {
@@ -846,23 +843,17 @@ class LevelDBClient(store: LevelDBStore)
val pb = new MessagePB.Bean
pb.setProtocol(message_record.protocol)
- pb.setSize(message_record.size)
- pb.setValue(message_record.buffer)
- var message_data = pb.freeze().toUnframedBuffer
-
- val p = if (snappy_compress_logs) {
- val compressed = Snappy.compress(message_data)
- if (compressed.length < message_data.length) {
- message_data = compressed
- appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
- } else {
- appender.append(LOG_ADD_MESSAGE, message_data)
- }
+
+ val body = if(message_record.compressed!=null) {
+ pb.setCompression(1)
+ message_record.compressed
} else {
- appender.append(LOG_ADD_MESSAGE, message_data)
+ message_record.buffer
}
- locator = (p._1, message_data.length)
- log_info = p._2
+ var header = pb.freeze().toFramedBuffer
+
+ val (pos, log_info) = appender.append(LOG_ADD_MESSAGE, header, body)
+ locator = (pos, header.length + body.length)
message_record.locator.set(locator);
}
@@ -949,17 +940,16 @@ class LevelDBClient(store: LevelDBStore)
val (_, locator, callback) = x
val record = metric_load_from_index_counter.time {
val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
- log.read(pos, len).map {
- case (kind, data) =>
-
- val msg_data = kind match {
- case LOG_ADD_MESSAGE => data
- case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
- }
- val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
- rc.locator = locator
- assert(rc.protocol != null)
- rc
+ log.read(pos, len).map { data =>
+ val is = new DataByteArrayInputStream(data)
+ val pb = MessagePB.FACTORY.parseFramed(is)
+ val rc = PBSupport.from_pb(pb)
+ rc.buffer = is.readBuffer(is.available())
+ rc.locator = locator
+ if(pb.getCompression == 1) {
+ rc.buffer = Snappy.uncompress(rc.buffer)
+ }
+ rc
}
}
if (record.isDefined) {
@@ -986,16 +976,16 @@ class LevelDBClient(store: LevelDBStore)
val (_, locator, callback) = x
val record: Option[MessageRecord] = metric_load_from_index_counter.time {
val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
- log.read(pos, len).map {
- case (kind, data) =>
- val msg_data = kind match {
- case LOG_ADD_MESSAGE => data
- case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
- }
- val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
- rc.locator = locator
- assert(rc.protocol != null)
- rc
+ log.read(pos, len).map { data=>
+ val is = new DataByteArrayInputStream(data)
+ val pb = MessagePB.FACTORY.parseFramed(is)
+ val rc = PBSupport.from_pb(pb)
+ rc.buffer = is.readBuffer(is.available())
+ rc.locator = locator
+ if(pb.getCompression == 1) {
+ rc.buffer = Snappy.uncompress(rc.buffer)
+ }
+ rc
}
}
callback(record)
@@ -1267,15 +1257,16 @@ class LevelDBClient(store: LevelDBStore)
(key, value) =>
val (_, pos) = decode_long_key(key)
val len = decode_vlong(value).toInt
- log.read(pos, len).foreach {
- case (kind, data) =>
- val msg_data = kind match {
- case LOG_ADD_MESSAGE => data
- case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
- }
- val record = MessagePB.FACTORY.parseUnframed(msg_data).copy()
- record.setMessageKey(pos)
- manager.store_message(record)
+ log.read(pos, len).foreach { data =>
+ val is = new DataByteArrayInputStream(data)
+ val record = MessagePB.FACTORY.parseFramed(is).copy()
+ var buffer = is.readBuffer(is.available())
+ if(record.getCompression == 1) {
+ buffer = Snappy.uncompress(buffer)
+ }
+ record.setMessageKey(pos)
+ record.setValue(buffer)
+ manager.store_message(record)
}
true
}
@@ -1347,19 +1338,22 @@ class LevelDBClient(store: LevelDBStore)
while (manager.getNext match {
case record: MessagePB.Buffer =>
- var message_data = record.toUnframedBuffer
- val (pos, _) = if (snappy_compress_logs) {
- val compressed = Snappy.compress(message_data)
- if (compressed.length < message_data.length) {
- message_data = compressed
- appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+ val pb = new MessagePB.Bean
+ pb.setProtocol(record.getProtocol)
+ val body = if(snappy_compress_logs) {
+ val compressed = Snappy.compress(record.getValue)
+ if (compressed.length < record.getValue.length) {
+ pb.setCompression(1)
+ compressed
} else {
- appender.append(LOG_ADD_MESSAGE, message_data)
+ record.getValue
}
} else {
- appender.append(LOG_ADD_MESSAGE, message_data)
+ record.getValue
}
- index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
+ var header = pb.freeze().toFramedBuffer
+ val (pos, log_info) = appender.append(LOG_ADD_MESSAGE, header, body)
+ index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, header.length+body.length))
true
case record: QueueEntryPB.Buffer =>
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala Mon Feb 20 10:02:27 2012
@@ -64,6 +64,16 @@ class LevelDBStore(val config: LevelDBSt
protected def get_next_msg_key = next_msg_key.getAndIncrement
+
+ override def on_store_requested(mr: MessageRecord) = {
+ if( client.snappy_compress_logs && mr.compressed==null ) {
+ val compressed = Snappy.compress(mr.buffer)
+ if (compressed.length < mr.buffer.length) {
+ mr.compressed = compressed
+ }
+ }
+ }
+
protected def store(uows: Seq[DelayableUOW])(callback: => Unit) = {
write_executor {
client.store(uows, ^ {
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala Mon Feb 20 10:02:27 2012
@@ -99,9 +99,11 @@ case class RecordLog(directory: File, lo
file.delete()
}
- def checksum(data: Buffer): Int = {
+ def checksum(data: Buffer*): Int = {
val checksum = new CRC32
- checksum.update(data.data, data.offset, data.length)
+ data.foreach { data =>
+ checksum.update(data.data, data.offset, data.length)
+ }
(checksum.getValue & 0xFFFFFFFF).toInt
}
@@ -145,47 +147,51 @@ case class RecordLog(directory: File, lo
/**
* returns the offset position of the data record.
*/
- def append(id: Byte, data: Buffer) = this.synchronized {
+ def append(id: Byte, data: Buffer*) = this.synchronized {
val record_position = append_position
- val data_length = data.length
+ var data_length = 0
+ data.foreach(data_length += _.length)
val total_length = LOG_HEADER_SIZE + data_length
if (write_buffer.position() + total_length > BUFFER_SIZE) {
flush
}
- val cs: Int = checksum(data)
+ val cs: Int = checksum(data:_*)
// trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs)
- if (false && total_length > BYPASS_BUFFER_SIZE) {
-
- // Write the header and flush..
+// if (false && total_length > BYPASS_BUFFER_SIZE) {
+//
+// // Write the header and flush..
+// write_buffer.writeByte(LOG_HEADER_PREFIX)
+// write_buffer.writeByte(id)
+// write_buffer.writeInt(cs)
+// write_buffer.writeInt(data_length)
+//
+// append_offset += LOG_HEADER_SIZE
+// flush
+//
+// // Directly write the data to the channel since it's large.
+// val buffer = data.toByteBuffer
+// val pos = append_offset + LOG_HEADER_SIZE
+// flushed_offset.addAndGet(buffer.remaining)
+// channel.write(buffer, pos)
+// if (buffer.hasRemaining) {
+// throw new IOException("Short write")
+// }
+// append_offset += data_length
+//
+// } else {
write_buffer.writeByte(LOG_HEADER_PREFIX)
write_buffer.writeByte(id)
write_buffer.writeInt(cs)
write_buffer.writeInt(data_length)
-
- append_offset += LOG_HEADER_SIZE
- flush
-
- // Directly write the data to the channel since it's large.
- val buffer = data.toByteBuffer
- val pos = append_offset + LOG_HEADER_SIZE
- flushed_offset.addAndGet(buffer.remaining)
- channel.write(buffer, pos)
- if (buffer.hasRemaining) {
- throw new IOException("Short write")
+ data.foreach { data=>
+ write_buffer.write(data.data, data.offset, data.length)
}
- append_offset += data_length
- } else {
- write_buffer.writeByte(LOG_HEADER_PREFIX)
- write_buffer.writeByte(id)
- write_buffer.writeInt(cs)
- write_buffer.writeInt(data_length)
- write_buffer.write(data.data, data.offset, data_length)
append_offset += total_length
- }
+// }
(record_position, info)
}
@@ -231,13 +237,13 @@ case class RecordLog(directory: File, lo
check_read_flush(offset + LOG_HEADER_SIZE + length)
- val record = new Buffer(LOG_HEADER_SIZE + length)
- if (channel.read(record.toByteBuffer, offset) != record.length) {
- throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
- }
-
if (verify_checksums) {
+ val record = new Buffer(LOG_HEADER_SIZE + length)
+ if (channel.read(record.toByteBuffer, offset) != record.length) {
+ throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
+ }
+
def record_is_not_changing = {
using(open) {
fd =>
@@ -271,11 +277,13 @@ case class RecordLog(directory: File, lo
}
}
- (kind, data)
+ data
} else {
- val kind = record.get(1)
- record.moveHead(LOG_HEADER_SIZE)
- (kind, record)
+ val record = new Buffer(length)
+ if (channel.read(record.toByteBuffer, offset+LOG_HEADER_SIZE) != record.length) {
+ throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
+ }
+ record
}
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Mon Feb 20 10:02:27 2012
@@ -45,7 +45,6 @@ object OpenwireCodec extends Sizer[Comma
def encode(message: Message):MessageRecord = {
val rc = new MessageRecord
rc.protocol = PROTOCOL
- rc.expiration = message.expiration
val msg = message.asInstanceOf[OpenwireMessage];
rc.buffer = msg.message.getCachedEncoding match {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1291168&r1=1291167&r2=1291168&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Mon Feb 20 10:02:27 2012
@@ -42,7 +42,6 @@ object StompCodec extends Log {
val rc = new MessageRecord
rc.protocol = PROTOCOL
- rc.expiration = message.expiration
if( frame.content.isInstanceOf[ZeroCopyContent] ) {
rc.direct_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer