You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/14 22:07:34 UTC

svn commit: r1244209 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala RecordLog.scala

Author: chirino
Date: Tue Feb 14 21:07:33 2012
New Revision: 1244209

URL: http://svn.apache.org/viewvc?rev=1244209&view=rev
Log:
Small optimization for when the store grows really large with lots of log files.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1244209&r1=1244208&r2=1244209&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Tue Feb 14 21:07:33 2012
@@ -697,8 +697,8 @@ class LevelDBClient(store: LevelDBStore)
     callback.run
   }
 
-  def log_ref_decrement(pos: Long) = this.synchronized {
-    log.log_info(pos) match {
+  def log_ref_decrement(pos: Long, log_info:LogInfo=null) = this.synchronized {
+    Option(log_info).orElse(log.log_info(pos)) match {
       case Some(log_info)=>
         log_refs.get(log_info.position).foreach { counter =>
           val count = counter.decrementAndGet()
@@ -711,8 +711,8 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-  def log_ref_increment(pos: Long) = this.synchronized {
-    log.log_info(pos) match {
+  def log_ref_increment(pos: Long, log_info:LogInfo=null) = this.synchronized {
+    Option(log_info).orElse(log.log_info(pos)) match {
       case Some(log_info)=>
         val count = log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
       case None =>
@@ -766,12 +766,14 @@ class LevelDBClient(store: LevelDBStore)
             uow.actions.foreach { case (msg, action) =>
               val message_record = action.message_record
               var locator:(Long, Int) = null
+              var log_info:LogInfo = null
 
               if (message_record != null) {
                 val message_data = PBSupport.encode_message_record(message_record)
                 val len = message_data.length
-                val pos = appender.append(LOG_ADD_MESSAGE, message_data)
-                locator = (pos, len)
+                val p = appender.append(LOG_ADD_MESSAGE, message_data)
+                locator = (p._1, len)
+                log_info = p._2
                 message_record.locator.set(locator);
               }
 
@@ -784,7 +786,7 @@ class LevelDBClient(store: LevelDBStore)
                 val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
                 appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
                 batch.delete(key)
-                log_ref_decrement(pos)
+                log_ref_decrement(pos, log_info)
               }
 
               var locator_buffer:Buffer = null
@@ -805,7 +807,7 @@ class LevelDBClient(store: LevelDBStore)
                 batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
                 
                 // Increment it.
-                log_ref_increment(pos)
+                log_ref_increment(pos, log_info)
                 
               }
             }
@@ -1197,7 +1199,7 @@ class LevelDBClient(store: LevelDBStore)
 
             case record:MessagePB.Buffer =>
               val message_data = record.toFramedBuffer
-              val pos = appender.append(LOG_ADD_MESSAGE, message_data)
+              val (pos, _) = appender.append(LOG_ADD_MESSAGE, message_data)
               index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
               true
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1244209&r1=1244208&r2=1244209&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Tue Feb 14 21:07:33 2012
@@ -103,6 +103,8 @@ case class RecordLog(directory: File, lo
 
   class LogAppender(file:File, position:Long) extends LogReader(file, position) {
 
+    val info = new LogInfo(file, position, 0)
+
     override def open = new RandomAccessFile(file, "rw")
 
     override def dispose() = {
@@ -139,7 +141,7 @@ case class RecordLog(directory: File, lo
     /**
      * returns the offset position of the data record.
      */
-    def append(id:Byte, data: Buffer): Long = this.synchronized {
+    def append(id:Byte, data: Buffer) = this.synchronized {
       val record_position = append_position
       val data_length = data.length
       val total_length = LOG_HEADER_SIZE + data_length
@@ -180,7 +182,7 @@ case class RecordLog(directory: File, lo
         write_buffer.write(data.data, data.offset, data_length)
         append_offset += total_length
       }
-      record_position
+      (record_position,info)
     }
 
     def flush = this.synchronized {
@@ -378,7 +380,7 @@ case class RecordLog(directory: File, lo
         log_infos += position -> new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset)
       }
       current_appender = create_log_appender(position)
-      log_infos += position -> new LogInfo(current_appender.file, position, 0)
+      log_infos += position -> current_appender.info
     }
   }