You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/18 00:55:46 UTC

svn commit: r1245811 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala

Author: chirino
Date: Fri Feb 17 23:55:46 2012
New Revision: 1245811

URL: http://svn.apache.org/viewvc?rev=1245811&view=rev
Log:
Switch to unframed protobuf messages to further reduce the size of the index.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245811&r1=1245810&r2=1245811&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Feb 17 23:55:46 2012
@@ -47,7 +47,7 @@ import org.apache.activemq.apollo.broker
 object LevelDBClient extends Log {
 
   final val STORE_SCHEMA_PREFIX = "leveldb_store:"
-  final val STORE_SCHEMA_VERSION = 2
+  final val STORE_SCHEMA_VERSION = 3
 
   final val queue_prefix = 'q'.toByte
   final val queue_entry_prefix = 'e'.toByte
@@ -389,27 +389,27 @@ class LevelDBClient(store: LevelDBStore)
                 kind match {
                   case LOG_ADD_QUEUE_ENTRY =>
                     replay_operations+=1
-                    val record = QueueEntryPB.FACTORY.parseFramed(data)
+                    val record = QueueEntryPB.FACTORY.parseUnframed(data)
 
                     val index_record = record.copy()
                     index_record.clearQueueKey()
                     index_record.clearQueueSeq()
-                    index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toFramedBuffer)
+                    index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toUnframedBuffer)
 
                     log_ref_increment(decode_vlong(record.getMessageLocator))
 
                   case LOG_REMOVE_QUEUE_ENTRY =>
                     replay_operations+=1
                     index.get(data, new ReadOptions).foreach { value=>
-                      val record = QueueEntryPB.FACTORY.parseFramed(value)
+                      val record = QueueEntryPB.FACTORY.parseUnframed(value)
                       val pos = decode_vlong(record.getMessageLocator)
                       pos.foreach(log_ref_decrement(_))
                       index.delete(data)
                     }
-                    
+
                   case LOG_ADD_QUEUE =>
                     replay_operations+=1
-                    val record = QueuePB.FACTORY.parseFramed(data)
+                    val record = QueuePB.FACTORY.parseUnframed(data)
                     index.put(encode_key(queue_prefix, record.getKey), data)
 
                   case LOG_REMOVE_QUEUE =>
@@ -424,7 +424,7 @@ class LevelDBClient(store: LevelDBStore)
 
                       // Figure out what log file that message entry was in so we can,
                       // decrement the log file reference.
-                      val record = QueueEntryPB.FACTORY.parseFramed(value)
+                      val record = QueueEntryPB.FACTORY.parseUnframed(value)
                       val pos = decode_vlong(record.getMessageLocator)
                       log_ref_decrement(pos)
                       true
@@ -432,7 +432,7 @@ class LevelDBClient(store: LevelDBStore)
 
                   case LOG_MAP_ENTRY =>
                     replay_operations+=1
-                    val entry = MapEntryPB.FACTORY.parseFramed(data)
+                    val entry = MapEntryPB.FACTORY.parseUnframed(data)
                     if (entry.getValue == null) {
                       index.delete(encode_key(map_prefix, entry.getKey))
                     } else {
@@ -467,11 +467,11 @@ class LevelDBClient(store: LevelDBStore)
     var referenced_queues = Set[Long]()
 
     // Lets find out what the queue entries are..
-    var fixed_records = 0 
+    var fixed_records = 0
     index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=>
       try {
         val (_, queue_key, seq_key) = decode_long_long_key(key)
-        val record = QueueEntryPB.FACTORY.parseFramed(value)
+        val record = QueueEntryPB.FACTORY.parseUnframed(value)
         val (pos, len) = decode_locator(record.getMessageLocator)
         if (record.getQueueKey != queue_key) {
           throw new IOException("key missmatch")
@@ -493,12 +493,12 @@ class LevelDBClient(store: LevelDBStore)
       }
       true
     }
-    
+
     // Lets cross check the queues.
     index.cursor_prefixed(queue_prefix_array) { (key, value)=>
       try {
         val (_, queue_key) = decode_long_key(key)
-        val record = QueuePB.FACTORY.parseFramed(value)
+        val record = QueuePB.FACTORY.parseUnframed(value)
         if (record.getKey != queue_key) {
           throw new IOException("key missmatch")
         }
@@ -519,7 +519,7 @@ class LevelDBClient(store: LevelDBStore)
         trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key)
         fixed_records += 1
         index.delete(key)
-        val record = QueueEntryPB.FACTORY.parseFramed(value)
+        val record = QueueEntryPB.FACTORY.parseUnframed(value)
         val pos = decode_vlong(record.getMessageLocator)
         log.log_info(pos).foreach { log_info =>
           actual_log_refs.get(log_info.position).foreach { counter =>
@@ -537,14 +537,14 @@ class LevelDBClient(store: LevelDBStore)
       log_refs.clear()
       log_refs ++= actual_log_refs
     }
-    
+
     if( fixed_records > 0 ) {
       warn("Fixed %d invalid index enties in the leveldb store", fixed_records)
     }
   }
 
   var lock_file:LockFile = _
-  
+
   def lock_store = {
     import OptionSupport._
     if (config.fail_if_locked.getOrElse(false)) {
@@ -555,11 +555,11 @@ class LevelDBClient(store: LevelDBStore)
       }
     }
   }
-  
+
   def unlock_store = {
     lock_file.unlock()
   }
-  
+
   private def store_log_refs = {
     index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
   }
@@ -573,7 +573,7 @@ class LevelDBClient(store: LevelDBStore)
       }
     }
   }
-  
+
   def stop() = {
     // this blocks until all io completes..
     // Suspend also deletes the index.
@@ -768,7 +768,7 @@ class LevelDBClient(store: LevelDBStore)
 
           // Figure out what log file that message entry was in so we can,
           // decrement the log file reference.
-          val record = QueueEntryPB.FACTORY.parseFramed(value)
+          val record = QueueEntryPB.FACTORY.parseUnframed(value)
           val pos = decode_vlong(record.getMessageLocator)
           log_ref_decrement(pos)
           true
@@ -795,7 +795,7 @@ class LevelDBClient(store: LevelDBStore)
                 entry.setValue(value)
                 batch.put(encode_key(map_prefix, key), value.toByteArray)
               }
-              appender.append(LOG_MAP_ENTRY, entry.freeze().toFramedByteArray)
+              appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray)
             }
 
             uow.actions.foreach { case (msg, action) =>
@@ -846,18 +846,18 @@ class LevelDBClient(store: LevelDBStore)
                 if (entry.redeliveries!=0)
                   log_record.setRedeliveries(entry.redeliveries)
 
-                appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toFramedBuffer)
+                appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toUnframedBuffer)
 
                 // Slim down the index record, the smaller it is the cheaper the compactions
                 // will be and the more we can cache in mem.
                 val index_record = log_record.copy()
                 index_record.clearQueueKey()
                 index_record.clearQueueSeq()
-                batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toFramedBuffer)
-                
+                batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toUnframedBuffer)
+
                 // Increment it.
                 log_ref_increment(pos, log_info)
-                
+
               }
             }
             if( uow.flush_sync ) {
@@ -974,7 +974,7 @@ class LevelDBClient(store: LevelDBStore)
             group.first_entry_seq = current_key
           }
 
-          val entry = QueueEntryPB.FACTORY.parseFramed(value)
+          val entry = QueueEntryPB.FACTORY.parseUnframed(value)
           val pos = decode_vlong(entry.getMessageLocator)
 
           group.last_entry_seq = current_key
@@ -1016,7 +1016,7 @@ class LevelDBClient(store: LevelDBStore)
         val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
         index.cursor_range( start, end, ro ) { (key, value) =>
           val (_, _, queue_seq) = decode_long_long_key(key)
-          val record = QueueEntryPB.FACTORY.parseFramed(value)
+          val record = QueueEntryPB.FACTORY.parseUnframed(value)
           val entry = PBSupport.from_pb(record)
           entry.queue_key = queue_key
           entry.entry_seq = queue_seq
@@ -1036,7 +1036,7 @@ class LevelDBClient(store: LevelDBStore)
       index.get(encode_key(map_prefix, key)).map(new Buffer(_))
     }
   }
-  
+
   def get_prefixed_map_entries(prefix:Buffer):Seq[(Buffer, Buffer)] = {
     val rc = ListBuffer[(Buffer, Buffer)]()
     retry_using_index {
@@ -1046,7 +1046,7 @@ class LevelDBClient(store: LevelDBStore)
       }
     }
     rc
-  }  
+  }
 
   def get_last_queue_key:Long = {
     retry_using_index {
@@ -1074,12 +1074,12 @@ class LevelDBClient(store: LevelDBStore)
       }
     }
   }
-  
+
   case class UsageCounter(info:LogInfo) {
     var count = 0L
     var size = 0L
     var first_reference_queue:QueueRecord = _
-    
+
     def increment(value:Int) = {
       count += 1
       size += value
@@ -1150,19 +1150,19 @@ class LevelDBClient(store: LevelDBStore)
       val manager = ExportStreamManager(os, 1)
 
       retry_using_index {
-        
+
         // Delete all the tmp keys..
         index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
           index.delete(key)
           true
         }
-        
+
         index.snapshot { snapshot=>
           val nocache = new ReadOptions
           nocache.snapshot(snapshot)
           nocache.verifyChecksums(verify_checksums)
           nocache.fillCache(false)
-          
+
           val cache = new ReadOptions
           nocache.snapshot(snapshot)
           nocache.verifyChecksums(false)
@@ -1171,7 +1171,7 @@ class LevelDBClient(store: LevelDBStore)
           // Build a temp table of all references messages by the queues
           // Remember 2 queues could reference the same message.
           index.cursor_prefixed(queue_entry_prefix_array, cache) { (_, value) =>
-            val record = QueueEntryPB.FACTORY.parseFramed(value)
+            val record = QueueEntryPB.FACTORY.parseUnframed(value)
             val (pos, len) = decode_locator(record.getMessageLocator)
             index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
             true
@@ -1184,7 +1184,7 @@ class LevelDBClient(store: LevelDBStore)
             val len = decode_vlong(value).toInt
             log.read(pos, len).foreach { value =>
               // Set the message key to be the position in the log.
-              val record = MessagePB.FACTORY.parseFramed(value).copy
+              val record = MessagePB.FACTORY.parseUnframed(value).copy
               record.setMessageKey(pos)
               manager.store_message(record)
             }
@@ -1194,7 +1194,7 @@ class LevelDBClient(store: LevelDBStore)
           // Now export the queue entries
           index.cursor_prefixed(queue_entry_prefix_array, nocache) { (key, value) =>
             val (_, queue_key, queue_seq) = decode_long_long_key(key)
-            val record = QueueEntryPB.FACTORY.parseFramed(value).copy()
+            val record = QueueEntryPB.FACTORY.parseUnframed(value).copy()
             val (pos, len) = decode_locator(record.getMessageLocator)
             record.setQueueKey(queue_key)
             record.setQueueSeq(queue_seq)
@@ -1204,7 +1204,7 @@ class LevelDBClient(store: LevelDBStore)
           }
 
           index.cursor_prefixed(queue_prefix_array) { (_, value) =>
-            val record = QueuePB.FACTORY.parseFramed(value)
+            val record = QueuePB.FACTORY.parseUnframed(value)
             manager.store_queue(record)
             true
           }
@@ -1253,7 +1253,7 @@ class LevelDBClient(store: LevelDBStore)
           while(manager.getNext match {
 
             case record:MessagePB.Buffer =>
-              val message_data = record.toFramedBuffer
+              val message_data = record.toUnframedBuffer
               val (pos, _) = appender.append(LOG_ADD_MESSAGE, message_data)
               index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
               true
@@ -1265,7 +1265,7 @@ class LevelDBClient(store: LevelDBStore)
                 case Some(locator)=>
                 val (pos, len) = decode_locator(locator)
                 copy.setMessageLocator(locator)
-                index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toFramedBuffer)
+                index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer)
                 log.log_info(pos).foreach { log_info =>
                   log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
                 }
@@ -1275,7 +1275,7 @@ class LevelDBClient(store: LevelDBStore)
               true
 
             case record:QueuePB.Buffer =>
-              index.put(encode_key(queue_prefix, record.getKey), record.toFramedBuffer)
+              index.put(encode_key(queue_prefix, record.getKey), record.toUnframedBuffer)
               true
 
             case record:MapEntryPB.Buffer =>