You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/14 22:07:34 UTC
svn commit: r1244209 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala RecordLog.scala
Author: chirino
Date: Tue Feb 14 21:07:33 2012
New Revision: 1244209
URL: http://svn.apache.org/viewvc?rev=1244209&view=rev
Log:
Small optimization for when the store grows really large with lots of log files.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1244209&r1=1244208&r2=1244209&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Tue Feb 14 21:07:33 2012
@@ -697,8 +697,8 @@ class LevelDBClient(store: LevelDBStore)
callback.run
}
- def log_ref_decrement(pos: Long) = this.synchronized {
- log.log_info(pos) match {
+ def log_ref_decrement(pos: Long, log_info:LogInfo=null) = this.synchronized {
+ Option(log_info).orElse(log.log_info(pos)) match {
case Some(log_info)=>
log_refs.get(log_info.position).foreach { counter =>
val count = counter.decrementAndGet()
@@ -711,8 +711,8 @@ class LevelDBClient(store: LevelDBStore)
}
}
- def log_ref_increment(pos: Long) = this.synchronized {
- log.log_info(pos) match {
+ def log_ref_increment(pos: Long, log_info:LogInfo=null) = this.synchronized {
+ Option(log_info).orElse(log.log_info(pos)) match {
case Some(log_info)=>
val count = log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
case None =>
@@ -766,12 +766,14 @@ class LevelDBClient(store: LevelDBStore)
uow.actions.foreach { case (msg, action) =>
val message_record = action.message_record
var locator:(Long, Int) = null
+ var log_info:LogInfo = null
if (message_record != null) {
val message_data = PBSupport.encode_message_record(message_record)
val len = message_data.length
- val pos = appender.append(LOG_ADD_MESSAGE, message_data)
- locator = (pos, len)
+ val p = appender.append(LOG_ADD_MESSAGE, message_data)
+ locator = (p._1, len)
+ log_info = p._2
message_record.locator.set(locator);
}
@@ -784,7 +786,7 @@ class LevelDBClient(store: LevelDBStore)
val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
batch.delete(key)
- log_ref_decrement(pos)
+ log_ref_decrement(pos, log_info)
}
var locator_buffer:Buffer = null
@@ -805,7 +807,7 @@ class LevelDBClient(store: LevelDBStore)
batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
// Increment it.
- log_ref_increment(pos)
+ log_ref_increment(pos, log_info)
}
}
@@ -1197,7 +1199,7 @@ class LevelDBClient(store: LevelDBStore)
case record:MessagePB.Buffer =>
val message_data = record.toFramedBuffer
- val pos = appender.append(LOG_ADD_MESSAGE, message_data)
+ val (pos, _) = appender.append(LOG_ADD_MESSAGE, message_data)
index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
true
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1244209&r1=1244208&r2=1244209&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Tue Feb 14 21:07:33 2012
@@ -103,6 +103,8 @@ case class RecordLog(directory: File, lo
class LogAppender(file:File, position:Long) extends LogReader(file, position) {
+ val info = new LogInfo(file, position, 0)
+
override def open = new RandomAccessFile(file, "rw")
override def dispose() = {
@@ -139,7 +141,7 @@ case class RecordLog(directory: File, lo
/**
* returns the offset position of the data record.
*/
- def append(id:Byte, data: Buffer): Long = this.synchronized {
+ def append(id:Byte, data: Buffer) = this.synchronized {
val record_position = append_position
val data_length = data.length
val total_length = LOG_HEADER_SIZE + data_length
@@ -180,7 +182,7 @@ case class RecordLog(directory: File, lo
write_buffer.write(data.data, data.offset, data_length)
append_offset += total_length
}
- record_position
+ (record_position,info)
}
def flush = this.synchronized {
@@ -378,7 +380,7 @@ case class RecordLog(directory: File, lo
log_infos += position -> new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset)
}
current_appender = create_log_appender(position)
- log_infos += position -> new LogInfo(current_appender.file, position, 0)
+ log_infos += position -> current_appender.info
}
}