You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/18 00:55:46 UTC
svn commit: r1245811 -
/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Author: chirino
Date: Fri Feb 17 23:55:46 2012
New Revision: 1245811
URL: http://svn.apache.org/viewvc?rev=1245811&view=rev
Log:
Switch to unframed protobuf messages to further reduce index storage size.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245811&r1=1245810&r2=1245811&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Fri Feb 17 23:55:46 2012
@@ -47,7 +47,7 @@ import org.apache.activemq.apollo.broker
object LevelDBClient extends Log {
final val STORE_SCHEMA_PREFIX = "leveldb_store:"
- final val STORE_SCHEMA_VERSION = 2
+ final val STORE_SCHEMA_VERSION = 3
final val queue_prefix = 'q'.toByte
final val queue_entry_prefix = 'e'.toByte
@@ -389,27 +389,27 @@ class LevelDBClient(store: LevelDBStore)
kind match {
case LOG_ADD_QUEUE_ENTRY =>
replay_operations+=1
- val record = QueueEntryPB.FACTORY.parseFramed(data)
+ val record = QueueEntryPB.FACTORY.parseUnframed(data)
val index_record = record.copy()
index_record.clearQueueKey()
index_record.clearQueueSeq()
- index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toFramedBuffer)
+ index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toUnframedBuffer)
log_ref_increment(decode_vlong(record.getMessageLocator))
case LOG_REMOVE_QUEUE_ENTRY =>
replay_operations+=1
index.get(data, new ReadOptions).foreach { value=>
- val record = QueueEntryPB.FACTORY.parseFramed(value)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
val pos = decode_vlong(record.getMessageLocator)
pos.foreach(log_ref_decrement(_))
index.delete(data)
}
-
+
case LOG_ADD_QUEUE =>
replay_operations+=1
- val record = QueuePB.FACTORY.parseFramed(data)
+ val record = QueuePB.FACTORY.parseUnframed(data)
index.put(encode_key(queue_prefix, record.getKey), data)
case LOG_REMOVE_QUEUE =>
@@ -424,7 +424,7 @@ class LevelDBClient(store: LevelDBStore)
// Figure out what log file that message entry was in so we can,
// decrement the log file reference.
- val record = QueueEntryPB.FACTORY.parseFramed(value)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
val pos = decode_vlong(record.getMessageLocator)
log_ref_decrement(pos)
true
@@ -432,7 +432,7 @@ class LevelDBClient(store: LevelDBStore)
case LOG_MAP_ENTRY =>
replay_operations+=1
- val entry = MapEntryPB.FACTORY.parseFramed(data)
+ val entry = MapEntryPB.FACTORY.parseUnframed(data)
if (entry.getValue == null) {
index.delete(encode_key(map_prefix, entry.getKey))
} else {
@@ -467,11 +467,11 @@ class LevelDBClient(store: LevelDBStore)
var referenced_queues = Set[Long]()
// Lets find out what the queue entries are..
- var fixed_records = 0
+ var fixed_records = 0
index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=>
try {
val (_, queue_key, seq_key) = decode_long_long_key(key)
- val record = QueueEntryPB.FACTORY.parseFramed(value)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
val (pos, len) = decode_locator(record.getMessageLocator)
if (record.getQueueKey != queue_key) {
throw new IOException("key missmatch")
@@ -493,12 +493,12 @@ class LevelDBClient(store: LevelDBStore)
}
true
}
-
+
// Lets cross check the queues.
index.cursor_prefixed(queue_prefix_array) { (key, value)=>
try {
val (_, queue_key) = decode_long_key(key)
- val record = QueuePB.FACTORY.parseFramed(value)
+ val record = QueuePB.FACTORY.parseUnframed(value)
if (record.getKey != queue_key) {
throw new IOException("key missmatch")
}
@@ -519,7 +519,7 @@ class LevelDBClient(store: LevelDBStore)
trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key)
fixed_records += 1
index.delete(key)
- val record = QueueEntryPB.FACTORY.parseFramed(value)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
val pos = decode_vlong(record.getMessageLocator)
log.log_info(pos).foreach { log_info =>
actual_log_refs.get(log_info.position).foreach { counter =>
@@ -537,14 +537,14 @@ class LevelDBClient(store: LevelDBStore)
log_refs.clear()
log_refs ++= actual_log_refs
}
-
+
if( fixed_records > 0 ) {
warn("Fixed %d invalid index enties in the leveldb store", fixed_records)
}
}
var lock_file:LockFile = _
-
+
def lock_store = {
import OptionSupport._
if (config.fail_if_locked.getOrElse(false)) {
@@ -555,11 +555,11 @@ class LevelDBClient(store: LevelDBStore)
}
}
}
-
+
def unlock_store = {
lock_file.unlock()
}
-
+
private def store_log_refs = {
index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
}
@@ -573,7 +573,7 @@ class LevelDBClient(store: LevelDBStore)
}
}
}
-
+
def stop() = {
// this blocks until all io completes..
// Suspend also deletes the index.
@@ -768,7 +768,7 @@ class LevelDBClient(store: LevelDBStore)
// Figure out what log file that message entry was in so we can,
// decrement the log file reference.
- val record = QueueEntryPB.FACTORY.parseFramed(value)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
val pos = decode_vlong(record.getMessageLocator)
log_ref_decrement(pos)
true
@@ -795,7 +795,7 @@ class LevelDBClient(store: LevelDBStore)
entry.setValue(value)
batch.put(encode_key(map_prefix, key), value.toByteArray)
}
- appender.append(LOG_MAP_ENTRY, entry.freeze().toFramedByteArray)
+ appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray)
}
uow.actions.foreach { case (msg, action) =>
@@ -846,18 +846,18 @@ class LevelDBClient(store: LevelDBStore)
if (entry.redeliveries!=0)
log_record.setRedeliveries(entry.redeliveries)
- appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toFramedBuffer)
+ appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toUnframedBuffer)
// Slim down the index record, the smaller it is the cheaper the compactions
// will be and the more we can cache in mem.
val index_record = log_record.copy()
index_record.clearQueueKey()
index_record.clearQueueSeq()
- batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toFramedBuffer)
-
+ batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toUnframedBuffer)
+
// Increment it.
log_ref_increment(pos, log_info)
-
+
}
}
if( uow.flush_sync ) {
@@ -974,7 +974,7 @@ class LevelDBClient(store: LevelDBStore)
group.first_entry_seq = current_key
}
- val entry = QueueEntryPB.FACTORY.parseFramed(value)
+ val entry = QueueEntryPB.FACTORY.parseUnframed(value)
val pos = decode_vlong(entry.getMessageLocator)
group.last_entry_seq = current_key
@@ -1016,7 +1016,7 @@ class LevelDBClient(store: LevelDBStore)
val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
index.cursor_range( start, end, ro ) { (key, value) =>
val (_, _, queue_seq) = decode_long_long_key(key)
- val record = QueueEntryPB.FACTORY.parseFramed(value)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
val entry = PBSupport.from_pb(record)
entry.queue_key = queue_key
entry.entry_seq = queue_seq
@@ -1036,7 +1036,7 @@ class LevelDBClient(store: LevelDBStore)
index.get(encode_key(map_prefix, key)).map(new Buffer(_))
}
}
-
+
def get_prefixed_map_entries(prefix:Buffer):Seq[(Buffer, Buffer)] = {
val rc = ListBuffer[(Buffer, Buffer)]()
retry_using_index {
@@ -1046,7 +1046,7 @@ class LevelDBClient(store: LevelDBStore)
}
}
rc
- }
+ }
def get_last_queue_key:Long = {
retry_using_index {
@@ -1074,12 +1074,12 @@ class LevelDBClient(store: LevelDBStore)
}
}
}
-
+
case class UsageCounter(info:LogInfo) {
var count = 0L
var size = 0L
var first_reference_queue:QueueRecord = _
-
+
def increment(value:Int) = {
count += 1
size += value
@@ -1150,19 +1150,19 @@ class LevelDBClient(store: LevelDBStore)
val manager = ExportStreamManager(os, 1)
retry_using_index {
-
+
// Delete all the tmp keys..
index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
index.delete(key)
true
}
-
+
index.snapshot { snapshot=>
val nocache = new ReadOptions
nocache.snapshot(snapshot)
nocache.verifyChecksums(verify_checksums)
nocache.fillCache(false)
-
+
val cache = new ReadOptions
nocache.snapshot(snapshot)
nocache.verifyChecksums(false)
@@ -1171,7 +1171,7 @@ class LevelDBClient(store: LevelDBStore)
// Build a temp table of all references messages by the queues
// Remember 2 queues could reference the same message.
index.cursor_prefixed(queue_entry_prefix_array, cache) { (_, value) =>
- val record = QueueEntryPB.FACTORY.parseFramed(value)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
val (pos, len) = decode_locator(record.getMessageLocator)
index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
true
@@ -1184,7 +1184,7 @@ class LevelDBClient(store: LevelDBStore)
val len = decode_vlong(value).toInt
log.read(pos, len).foreach { value =>
// Set the message key to be the position in the log.
- val record = MessagePB.FACTORY.parseFramed(value).copy
+ val record = MessagePB.FACTORY.parseUnframed(value).copy
record.setMessageKey(pos)
manager.store_message(record)
}
@@ -1194,7 +1194,7 @@ class LevelDBClient(store: LevelDBStore)
// Now export the queue entries
index.cursor_prefixed(queue_entry_prefix_array, nocache) { (key, value) =>
val (_, queue_key, queue_seq) = decode_long_long_key(key)
- val record = QueueEntryPB.FACTORY.parseFramed(value).copy()
+ val record = QueueEntryPB.FACTORY.parseUnframed(value).copy()
val (pos, len) = decode_locator(record.getMessageLocator)
record.setQueueKey(queue_key)
record.setQueueSeq(queue_seq)
@@ -1204,7 +1204,7 @@ class LevelDBClient(store: LevelDBStore)
}
index.cursor_prefixed(queue_prefix_array) { (_, value) =>
- val record = QueuePB.FACTORY.parseFramed(value)
+ val record = QueuePB.FACTORY.parseUnframed(value)
manager.store_queue(record)
true
}
@@ -1253,7 +1253,7 @@ class LevelDBClient(store: LevelDBStore)
while(manager.getNext match {
case record:MessagePB.Buffer =>
- val message_data = record.toFramedBuffer
+ val message_data = record.toUnframedBuffer
val (pos, _) = appender.append(LOG_ADD_MESSAGE, message_data)
index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
true
@@ -1265,7 +1265,7 @@ class LevelDBClient(store: LevelDBStore)
case Some(locator)=>
val (pos, len) = decode_locator(locator)
copy.setMessageLocator(locator)
- index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toFramedBuffer)
+ index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer)
log.log_info(pos).foreach { log_info =>
log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
}
@@ -1275,7 +1275,7 @@ class LevelDBClient(store: LevelDBStore)
true
case record:QueuePB.Buffer =>
- index.put(encode_key(queue_prefix, record.getKey), record.toFramedBuffer)
+ index.put(encode_key(queue_prefix, record.getKey), record.toUnframedBuffer)
true
case record:MapEntryPB.Buffer =>