You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/18 00:17:30 UTC
svn commit: r1245801 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/
apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/
Author: chirino
Date: Fri Feb 17 23:17:30 2012
New Revision: 1245801
URL: http://svn.apache.org/viewvc?rev=1245801&view=rev
Log:
Reduce the amount of data encoded into every leveldb store's index queue entry.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1245801&r1=1245800&r2=1245801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Feb 17 23:17:30 2012
@@ -111,12 +111,12 @@ abstract class StoreFunSuiteSupport exte
def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
var batch = store.create_uow
- var msg_keys = ListBuffer[(Long, AtomicReference[Object])]()
+ var msg_keys = ListBuffer[(Long, AtomicReference[Object], Long)]()
var next_seq = first_seq
messages.foreach { message=>
val msgKey = add_message(batch, message)
- msg_keys += msgKey
+ msg_keys +=( (msgKey._1, msgKey._2, next_seq) )
batch.enqueue(entry(queue_key, next_seq, msgKey))
next_seq += 1
}
@@ -215,8 +215,8 @@ abstract class StoreFunSuiteSupport exte
val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) )
- expect(msg_keys.toSeq.map(_._1)) {
- rc.map( _.message_key )
+ expect(msg_keys.toSeq.map(_._3)) {
+ rc.map( _.entry_seq )
}
}
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245801&r1=1245800&r2=1245801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Feb 17 23:17:30 2012
@@ -387,13 +387,16 @@ class LevelDBClient(store: LevelDBStore)
log.read(pos).map {
case (kind, data, next_pos) =>
kind match {
- case LOG_ADD_MESSAGE =>
case LOG_ADD_QUEUE_ENTRY =>
replay_operations+=1
val record = QueueEntryPB.FACTORY.parseFramed(data)
- val pos = decode_vlong(record.getMessageLocator)
- index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), data)
- pos.foreach(log_ref_increment(_))
+
+ val index_record = record.copy()
+ index_record.clearQueueKey()
+ index_record.clearQueueSeq()
+ index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toFramedBuffer)
+
+ log_ref_increment(decode_vlong(record.getMessageLocator))
case LOG_REMOVE_QUEUE_ENTRY =>
replay_operations+=1
@@ -436,7 +439,7 @@ class LevelDBClient(store: LevelDBStore)
index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
}
case _ =>
- // Skip unknown records
+ // Skip records which don't require index updates.
}
pos = next_pos
}
@@ -825,18 +828,32 @@ class LevelDBClient(store: LevelDBStore)
action.enqueues.foreach { entry =>
assert(locator!=null)
val (pos, len) = locator
- entry.message_locator.set(locator)
-
if ( locator_buffer==null ) {
locator_buffer = encode_locator(pos, len)
}
- val record = PBSupport.to_pb(entry)
- record.setMessageLocator(locator_buffer)
+ entry.message_locator.set(locator)
- val encoded = record.freeze().toFramedBuffer
- appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
- batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
+ val log_record = new QueueEntryPB.Bean
+ // TODO: perhaps we should normalize the sender to make the index entries more compact.
+ log_record.setSender(entry.sender)
+ log_record.setMessageLocator(locator_buffer)
+ log_record.setQueueKey(entry.queue_key)
+ log_record.setQueueSeq(entry.entry_seq)
+ log_record.setSize(entry.size)
+ if (entry.expiration!=0)
+ log_record.setExpiration(entry.expiration)
+ if (entry.redeliveries!=0)
+ log_record.setRedeliveries(entry.redeliveries)
+
+ appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toFramedBuffer)
+
+ // Slim down the index record, the smaller it is the cheaper the compactions
+ // will be and the more we can cache in mem.
+ val index_record = log_record.copy()
+ index_record.clearQueueKey()
+ index_record.clearQueueSeq()
+ batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toFramedBuffer)
// Increment it.
log_ref_increment(pos, log_info)
@@ -998,8 +1015,11 @@ class LevelDBClient(store: LevelDBStore)
val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
index.cursor_range( start, end, ro ) { (key, value) =>
+ val (_, _, queue_seq) = decode_long_long_key(key)
val record = QueueEntryPB.FACTORY.parseFramed(value)
val entry = PBSupport.from_pb(record)
+ entry.queue_key = queue_key
+ entry.entry_seq = queue_seq
entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator))
rc += entry
true
@@ -1172,9 +1192,12 @@ class LevelDBClient(store: LevelDBStore)
}
// Now export the queue entries
- index.cursor_prefixed(queue_entry_prefix_array, nocache) { (_, value) =>
+ index.cursor_prefixed(queue_entry_prefix_array, nocache) { (key, value) =>
+ val (_, queue_key, queue_seq) = decode_long_long_key(key)
val record = QueueEntryPB.FACTORY.parseFramed(value).copy()
val (pos, len) = decode_locator(record.getMessageLocator)
+ record.setQueueKey(queue_key)
+ record.setQueueSeq(queue_seq)
record.setMessageKey(pos)
manager.store_queue_entry(record)
true