Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/18 00:17:30 UTC

svn commit: r1245801 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/ apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/

Author: chirino
Date: Fri Feb 17 23:17:30 2012
New Revision: 1245801

URL: http://svn.apache.org/viewvc?rev=1245801&view=rev
Log:
Reduce the amount of data encoded into each queue entry in the LevelDB store's index.
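
In short: the index key for a queue entry already encodes the queue key and
queue sequence, so repeating those two longs inside the stored value only
bloats the index. A minimal, self-contained sketch of the idea (hypothetical
fixed-width helpers; the actual Apollo code uses variable-length encoding and
its own encode_key/decode_long_long_key utilities):

    import java.nio.ByteBuffer

    object SlimIndexSketch {

      // Key layout: 1-byte prefix + two big-endian longs.
      def encodeKey(prefix: Byte, queueKey: Long, queueSeq: Long): Array[Byte] = {
        val bb = ByteBuffer.allocate(1 + 8 + 8)
        bb.put(prefix).putLong(queueKey).putLong(queueSeq)
        bb.array()
      }

      // Read side: skip the prefix and pull the two longs back out of the
      // key, instead of parsing them from the value.
      def decodeKey(key: Array[Byte]): (Long, Long) = {
        val bb = ByteBuffer.wrap(key)
        bb.get() // prefix
        (bb.getLong(), bb.getLong())
      }
    }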

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1245801&r1=1245800&r2=1245801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Feb 17 23:17:30 2012
@@ -111,12 +111,12 @@ abstract class StoreFunSuiteSupport exte
 
   def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
     var batch = store.create_uow
-    var msg_keys = ListBuffer[(Long, AtomicReference[Object])]()
+    var msg_keys = ListBuffer[(Long, AtomicReference[Object], Long)]()
     var next_seq = first_seq
 
     messages.foreach { message=>
       val msgKey = add_message(batch, message)
-      msg_keys += msgKey
+      msg_keys +=( (msgKey._1, msgKey._2, next_seq) )
       batch.enqueue(entry(queue_key, next_seq, msgKey))
       next_seq += 1
     }
@@ -215,8 +215,8 @@ abstract class StoreFunSuiteSupport exte
     val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
     val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) )
-    expect(msg_keys.toSeq.map(_._1)) {
-      rc.map( _.message_key )
+    expect(msg_keys.toSeq.map(_._3)) {
+      rc.map( _.entry_seq )
     }
   }
 

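The test-support change above threads the enqueue sequence through populate()'s
return value as a third tuple element, so the assertion can compare entry_seq
values rather than message keys. A standalone sketch of that bookkeeping, with
a stub in place of the real add_message/store calls (hypothetical names, for
illustration only):

    import java.util.concurrent.atomic.AtomicReference
    import scala.collection.mutable.ListBuffer

    object PopulateSketch {

      // Hypothetical stand-in for add_message(): the real store assigns the
      // message key and locator.
      def addMessageStub(key: Long): (Long, AtomicReference[Object]) =
        (key, new AtomicReference[Object]())

      def populate(messages: List[String], firstSeq: Long = 1L) = {
        val msgKeys = ListBuffer[(Long, AtomicReference[Object], Long)]()
        var nextSeq = firstSeq
        messages.zipWithIndex.foreach { case (_, i) =>
          val (key, locator) = addMessageStub(100L + i)
          msgKeys += ((key, locator, nextSeq)) // sequence rides along
          nextSeq += 1
        }
        msgKeys
      }
    }

The updated assertion then reads straight off the third element:
expect(msg_keys.toSeq.map(_._3)) { rc.map(_.entry_seq) }.
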
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245801&r1=1245800&r2=1245801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Feb 17 23:17:30 2012
@@ -387,13 +387,16 @@ class LevelDBClient(store: LevelDBStore)
             log.read(pos).map {
               case (kind, data, next_pos) =>
                 kind match {
-                  case LOG_ADD_MESSAGE =>
                   case LOG_ADD_QUEUE_ENTRY =>
                     replay_operations+=1
                     val record = QueueEntryPB.FACTORY.parseFramed(data)
-                    val pos = decode_vlong(record.getMessageLocator)
-                    index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), data)
-                    pos.foreach(log_ref_increment(_))
+
+                    val index_record = record.copy()
+                    index_record.clearQueueKey()
+                    index_record.clearQueueSeq()
+                    index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toFramedBuffer)
+
+                    log_ref_increment(decode_vlong(record.getMessageLocator))
 
                   case LOG_REMOVE_QUEUE_ENTRY =>
                     replay_operations+=1
@@ -436,7 +439,7 @@ class LevelDBClient(store: LevelDBStore)
                       index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
                     }
                   case _ =>
-                  // Skip unknown records
+                  // Skip records which don't require index updates.
                 }
                 pos = next_pos
             }
@@ -825,18 +828,32 @@ class LevelDBClient(store: LevelDBStore)
               action.enqueues.foreach { entry =>
                 assert(locator!=null)
                 val (pos, len) = locator
-                entry.message_locator.set(locator)
-
                 if ( locator_buffer==null ) {
                   locator_buffer = encode_locator(pos, len)
                 }
 
-                val record = PBSupport.to_pb(entry)
-                record.setMessageLocator(locator_buffer)
+                entry.message_locator.set(locator)
 
-                val encoded = record.freeze().toFramedBuffer
-                appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
-                batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
+                val log_record = new QueueEntryPB.Bean
+                // TODO: perhaps we should normalize the sender to make the index entries more compact.
+                log_record.setSender(entry.sender)
+                log_record.setMessageLocator(locator_buffer)
+                log_record.setQueueKey(entry.queue_key)
+                log_record.setQueueSeq(entry.entry_seq)
+                log_record.setSize(entry.size)
+                if (entry.expiration!=0)
+                  log_record.setExpiration(entry.expiration)
+                if (entry.redeliveries!=0)
+                  log_record.setRedeliveries(entry.redeliveries)
+
+                appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toFramedBuffer)
+
+                // Slim down the index record, the smaller it is the cheaper the compactions
+                // will be and the more we can cache in mem.
+                val index_record = log_record.copy()
+                index_record.clearQueueKey()
+                index_record.clearQueueSeq()
+                batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toFramedBuffer)
                 
                 // Increment it.
                 log_ref_increment(pos, log_info)
@@ -998,8 +1015,11 @@ class LevelDBClient(store: LevelDBStore)
         val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
         val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
         index.cursor_range( start, end, ro ) { (key, value) =>
+          val (_, _, queue_seq) = decode_long_long_key(key)
           val record = QueueEntryPB.FACTORY.parseFramed(value)
           val entry = PBSupport.from_pb(record)
+          entry.queue_key = queue_key
+          entry.entry_seq = queue_seq
           entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator))
           rc += entry
           true
@@ -1172,9 +1192,12 @@ class LevelDBClient(store: LevelDBStore)
           }
 
           // Now export the queue entries
-          index.cursor_prefixed(queue_entry_prefix_array, nocache) { (_, value) =>
+          index.cursor_prefixed(queue_entry_prefix_array, nocache) { (key, value) =>
+            val (_, queue_key, queue_seq) = decode_long_long_key(key)
             val record = QueueEntryPB.FACTORY.parseFramed(value).copy()
             val (pos, len) = decode_locator(record.getMessageLocator)
+            record.setQueueKey(queue_key)
+            record.setQueueSeq(queue_seq)
             record.setMessageKey(pos)
             manager.store_queue_entry(record)
             true
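
Taken together, the LevelDBClient changes split each enqueue into two
encodings: the full record goes to the write-ahead log so replay can rebuild
the index, while the index value clears queue_key and queue_seq because the
index key already carries them; every read path (replay, list_queue_entries,
export) then restores those fields from the key. A compact sketch of the
pattern, using a plain case class in place of the generated QueueEntryPB bean
(hypothetical, for illustration only):

    object RecordSplitSketch {

      case class EntryRecord(
        messageLocator: Array[Byte],
        queueKey: Long,
        queueSeq: Long,
        size: Int)

      // Write path: the log record keeps every field so recovery can replay
      // it; the index value zeroes out the fields the index key encodes.
      def split(rec: EntryRecord): (EntryRecord, EntryRecord) = {
        val logRecord = rec
        val indexRecord = rec.copy(queueKey = 0L, queueSeq = 0L)
        (logRecord, indexRecord)
      }

      // Read path: whatever was cleared must be restored from the key before
      // the record is handed back, as list_queue_entries and the exporter do
      // above.
      def restore(indexRecord: EntryRecord, queueKey: Long, queueSeq: Long): EntryRecord =
        indexRecord.copy(queueKey = queueKey, queueSeq = queueSeq)
    }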