You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/06 18:54:22 UTC

svn commit: r1211056 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: HelperTrait.scala LevelDBClient.scala RecordLog.scala

Author: chirino
Date: Tue Dec  6 17:54:22 2011
New Revision: 1211056

URL: http://svn.apache.org/viewvc?rev=1211056&view=rev
Log:
The LevelDB locators now contain both the position and the length of the message, so messages can be loaded from disk more efficiently.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala?rev=1211056&r1=1211055&r2=1211056&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala Tue Dec  6 17:54:22 2011
@@ -22,36 +22,46 @@ import java.io.DataOutput
 
 object HelperTrait {
 
-  def encode(a1:Long):Array[Byte] = {
-//    val out = new DataByteArrayOutputStream(
-//      AbstractVarIntSupport.computeVarLongSize(a1)
-//    )
-//    out.writeVarLong(a1)
-    val out = new DataByteArrayOutputStream(8)
-    out.writeLong(a1)
+  def encode_locator(pos:Long, len:Int):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(pos)+
+      AbstractVarIntSupport.computeVarIntSize(len)
+    )
+    out.writeVarLong(pos)
+    out.writeVarInt(len)
     out.getData
   }
 
-  def decode_long(bytes:Buffer):Long = {
+  def decode_locator(bytes:Array[Byte]):(Long,  Int) = {
     val in = new DataByteArrayInputStream(bytes)
-//    in.readVarLong()
-    in.readLong()
+    (in.readVarLong(), in.readVarInt())
+  }
+  def decode_locator(bytes:Buffer):(Long,  Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readVarLong(), in.readVarInt())
+  }
+
+  def encode_vlong(a1:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.getData
   }
 
-  def decode_long(bytes:Array[Byte]):Long = {
+  def decode_vlong(bytes:Array[Byte]):Long = {
     val in = new DataByteArrayInputStream(bytes)
-//    in.readVarLong()
-    in.readLong()
+    in.readVarLong()
   }
 
-  def encode(a1:Byte, a2:Long):Array[Byte] = {
+  def encode_key(a1:Byte, a2:Long):Array[Byte] = {
     val out = new DataByteArrayOutputStream(9)
     out.writeByte(a1.toInt)
     out.writeLong(a2)
     out.getData
   }
 
-  def encode(a1:Byte, a2:Buffer):Array[Byte] = {
+  def encode_key(a1:Byte, a2:Buffer):Array[Byte] = {
     val out = new DataByteArrayOutputStream(1+a2.length)
     out.writeByte(a1.toInt)
     a2.writeTo(out.asInstanceOf[DataOutput])
@@ -63,7 +73,7 @@ object HelperTrait {
     (in.readByte(), in.readLong())
   }
 
-  def encode(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
+  def encode_key(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
     val out = new DataByteArrayOutputStream(17)
     out.writeByte(a1)
     out.writeLong(a2)

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1211056&r1=1211055&r2=1211056&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Tue Dec  6 17:54:22 2011
@@ -297,20 +297,20 @@ class LevelDBClient(store: LevelDBStore)
         try {
           while (pos < log.appender_limit) {
             log.read(pos).map {
-              case (kind, data, len) =>
+              case (kind, data, next_pos) =>
                 kind match {
                   case LOG_ADD_MESSAGE =>
                     val record: MessageRecord = data
-                    index.put(encode(message_prefix, record.key), encode(pos))
+                    index.put(encode_key(message_prefix, record.key), encode_locator(pos, data.length))
                   case LOG_ADD_QUEUE_ENTRY =>
                     val record: QueueEntryRecord = data
-                    index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), data)
+                    index.put(encode_key(queue_entry_prefix, record.queue_key, record.entry_seq), data)
                     
                     // Figure out which log file this message reference is pointing at..
                     val log_key = (if(record.message_locator!=null) {
-                      Some(decode_long(record.message_locator))
+                      Some(decode_locator(record.message_locator)._1)
                     } else {
-                      index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+                      index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
                     }).flatMap(log.log_info(_)).map(_.position)
                     
                     // Increment it.
@@ -325,9 +325,9 @@ class LevelDBClient(store: LevelDBStore)
   
                       // Figure out which log file this message reference is pointing at..
                       val log_key = (if(record.message_locator!=null) {
-                        Some(decode_long(record.message_locator))
+                        Some(decode_locator(record.message_locator)._1)
                       } else {
-                        index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+                        index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
                       }).flatMap(log.log_info(_)).map(_.position)
                       
                       // Decrement it.
@@ -344,14 +344,14 @@ class LevelDBClient(store: LevelDBStore)
                     
                   case LOG_ADD_QUEUE =>
                     val record: QueueRecord = data
-                    index.put(encode(queue_prefix, record.key), data)
+                    index.put(encode_key(queue_prefix, record.key), data)
                   case LOG_REMOVE_QUEUE =>
                     val ro = new ReadOptions
                     ro.fillCache(false)
                     ro.verifyChecksums(verify_checksums)
-                    val queue_key = decode_long(data)
-                    index.delete(encode(queue_prefix, queue_key))
-                    index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key), ro) {
+                    val queue_key = decode_vlong(data)
+                    index.delete(encode_key(queue_prefix, queue_key))
+                    index.cursor_keys_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
                       key =>
                         index.delete(key)
                         true
@@ -359,14 +359,14 @@ class LevelDBClient(store: LevelDBStore)
                   case LOG_MAP_ENTRY =>
                     val entry = MapEntryPB.FACTORY.parseUnframed(data)
                     if (entry.getValue == null) {
-                      index.delete(encode(map_prefix, entry.getKey))
+                      index.delete(encode_key(map_prefix, entry.getKey))
                     } else {
-                      index.put(encode(map_prefix, entry.getKey), entry.getValue.toByteArray)
+                      index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
                     }
                   case _ =>
                   // Skip unknown records like the RecordLog headers.
                 }
-                pos += len
+                pos = next_pos
             }
           }
         }
@@ -546,7 +546,7 @@ class LevelDBClient(store: LevelDBStore)
     retry_using_index {
       log.appender { appender =>
         appender.append(LOG_ADD_QUEUE, record)
-        index.put(encode(queue_prefix, record.key), record)
+        index.put(encode_key(queue_prefix, record.key), record)
       }
     }
     callback.run
@@ -558,9 +558,9 @@ class LevelDBClient(store: LevelDBStore)
         val ro = new ReadOptions
         ro.fillCache(false)
         ro.verifyChecksums(verify_checksums)
-        appender.append(LOG_REMOVE_QUEUE, encode(queue_key))
-        index.delete(encode(queue_prefix, queue_key))
-        index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key), ro) { key=>
+        appender.append(LOG_REMOVE_QUEUE, encode_vlong(queue_key))
+        index.delete(encode_key(queue_prefix, queue_key))
+        index.cursor_keys_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { key=>
           index.delete(key)
           true
         }
@@ -581,10 +581,10 @@ class LevelDBClient(store: LevelDBStore)
               val entry = new MapEntryPB.Bean()
               entry.setKey(key)
               if( value==null ) {
-                batch.delete(encode(map_prefix, key))
+                batch.delete(encode_key(map_prefix, key))
               } else {
                 entry.setValue(value)
-                batch.put(encode(map_prefix, key), value.toByteArray)
+                batch.put(encode_key(map_prefix, key), value.toByteArray)
               }
               appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray)
             }
@@ -592,24 +592,29 @@ class LevelDBClient(store: LevelDBStore)
             uow.actions.foreach { case (msg, action) =>
               val message_record = action.message_record
               var pos = -1L
-              var pos_buffer:Buffer = null
+              var len = 0
+              var locator_buffer:Buffer = null
 
               if (message_record != null) {
-                pos = appender.append(LOG_ADD_MESSAGE, message_record)
-                val pos_encoded = encode(pos)
-                pos_buffer = new Buffer(pos_encoded)
+                val message_data:Array[Byte] = message_record
+                len = message_data.length
+                pos = appender.append(LOG_ADD_MESSAGE, message_data)
+                val locator_data = encode_locator(pos, len)
+                locator_buffer = new Buffer(locator_data)
                 if( message_record.locator !=null ) {
-                  message_record.locator.set(pos_encoded);
+                  message_record.locator.set(locator_data);
                 }
-                batch.put(encode(message_prefix, action.message_record.key), pos_encoded)
+                batch.put(encode_key(message_prefix, action.message_record.key), locator_data)
               }
 
               action.dequeues.foreach { entry =>
-                if( pos_buffer==null && entry.message_locator!=null ) {
-                  pos_buffer = entry.message_locator
-                  pos = decode_long(pos_buffer)
+                if( locator_buffer==null && entry.message_locator!=null ) {
+                  locator_buffer = entry.message_locator
+                  val t = decode_locator(locator_buffer)
+                  pos = t._1
+                  len = t._2
                 }
-                val key = encode(queue_entry_prefix, entry.queue_key, entry.entry_seq)
+                val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
                 appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
                 batch.delete(key)
 
@@ -623,10 +628,10 @@ class LevelDBClient(store: LevelDBStore)
               }
 
               action.enqueues.foreach { entry =>
-                entry.message_locator = pos_buffer
+                entry.message_locator = locator_buffer
                 val encoded:Array[Byte] = entry
                 appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
-                batch.put(encode(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
+                batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
                 
                 // Increment it.
                 log.log_info(pos).foreach { log_info=>
@@ -665,18 +670,24 @@ class LevelDBClient(store: LevelDBStore)
           val (message_key, locator, callback) = x
           val record = metric_load_from_index_counter.time {
             var pos = 0L
-            var pos_array:Array[Byte] = null
+            var len = 0
+            var locator_data:Array[Byte] = null
             if( locator!=null ) {
-              pos_array = locator.get()
-              if( pos_array!=null ) {
-                pos = decode_long(pos_array)
+              locator_data = locator.get()
+              if( locator_data!=null ) {
+                val t = decode_locator(locator_data)
+                pos = t._1
+                len = t._2
               }
             }
             if( pos == 0L ) {
-              index.get(encode(message_prefix, message_key), ro) match {
+              index.get(encode_key(message_prefix, message_key), ro) match {
                 case Some(value) =>
-                  pos_array = value
-                  pos = decode_long(pos_array)
+                  locator_data = value
+                  val t = decode_locator(locator_data)
+                  pos = t._1
+                  len = t._2
+
                 case None =>
                   pos = 0L
               }
@@ -684,9 +695,9 @@ class LevelDBClient(store: LevelDBStore)
             if (pos == 0L ) {
               None
             } else {
-              log.read(pos).map { case (prefix, data, _)=>
+              log.read(pos, len).map { data =>
                 val rc:MessageRecord = data
-                rc.locator = new AtomicReference[Array[Byte]](pos_array)
+                rc.locator = new AtomicReference[Array[Byte]](locator_data)
                 rc
               }
             }
@@ -712,11 +723,11 @@ class LevelDBClient(store: LevelDBStore)
         missing.foreach { x =>
           val (message_key, locator, callback) = x
           val record = metric_load_from_index_counter.time {
-            index.get(encode(message_prefix, message_key), ro).flatMap{ pos_array=>
-              val pos = decode_long(pos_array)
-              log.read(pos).map { case (prefix, data, _)=>
+            index.get(encode_key(message_prefix, message_key), ro).flatMap{ locator_data=>
+              val (pos, len) = decode_locator(locator_data)
+              log.read(pos, len).map { data =>
                 val rc:MessageRecord = data
-                rc.locator = new AtomicReference[Array[Byte]](pos_array)
+                rc.locator = new AtomicReference[Array[Byte]](locator_data)
                 rc
               }
             }
@@ -746,7 +757,7 @@ class LevelDBClient(store: LevelDBStore)
       val ro = new ReadOptions
       ro.fillCache(false)
       ro.verifyChecksums(verify_checksums)
-      index.get(encode(queue_prefix, queue_key), ro).map( x=> decode_queue_record(x)  )
+      index.get(encode_key(queue_prefix, queue_key), ro).map( x=> decode_queue_record(x)  )
     }
   }
 
@@ -760,7 +771,7 @@ class LevelDBClient(store: LevelDBStore)
         ro.snapshot(snapshot)
 
         var group:QueueEntryRange = null
-        index.cursor_prefixed( encode(queue_entry_prefix, queue_key), ro) { (key, value) =>
+        index.cursor_prefixed( encode_key(queue_entry_prefix, queue_key), ro) { (key, value) =>
 
           val (_,_,current_key) = decode_long_long_key(key)
           if( group == null ) {
@@ -805,8 +816,8 @@ class LevelDBClient(store: LevelDBStore)
     retry_using_index {
       index.snapshot { snapshot =>
         ro.snapshot(snapshot)
-        val start = encode(queue_entry_prefix, queue_key, firstSeq)
-        val end = encode(queue_entry_prefix, queue_key, lastSeq+1)
+        val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
+        val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
         index.cursor_range( start, end, ro ) { (key, value) =>
           rc += value
           true
@@ -824,7 +835,7 @@ class LevelDBClient(store: LevelDBStore)
 
   def get(key: Buffer):Option[Buffer] = {
     retry_using_index {
-      index.get(encode(map_prefix, key)).map(new Buffer(_))
+      index.get(encode_key(map_prefix, key)).map(new Buffer(_))
     }
   }
 
@@ -898,14 +909,14 @@ class LevelDBClient(store: LevelDBStore)
 
           val entry_record:QueueEntryRecord = value
           val pos = if(entry_record.message_locator!=null) {
-            Some(decode_long(entry_record.message_locator))
+            Some(decode_locator(entry_record.message_locator)._1)
           } else {
-            index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_))
+            index.get(encode_key(message_prefix, entry_record.message_key)).map(decode_locator(_)._1)
           }
 
           pos.flatMap(lookup_usage(_)).foreach { usage =>
             if( usage.first_reference_queue == null ) {
-              usage.first_reference_queue = index.get(encode(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
+              usage.first_reference_queue = index.get(encode_key(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
             }
             usage.increment(entry_record.size)
           }
@@ -998,26 +1009,32 @@ class LevelDBClient(store: LevelDBStore)
         log.appender { appender =>
           streams.using_map_stream { stream=>
             foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
-              index.put(encode(map_prefix, pb.getKey), pb.getValue.toByteArray)
+              index.put(encode_key(map_prefix, pb.getKey), pb.getValue.toByteArray)
             }
           }
 
           streams.using_queue_stream { stream=>
             foreach[QueuePB.Buffer](stream, QueuePB.FACTORY) { record=>
-              index.put(encode(queue_prefix, record.key), record.toUnframedByteArray)
+              index.put(encode_key(queue_prefix, record.key), record.toUnframedByteArray)
             }
           }
 
           streams.using_message_stream { stream=>
             foreach[MessagePB.Buffer](stream, MessagePB.FACTORY) { record=>
-              val pos = appender.append(LOG_ADD_MESSAGE, record.toUnframedByteArray)
-              index.put(encode(message_prefix, record.key), encode(pos))
+              val message_data = record.toUnframedByteArray
+              val pos = appender.append(LOG_ADD_MESSAGE, message_data)
+              index.put(encode_key(message_prefix, record.key), encode_locator(pos, message_data.length))
             }
           }
 
           streams.using_queue_entry_stream { stream=>
             foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=>
-              index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), record.toUnframedByteArray)
+              val r:QueueEntryRecord = record
+              val copy = record.copy();
+              index.get(encode_key(message_prefix, r.message_key)).foreach { locator=>
+                copy.setMessageLocator(new Buffer(locator))
+                index.put(encode_key(queue_entry_prefix, r.queue_key, r.entry_seq), copy.freeze().toUnframedByteArray)
+              }
             }
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1211056&r1=1211055&r2=1211056&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Tue Dec  6 17:54:22 2011
@@ -20,7 +20,7 @@ import java.{lang=>jl}
 import java.{util=>ju}
 
 import org.apache.activemq.apollo.util._
-import org.fusesource.hawtbuf.{DataByteArrayOutputStream, AbstractVarIntSupport}
+import org.fusesource.hawtbuf.DataByteArrayOutputStream
 import java.io._
 import java.util.zip.CRC32
 import java.util.Map.Entry
@@ -28,8 +28,6 @@ import java.util.Arrays
 import collection.mutable.{HashMap, HashSet}
 import collection.immutable.TreeMap
 import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.TimeUnit
-import org.fusesource.hawtdispatch.BaseRetained
 import java.nio.ByteBuffer
 
 object RecordLog {
@@ -162,7 +160,7 @@ case class RecordLog(directory: File, lo
 
       val start = outbound.position
       outbound.writeByte(id);
-      outbound.writeVarInt(data.length)
+      outbound.writeInt(data.length)
       outbound.write(data);
       val count = outbound.position - start
 
@@ -182,24 +180,26 @@ case class RecordLog(directory: File, lo
 
     val is = new RandomAccessFile(file, "r")
 
-    val var_support = new AbstractVarIntSupport {
-      def writeByte(p1: Int) = sys.error("Not supported")
-      def readByte(): Byte = is.readByte()
-    };
-
     def read(pos:Long) = this.synchronized {
       is.seek(pos-start)
       val id = is.read()
       if( id == LOG_HEADER_PREFIX(0) ) {
         (id, null, pos+LOG_HEADER_SIZE)
       } else {
-        val length = var_support.readVarInt()
+        val length = is.readInt()
         val data = new Array[Byte](length)
         is.readFully(data)
         (id, data, is.getFilePointer)
       }
     }
 
+    def read(pos:Long, length:Int) = this.synchronized {
+      is.seek((pos-start)+5)
+      val data = new Array[Byte](length)
+      is.readFully(data)
+      data
+    }
+
     def close = this.synchronized {
       is.close()
     }
@@ -358,5 +358,8 @@ case class RecordLog(directory: File, lo
   def read(pos:Long) = {
     get_reader(pos)(_.read(pos))
   }
+  def read(pos:Long, length:Int) = {
+    get_reader(pos)(_.read(pos, length))
+  }
 
 }