You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/06 18:54:22 UTC
svn commit: r1211056 - in
/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb:
HelperTrait.scala LevelDBClient.scala RecordLog.scala
Author: chirino
Date: Tue Dec 6 17:54:22 2011
New Revision: 1211056
URL: http://svn.apache.org/viewvc?rev=1211056&view=rev
Log:
The LevelDB locators now contain both the position and the length of the message, so messages can be loaded from disk more efficiently.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala?rev=1211056&r1=1211055&r2=1211056&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala Tue Dec 6 17:54:22 2011
@@ -22,36 +22,46 @@ import java.io.DataOutput
object HelperTrait {
- def encode(a1:Long):Array[Byte] = {
-// val out = new DataByteArrayOutputStream(
-// AbstractVarIntSupport.computeVarLongSize(a1)
-// )
-// out.writeVarLong(a1)
- val out = new DataByteArrayOutputStream(8)
- out.writeLong(a1)
+ def encode_locator(pos:Long, len:Int):Array[Byte] = {
+ val out = new DataByteArrayOutputStream(
+ AbstractVarIntSupport.computeVarLongSize(pos)+
+ AbstractVarIntSupport.computeVarIntSize(len)
+ )
+ out.writeVarLong(pos)
+ out.writeVarInt(len)
out.getData
}
- def decode_long(bytes:Buffer):Long = {
+ def decode_locator(bytes:Array[Byte]):(Long, Int) = {
val in = new DataByteArrayInputStream(bytes)
-// in.readVarLong()
- in.readLong()
+ (in.readVarLong(), in.readVarInt())
+ }
+ def decode_locator(bytes:Buffer):(Long, Int) = {
+ val in = new DataByteArrayInputStream(bytes)
+ (in.readVarLong(), in.readVarInt())
+ }
+
+ def encode_vlong(a1:Long):Array[Byte] = {
+ val out = new DataByteArrayOutputStream(
+ AbstractVarIntSupport.computeVarLongSize(a1)
+ )
+ out.writeVarLong(a1)
+ out.getData
}
- def decode_long(bytes:Array[Byte]):Long = {
+ def decode_vlong(bytes:Array[Byte]):Long = {
val in = new DataByteArrayInputStream(bytes)
-// in.readVarLong()
- in.readLong()
+ in.readVarLong()
}
- def encode(a1:Byte, a2:Long):Array[Byte] = {
+ def encode_key(a1:Byte, a2:Long):Array[Byte] = {
val out = new DataByteArrayOutputStream(9)
out.writeByte(a1.toInt)
out.writeLong(a2)
out.getData
}
- def encode(a1:Byte, a2:Buffer):Array[Byte] = {
+ def encode_key(a1:Byte, a2:Buffer):Array[Byte] = {
val out = new DataByteArrayOutputStream(1+a2.length)
out.writeByte(a1.toInt)
a2.writeTo(out.asInstanceOf[DataOutput])
@@ -63,7 +73,7 @@ object HelperTrait {
(in.readByte(), in.readLong())
}
- def encode(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
+ def encode_key(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
val out = new DataByteArrayOutputStream(17)
out.writeByte(a1)
out.writeLong(a2)
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1211056&r1=1211055&r2=1211056&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Tue Dec 6 17:54:22 2011
@@ -297,20 +297,20 @@ class LevelDBClient(store: LevelDBStore)
try {
while (pos < log.appender_limit) {
log.read(pos).map {
- case (kind, data, len) =>
+ case (kind, data, next_pos) =>
kind match {
case LOG_ADD_MESSAGE =>
val record: MessageRecord = data
- index.put(encode(message_prefix, record.key), encode(pos))
+ index.put(encode_key(message_prefix, record.key), encode_locator(pos, data.length))
case LOG_ADD_QUEUE_ENTRY =>
val record: QueueEntryRecord = data
- index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), data)
+ index.put(encode_key(queue_entry_prefix, record.queue_key, record.entry_seq), data)
// Figure out which log file this message reference is pointing at..
val log_key = (if(record.message_locator!=null) {
- Some(decode_long(record.message_locator))
+ Some(decode_locator(record.message_locator)._1)
} else {
- index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+ index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
}).flatMap(log.log_info(_)).map(_.position)
// Increment it.
@@ -325,9 +325,9 @@ class LevelDBClient(store: LevelDBStore)
// Figure out which log file this message reference is pointing at..
val log_key = (if(record.message_locator!=null) {
- Some(decode_long(record.message_locator))
+ Some(decode_locator(record.message_locator)._1)
} else {
- index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+ index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
}).flatMap(log.log_info(_)).map(_.position)
// Decrement it.
@@ -344,14 +344,14 @@ class LevelDBClient(store: LevelDBStore)
case LOG_ADD_QUEUE =>
val record: QueueRecord = data
- index.put(encode(queue_prefix, record.key), data)
+ index.put(encode_key(queue_prefix, record.key), data)
case LOG_REMOVE_QUEUE =>
val ro = new ReadOptions
ro.fillCache(false)
ro.verifyChecksums(verify_checksums)
- val queue_key = decode_long(data)
- index.delete(encode(queue_prefix, queue_key))
- index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key), ro) {
+ val queue_key = decode_vlong(data)
+ index.delete(encode_key(queue_prefix, queue_key))
+ index.cursor_keys_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
key =>
index.delete(key)
true
@@ -359,14 +359,14 @@ class LevelDBClient(store: LevelDBStore)
case LOG_MAP_ENTRY =>
val entry = MapEntryPB.FACTORY.parseUnframed(data)
if (entry.getValue == null) {
- index.delete(encode(map_prefix, entry.getKey))
+ index.delete(encode_key(map_prefix, entry.getKey))
} else {
- index.put(encode(map_prefix, entry.getKey), entry.getValue.toByteArray)
+ index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
}
case _ =>
// Skip unknown records like the RecordLog headers.
}
- pos += len
+ pos = next_pos
}
}
}
@@ -546,7 +546,7 @@ class LevelDBClient(store: LevelDBStore)
retry_using_index {
log.appender { appender =>
appender.append(LOG_ADD_QUEUE, record)
- index.put(encode(queue_prefix, record.key), record)
+ index.put(encode_key(queue_prefix, record.key), record)
}
}
callback.run
@@ -558,9 +558,9 @@ class LevelDBClient(store: LevelDBStore)
val ro = new ReadOptions
ro.fillCache(false)
ro.verifyChecksums(verify_checksums)
- appender.append(LOG_REMOVE_QUEUE, encode(queue_key))
- index.delete(encode(queue_prefix, queue_key))
- index.cursor_keys_prefixed(encode(queue_entry_prefix, queue_key), ro) { key=>
+ appender.append(LOG_REMOVE_QUEUE, encode_vlong(queue_key))
+ index.delete(encode_key(queue_prefix, queue_key))
+ index.cursor_keys_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { key=>
index.delete(key)
true
}
@@ -581,10 +581,10 @@ class LevelDBClient(store: LevelDBStore)
val entry = new MapEntryPB.Bean()
entry.setKey(key)
if( value==null ) {
- batch.delete(encode(map_prefix, key))
+ batch.delete(encode_key(map_prefix, key))
} else {
entry.setValue(value)
- batch.put(encode(map_prefix, key), value.toByteArray)
+ batch.put(encode_key(map_prefix, key), value.toByteArray)
}
appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray)
}
@@ -592,24 +592,29 @@ class LevelDBClient(store: LevelDBStore)
uow.actions.foreach { case (msg, action) =>
val message_record = action.message_record
var pos = -1L
- var pos_buffer:Buffer = null
+ var len = 0
+ var locator_buffer:Buffer = null
if (message_record != null) {
- pos = appender.append(LOG_ADD_MESSAGE, message_record)
- val pos_encoded = encode(pos)
- pos_buffer = new Buffer(pos_encoded)
+ val message_data:Array[Byte] = message_record
+ len = message_data.length
+ pos = appender.append(LOG_ADD_MESSAGE, message_data)
+ val locator_data = encode_locator(pos, len)
+ locator_buffer = new Buffer(locator_data)
if( message_record.locator !=null ) {
- message_record.locator.set(pos_encoded);
+ message_record.locator.set(locator_data);
}
- batch.put(encode(message_prefix, action.message_record.key), pos_encoded)
+ batch.put(encode_key(message_prefix, action.message_record.key), locator_data)
}
action.dequeues.foreach { entry =>
- if( pos_buffer==null && entry.message_locator!=null ) {
- pos_buffer = entry.message_locator
- pos = decode_long(pos_buffer)
+ if( locator_buffer==null && entry.message_locator!=null ) {
+ locator_buffer = entry.message_locator
+ val t = decode_locator(locator_buffer)
+ pos = t._1
+ len = t._2
}
- val key = encode(queue_entry_prefix, entry.queue_key, entry.entry_seq)
+ val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
batch.delete(key)
@@ -623,10 +628,10 @@ class LevelDBClient(store: LevelDBStore)
}
action.enqueues.foreach { entry =>
- entry.message_locator = pos_buffer
+ entry.message_locator = locator_buffer
val encoded:Array[Byte] = entry
appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
- batch.put(encode(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
+ batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
// Increment it.
log.log_info(pos).foreach { log_info=>
@@ -665,18 +670,24 @@ class LevelDBClient(store: LevelDBStore)
val (message_key, locator, callback) = x
val record = metric_load_from_index_counter.time {
var pos = 0L
- var pos_array:Array[Byte] = null
+ var len = 0
+ var locator_data:Array[Byte] = null
if( locator!=null ) {
- pos_array = locator.get()
- if( pos_array!=null ) {
- pos = decode_long(pos_array)
+ locator_data = locator.get()
+ if( locator_data!=null ) {
+ val t = decode_locator(locator_data)
+ pos = t._1
+ len = t._2
}
}
if( pos == 0L ) {
- index.get(encode(message_prefix, message_key), ro) match {
+ index.get(encode_key(message_prefix, message_key), ro) match {
case Some(value) =>
- pos_array = value
- pos = decode_long(pos_array)
+ locator_data = value
+ val t = decode_locator(locator_data)
+ pos = t._1
+ len = t._2
+
case None =>
pos = 0L
}
@@ -684,9 +695,9 @@ class LevelDBClient(store: LevelDBStore)
if (pos == 0L ) {
None
} else {
- log.read(pos).map { case (prefix, data, _)=>
+ log.read(pos, len).map { data =>
val rc:MessageRecord = data
- rc.locator = new AtomicReference[Array[Byte]](pos_array)
+ rc.locator = new AtomicReference[Array[Byte]](locator_data)
rc
}
}
@@ -712,11 +723,11 @@ class LevelDBClient(store: LevelDBStore)
missing.foreach { x =>
val (message_key, locator, callback) = x
val record = metric_load_from_index_counter.time {
- index.get(encode(message_prefix, message_key), ro).flatMap{ pos_array=>
- val pos = decode_long(pos_array)
- log.read(pos).map { case (prefix, data, _)=>
+ index.get(encode_key(message_prefix, message_key), ro).flatMap{ locator_data=>
+ val (pos, len) = decode_locator(locator_data)
+ log.read(pos, len).map { data =>
val rc:MessageRecord = data
- rc.locator = new AtomicReference[Array[Byte]](pos_array)
+ rc.locator = new AtomicReference[Array[Byte]](locator_data)
rc
}
}
@@ -746,7 +757,7 @@ class LevelDBClient(store: LevelDBStore)
val ro = new ReadOptions
ro.fillCache(false)
ro.verifyChecksums(verify_checksums)
- index.get(encode(queue_prefix, queue_key), ro).map( x=> decode_queue_record(x) )
+ index.get(encode_key(queue_prefix, queue_key), ro).map( x=> decode_queue_record(x) )
}
}
@@ -760,7 +771,7 @@ class LevelDBClient(store: LevelDBStore)
ro.snapshot(snapshot)
var group:QueueEntryRange = null
- index.cursor_prefixed( encode(queue_entry_prefix, queue_key), ro) { (key, value) =>
+ index.cursor_prefixed( encode_key(queue_entry_prefix, queue_key), ro) { (key, value) =>
val (_,_,current_key) = decode_long_long_key(key)
if( group == null ) {
@@ -805,8 +816,8 @@ class LevelDBClient(store: LevelDBStore)
retry_using_index {
index.snapshot { snapshot =>
ro.snapshot(snapshot)
- val start = encode(queue_entry_prefix, queue_key, firstSeq)
- val end = encode(queue_entry_prefix, queue_key, lastSeq+1)
+ val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
+ val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
index.cursor_range( start, end, ro ) { (key, value) =>
rc += value
true
@@ -824,7 +835,7 @@ class LevelDBClient(store: LevelDBStore)
def get(key: Buffer):Option[Buffer] = {
retry_using_index {
- index.get(encode(map_prefix, key)).map(new Buffer(_))
+ index.get(encode_key(map_prefix, key)).map(new Buffer(_))
}
}
@@ -898,14 +909,14 @@ class LevelDBClient(store: LevelDBStore)
val entry_record:QueueEntryRecord = value
val pos = if(entry_record.message_locator!=null) {
- Some(decode_long(entry_record.message_locator))
+ Some(decode_locator(entry_record.message_locator)._1)
} else {
- index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_))
+ index.get(encode_key(message_prefix, entry_record.message_key)).map(decode_locator(_)._1)
}
pos.flatMap(lookup_usage(_)).foreach { usage =>
if( usage.first_reference_queue == null ) {
- usage.first_reference_queue = index.get(encode(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
+ usage.first_reference_queue = index.get(encode_key(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
}
usage.increment(entry_record.size)
}
@@ -998,26 +1009,32 @@ class LevelDBClient(store: LevelDBStore)
log.appender { appender =>
streams.using_map_stream { stream=>
foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
- index.put(encode(map_prefix, pb.getKey), pb.getValue.toByteArray)
+ index.put(encode_key(map_prefix, pb.getKey), pb.getValue.toByteArray)
}
}
streams.using_queue_stream { stream=>
foreach[QueuePB.Buffer](stream, QueuePB.FACTORY) { record=>
- index.put(encode(queue_prefix, record.key), record.toUnframedByteArray)
+ index.put(encode_key(queue_prefix, record.key), record.toUnframedByteArray)
}
}
streams.using_message_stream { stream=>
foreach[MessagePB.Buffer](stream, MessagePB.FACTORY) { record=>
- val pos = appender.append(LOG_ADD_MESSAGE, record.toUnframedByteArray)
- index.put(encode(message_prefix, record.key), encode(pos))
+ val message_data = record.toUnframedByteArray
+ val pos = appender.append(LOG_ADD_MESSAGE, message_data)
+ index.put(encode_key(message_prefix, record.key), encode_locator(pos, message_data.length))
}
}
streams.using_queue_entry_stream { stream=>
foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=>
- index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), record.toUnframedByteArray)
+ val r:QueueEntryRecord = record
+ val copy = record.copy();
+ index.get(encode_key(message_prefix, r.message_key)).foreach { locator=>
+ copy.setMessageLocator(new Buffer(locator))
+ index.put(encode_key(queue_entry_prefix, r.queue_key, r.entry_seq), copy.freeze().toUnframedByteArray)
+ }
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1211056&r1=1211055&r2=1211056&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Tue Dec 6 17:54:22 2011
@@ -20,7 +20,7 @@ import java.{lang=>jl}
import java.{util=>ju}
import org.apache.activemq.apollo.util._
-import org.fusesource.hawtbuf.{DataByteArrayOutputStream, AbstractVarIntSupport}
+import org.fusesource.hawtbuf.DataByteArrayOutputStream
import java.io._
import java.util.zip.CRC32
import java.util.Map.Entry
@@ -28,8 +28,6 @@ import java.util.Arrays
import collection.mutable.{HashMap, HashSet}
import collection.immutable.TreeMap
import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.TimeUnit
-import org.fusesource.hawtdispatch.BaseRetained
import java.nio.ByteBuffer
object RecordLog {
@@ -162,7 +160,7 @@ case class RecordLog(directory: File, lo
val start = outbound.position
outbound.writeByte(id);
- outbound.writeVarInt(data.length)
+ outbound.writeInt(data.length)
outbound.write(data);
val count = outbound.position - start
@@ -182,24 +180,26 @@ case class RecordLog(directory: File, lo
val is = new RandomAccessFile(file, "r")
- val var_support = new AbstractVarIntSupport {
- def writeByte(p1: Int) = sys.error("Not supported")
- def readByte(): Byte = is.readByte()
- };
-
def read(pos:Long) = this.synchronized {
is.seek(pos-start)
val id = is.read()
if( id == LOG_HEADER_PREFIX(0) ) {
(id, null, pos+LOG_HEADER_SIZE)
} else {
- val length = var_support.readVarInt()
+ val length = is.readInt()
val data = new Array[Byte](length)
is.readFully(data)
(id, data, is.getFilePointer)
}
}
+ def read(pos:Long, length:Int) = this.synchronized {
+ is.seek((pos-start)+5)
+ val data = new Array[Byte](length)
+ is.readFully(data)
+ data
+ }
+
def close = this.synchronized {
is.close()
}
@@ -358,5 +358,8 @@ case class RecordLog(directory: File, lo
def read(pos:Long) = {
get_reader(pos)(_.read(pos))
}
+ def read(pos:Long, length:Int) = {
+ get_reader(pos)(_.read(pos, length))
+ }
}