You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/06 14:27:22 UTC
svn commit: r1210901 - in /activemq/activemq-apollo/trunk:
apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/
apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/
apollo-leveldb/src/main...
Author: chirino
Date: Tue Dec 6 13:27:22 2011
New Revision: 1210901
URL: http://svn.apache.org/viewvc?rev=1210901&view=rev
Log:
Made the leveldb store's GC passes much more efficient by maintaining in-memory log reference counters.
Removed the gc_interval config option, since the GC can now be run frequently; it is now hard-coded to run every 10 seconds.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java Tue Dec 6 13:27:22 2011
@@ -34,9 +34,6 @@ public class LevelDBStoreDTO extends Sto
@XmlAttribute
public File directory;
- @XmlAttribute(name="gc_interval")
- public Integer gc_interval;
-
@XmlAttribute(name="read_threads")
public Integer read_threads;
@@ -85,7 +82,6 @@ public class LevelDBStoreDTO extends Sto
LevelDBStoreDTO that = (LevelDBStoreDTO) o;
if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
- if (gc_interval != null ? !gc_interval.equals(that.gc_interval) : that.gc_interval != null) return false;
if (index_block_restart_interval != null ? !index_block_restart_interval.equals(that.index_block_restart_interval) : that.index_block_restart_interval != null)
return false;
if (index_block_size != null ? !index_block_size.equals(that.index_block_size) : that.index_block_size != null)
@@ -116,7 +112,6 @@ public class LevelDBStoreDTO extends Sto
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (directory != null ? directory.hashCode() : 0);
- result = 31 * result + (gc_interval != null ? gc_interval.hashCode() : 0);
result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
result = 31 * result + (index_factory != null ? index_factory.hashCode() : 0);
result = 31 * result + (sync != null ? sync.hashCode() : 0);
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.java Tue Dec 6 13:27:22 2011
@@ -48,15 +48,6 @@ public class LevelDBStoreStatusDTO exten
@XmlElement(name="last_checkpoint_pos")
public long index_snapshot_pos;
- @XmlElement(name="last_gc_ts")
- public long last_gc_ts;
-
- @XmlElement(name="in_gc")
- public boolean in_gc;
-
- @XmlElement(name="last_gc_duration")
- public long last_gc_duration;
-
@XmlElement(name="last_append_pos")
public long log_append_pos;
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala Tue Dec 6 13:27:22 2011
@@ -32,6 +32,12 @@ object HelperTrait {
out.getData
}
+ def decode_long(bytes:Buffer):Long = {
+ val in = new DataByteArrayInputStream(bytes)
+// in.readVarLong()
+ in.readLong()
+ }
+
def decode_long(bytes:Array[Byte]):Long = {
val in = new DataByteArrayInputStream(bytes)
// in.readVarLong()
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Tue Dec 6 13:27:22 2011
@@ -26,17 +26,20 @@ import org.apache.activemq.apollo.broker
import java.io._
import java.util.concurrent.TimeUnit
import org.apache.activemq.apollo.util._
-import collection.mutable.ListBuffer
import java.util.concurrent.locks.ReentrantReadWriteLock
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap}
import collection.immutable.TreeMap
-import org.iq80.leveldb._
import org.fusesource.leveldbjni.internal.Util
import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport}
import java.util.concurrent.atomic.AtomicReference
import org.apache.activemq.apollo.broker.Broker
import org.apache.activemq.apollo.util.ProcessSupport._
+import collection.mutable.{HashMap, ListBuffer}
+import org.apache.activemq.apollo.dto.JsonCodec
+import java.util.Map
+import org.iq80.leveldb._
+import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -54,6 +57,7 @@ object LevelDBClient extends Log {
final val queue_entry_prefix_array = Array(queue_entry_prefix)
final val dirty_index_key = bytes(":dirty")
+ final val log_refs_index_key = bytes(":log-refs")
final val TRUE = bytes("true")
final val FALSE = bytes("false")
@@ -89,15 +93,6 @@ object LevelDBClient extends Log {
}): _* )
}
- case class UsageCounter() {
- var count = 0L
- var size = 0L
- def increment(value:Int) = {
- count += 1
- size += value
- }
- }
-
val on_windows = System.getProperty("os.name").toLowerCase().startsWith("windows")
var link_strategy = 0
@@ -193,12 +188,8 @@ class LevelDBClient(store: LevelDBStore)
var last_index_snapshot_pos:Long = _
val snapshot_rw_lock = new ReentrantReadWriteLock(true)
- var last_gc_ts = 0L
- var last_gc_duration = 0L
- var in_gc = false
- var gc_detected_log_usage = Map[Long, UsageCounter]()
var factory:DBFactory = _
-
+ val log_refs = HashMap[Long, LongCounter]()
def dirty_index_file = directory / ("dirty"+INDEX_SUFFIX)
def temp_index_file = directory / ("temp"+INDEX_SUFFIX)
@@ -259,7 +250,7 @@ class LevelDBClient(store: LevelDBStore)
// lets queue a request to checkpoint when
// the logs rotate.. queue it on the GC thread since GC's lock
// the index for a long time.
- store.gc_executor {
+ store.write_executor {
snapshot_index
}
}
@@ -298,11 +289,11 @@ class LevelDBClient(store: LevelDBStore)
index = new RichDB(factory.open(dirty_index_file, index_options));
try {
+ load_log_refs
index.put(dirty_index_key, TRUE)
// Update the index /w what was stored on the logs..
var pos = last_index_snapshot_pos;
- // Replay the log from the last update position..
try {
while (pos < log.appender_limit) {
log.read(pos).map {
@@ -314,8 +305,43 @@ class LevelDBClient(store: LevelDBStore)
case LOG_ADD_QUEUE_ENTRY =>
val record: QueueEntryRecord = data
index.put(encode(queue_entry_prefix, record.queue_key, record.entry_seq), data)
+
+ // Figure out which log file this message reference is pointing at..
+ val log_key = (if(record.message_locator!=null) {
+ Some(decode_long(record.message_locator))
+ } else {
+ index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+ }).flatMap(log.log_info(_)).map(_.position)
+
+ // Increment it.
+ log_key.foreach { log_key=>
+ log_refs.getOrElseUpdate(log_key, new LongCounter()).incrementAndGet()
+ }
+
case LOG_REMOVE_QUEUE_ENTRY =>
- index.delete(data)
+
+ index.get(data, new ReadOptions).foreach { value=>
+ val record: QueueEntryRecord = value
+
+ // Figure out which log file this message reference is pointing at..
+ val log_key = (if(record.message_locator!=null) {
+ Some(decode_long(record.message_locator))
+ } else {
+ index.get(encode(message_prefix, record.message_key)).map(decode_long(_))
+ }).flatMap(log.log_info(_)).map(_.position)
+
+ // Decrement it.
+ log_key.foreach { log_key=>
+ log_refs.get(log_key).foreach{ counter=>
+ if( counter.decrementAndGet() == 0 ) {
+ log_refs.remove(log_key)
+ }
+ }
+ }
+
+ index.delete(data)
+ }
+
case LOG_ADD_QUEUE =>
val record: QueueRecord = data
index.put(encode(queue_prefix, record.key), data)
@@ -358,6 +384,20 @@ class LevelDBClient(store: LevelDBStore)
}
}
+ private def store_log_refs = {
+ index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
+ }
+
+ private def load_log_refs = {
+ log_refs.clear()
+ index.get(log_refs_index_key, new ReadOptions).foreach { value=>
+ val javamap = JsonCodec.decode(new Buffer(value), classOf[java.util.Map[String, Object]])
+ collection.JavaConversions.mapAsScalaMap(javamap).foreach { case (k,v)=>
+ log_refs.put(k.toLong, new LongCounter(v.asInstanceOf[Number].longValue()))
+ }
+ }
+ }
+
def stop() = {
// this blocks until all io completes..
// Suspend also deletes the index.
@@ -392,6 +432,7 @@ class LevelDBClient(store: LevelDBStore)
snapshot_rw_lock.writeLock().lock()
// Close the index so that it's files are not changed async on us.
+ store_log_refs
index.put(dirty_index_key, FALSE, new WriteOptions().sync(true))
index.close
}
@@ -550,7 +591,7 @@ class LevelDBClient(store: LevelDBStore)
uow.actions.foreach { case (msg, action) =>
val message_record = action.message_record
- var pos = 0L
+ var pos = -1L
var pos_buffer:Buffer = null
if (message_record != null) {
@@ -566,10 +607,19 @@ class LevelDBClient(store: LevelDBStore)
action.dequeues.foreach { entry =>
if( pos_buffer==null && entry.message_locator!=null ) {
pos_buffer = entry.message_locator
+ pos = decode_long(pos_buffer)
}
val key = encode(queue_entry_prefix, entry.queue_key, entry.entry_seq)
appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
batch.delete(key)
+
+ log.log_info(pos).foreach { log_info=>
+ log_refs.get(log_info.position).foreach{ counter=>
+ if( counter.decrementAndGet() == 0 ) {
+ log_refs.remove(log_info.position)
+ }
+ }
+ }
}
action.enqueues.foreach { entry =>
@@ -577,6 +627,12 @@ class LevelDBClient(store: LevelDBStore)
val encoded:Array[Byte] = entry
appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
batch.put(encode(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
+
+ // Increment it.
+ log.log_info(pos).foreach { log_info=>
+ log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+ }
+
}
}
if( !uow.complete_listeners.isEmpty ) {
@@ -779,32 +835,48 @@ class LevelDBClient(store: LevelDBStore)
}
def gc:Unit = {
- var active_counter = 0
- var delete_counter = 0
- val latency_counter = new TimeCounter
+ last_index_snapshot_pos
+ val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
- val ro = new ReadOptions()
- ro.fillCache(false)
- ro.verifyChecksums(verify_checksums)
+ // We don't want to delete any journals that the index has not snapshot'ed or
+ // that begin at or after the log appender's start position.
+ val delete_limit = log.log_info(last_index_snapshot_pos).map(_.position).
+ getOrElse(last_index_snapshot_pos).min(log.appender_start)
- //
- // This journal_usage will let us get a picture of which queues are using how much of each
- // log file. It will help folks figure out why a log file is not getting deleted.
- //
- val journal_usage = new ApolloTreeMap[Long,(RecordLog#LogInfo , UsageCounter)]()
- var append_journal = 0L
+ empty_journals.foreach { id =>
+ if ( id < delete_limit ) {
+ log.delete(id)
+ }
+ }
+ }
+
+ case class UsageCounter(info:RecordLog#LogInfo) {
+ var count = 0L
+ var size = 0L
+ var first_reference_queue:QueueRecord = _
+
+ def increment(value:Int) = {
+ count += 1
+ size += value
+ }
+ }
+ //
+ // Collects detailed usage information about the journal like who's referencing it.
+ //
+ def get_log_usage_details = {
+
+ val usage_map = new ApolloTreeMap[Long,UsageCounter]()
log.log_mutex.synchronized {
- append_journal = log.log_infos.last._1
- log.log_infos.foreach(entry=> journal_usage.put(entry._1, (entry._2, UsageCounter())) )
+ log.log_infos.foreach(entry=> usage_map.put(entry._1, UsageCounter(entry._2)) )
}
- def find_journal(pos: Long) = {
- var entry = journal_usage.floorEntry(pos)
+ def lookup_usage(pos: Long) = {
+ var entry = usage_map.floorEntry(pos)
if (entry != null) {
- val (info, usageCounter) = entry.getValue()
- if (pos < info.limit) {
- Some(entry.getKey -> usageCounter)
+ val usage = entry.getValue()
+ if (pos < usage.info.limit) {
+ Some(usage)
} else {
None
}
@@ -813,88 +885,38 @@ class LevelDBClient(store: LevelDBStore)
}
}
- in_gc = true
- val now = System.currentTimeMillis()
- debug(store.store_kind+" gc starting")
- latency_counter.time {
-
- retry_using_index {
- index.snapshot { snapshot =>
- ro.snapshot(snapshot)
+ val ro = new ReadOptions()
+ ro.fillCache(false)
+ ro.verifyChecksums(verify_checksums)
- // Figure out which journal files are still in use by which queues.
- index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
- val entry_record:QueueEntryRecord = value
- val pos = if(entry_record.message_locator!=null) {
- decode_long(entry_record.message_locator.toByteArray)
- } else {
- index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_)).getOrElse(0L)
- }
+ retry_using_index {
+ index.snapshot { snapshot =>
+ ro.snapshot(snapshot)
- find_journal(pos) match {
- case Some((key,usageCounter)) =>
- usageCounter.increment(entry_record.size)
- case None =>
- }
+ // Figure out which journal files are still in use by which queues.
+ index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
- // only continue while the service is still running..
- store.service_state.is_started
+ val entry_record:QueueEntryRecord = value
+ val pos = if(entry_record.message_locator!=null) {
+ Some(decode_long(entry_record.message_locator))
+ } else {
+ index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_))
}
- if (store.service_state.is_started) {
-
- gc_detected_log_usage = Map((collection.JavaConversions.asScalaSet(journal_usage.entrySet()).map { x=>
- x.getKey -> x.getValue._2
- }).toSeq : _ * )
-
- // Take empty journals out of the map..
- val empty_journals = ListBuffer[Long]()
-
- val i = journal_usage.entrySet().iterator();
- while( i.hasNext ) {
- val (info, usageCounter) = i.next().getValue
- if( usageCounter.count==0 && info.position < append_journal) {
- empty_journals += info.position
- i.remove()
- }
- }
-
- index.cursor_prefixed(message_prefix_array) { (key,value) =>
- val pos = decode_long(value)
-
- if ( !find_journal(pos).isDefined ) {
- // Delete it.
- index.delete(key)
- delete_counter += 1
- } else {
- active_counter += 1
- }
- // only continue while the service is still running..
- store.service_state.is_started
- }
-
- if (store.service_state.is_started) {
- // We don't want to delete any journals that the index has not snapshot'ed or
- // the the
- val delete_limit = find_journal(last_index_snapshot_pos).map(_._1).
- getOrElse(last_index_snapshot_pos).min(log.appender_start)
-
- empty_journals.foreach { id =>
- if ( id < delete_limit ) {
- log.delete(id)
- }
- }
+ pos.flatMap(lookup_usage(_)).foreach { usage =>
+ if( usage.first_reference_queue == null ) {
+ usage.first_reference_queue = index.get(encode(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
}
+ usage.increment(entry_record.size)
}
- }
-
+ true
+ }
}
}
- last_gc_ts=now
- last_gc_duration = latency_counter.total(TimeUnit.MILLISECONDS)
- in_gc = false
- debug(store.store_kind+" gc ended")
+
+ import collection.JavaConversions._
+ usage_map.values.toSeq.toArray
}
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Tue Dec 6 13:27:22 2011
@@ -48,7 +48,6 @@ class LevelDBStore(val config:LevelDBSto
var next_msg_key = new AtomicLong(1)
var write_executor:ExecutorService = _
- var gc_executor:ExecutorService = _
var read_executor:ExecutorService = _
var client:LevelDBClient = _
@@ -83,13 +82,6 @@ class LevelDBStore(val config:LevelDBSto
rc
}
})
- gc_executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
- def newThread(r: Runnable) = {
- val rc = new Thread(r, store_kind + " store gc")
- rc.setDaemon(true)
- rc
- }
- })
read_executor = Executors.newFixedThreadPool(config.read_threads.getOrElse(10), new ThreadFactory() {
def newThread(r: Runnable) = {
val rc = new Thread(r, store_kind + " store io read")
@@ -128,7 +120,6 @@ class LevelDBStore(val config:LevelDBSto
read_executor.shutdown
read_executor.awaitTermination(60, TimeUnit.SECONDS)
read_executor = null
- gc_executor.shutdown
client.stop
on_completed.run
}
@@ -140,20 +131,15 @@ class LevelDBStore(val config:LevelDBSto
ss.is_starting || ss.is_started
}
- def poll_gc:Unit = {
- val interval = config.gc_interval.getOrElse(60*30)
- if( interval>0 ) {
- dispatch_queue.after(interval, TimeUnit.SECONDS) {
- if( keep_polling ) {
- gc {
- poll_gc
- }
- }
+ def poll_gc:Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
+ if( keep_polling ) {
+ gc {
+ poll_gc
}
}
}
- def gc(onComplete: =>Unit) = gc_executor {
+ def gc(onComplete: =>Unit) = write_executor {
client.gc
onComplete
}
@@ -277,23 +263,18 @@ class LevelDBStore(val config:LevelDBSto
rc.index_stats = client.index.getProperty("leveldb.stats")
rc.log_append_pos = client.log.appender_limit
rc.index_snapshot_pos = client.last_index_snapshot_pos
- rc.last_gc_duration = client.last_gc_duration
- rc.last_gc_ts = client.last_gc_ts
- rc.in_gc = client.in_gc
rc.log_stats = {
- var row_layout = "%-20s | %-10s | %10s/%-10s\n"
- row_layout.format("File", "Messages", "Used Size", "Total Size")+
- client.log.log_infos.map(x=> x._1 -> client.gc_detected_log_usage.get(x._1)).toSeq.flatMap { x=>
+ var row_layout = "%-20s | %-10s | %-10s\n"
+ row_layout.format("File", "References", "Total Size")+
+ client.log.log_infos.map{case (id,info)=> id -> client.log_refs.get(id).map(_.get)}.toSeq.flatMap { case (id, refs)=>
try {
- val file = LevelDBClient.create_sequence_file(client.directory, x._1, LevelDBClient.LOG_SUFFIX)
+ val file = LevelDBClient.create_sequence_file(client.directory, id, LevelDBClient.LOG_SUFFIX)
val size = file.length()
- val usage = x._2 match {
- case Some(usage)=>
- (usage.count.toString, ViewHelper.memory(usage.size))
- case None=>
- ("unknown", "unknown")
- }
- Some(row_layout.format(file.getName, usage._1, usage._2, ViewHelper.memory(size)))
+ Some(row_layout.format(
+ file.getName,
+ refs.getOrElse(0L).toString,
+ ViewHelper.memory(size)
+ ))
} catch {
case e:Throwable =>
None
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Tue Dec 6 13:27:22 2011
@@ -323,10 +323,10 @@ case class RecordLog(directory: File, lo
}
}
+ def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2))
private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
- val infos = log_mutex.synchronized(log_infos)
- val info = infos.range(0L, pos+1).lastOption.map(_._2)
+ val info = log_info(pos)
info.map { info =>
// Checkout a reader from the cache...
val (set, reader_id, reader) = reader_cache_files.synchronized {
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreStatusDTO.jade Tue Dec 6 13:27:22 2011
@@ -46,15 +46,13 @@ h2 Store Latency Stats
- show("UOW flush latency", flush_latency)
h2 Log Status
-p last log GC occured #{uptime(last_gc_ts)}
-p last log GC duration: #{friendly_duration(last_gc_duration)}
pre
!~~ log_stats
p
- Index recovery starts from log position:
+ | Index recovery starts from log position:
code #{"%016x".format(index_snapshot_pos)}
p
- Append position:
+ | Append position:
code #{"%016x".format(log_append_pos)}
h2 Index Status
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1210901&r1=1210900&r2=1210901&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Tue Dec 6 13:27:22 2011
@@ -385,8 +385,6 @@ A `leveldb_store` element may be configu
that a store will delay persisting a messaging unit of work in hopes
that it will be invalidated shortly thereafter by another unit of work
which would negate the operation.
-* `gc_interval` : How often to check to find log files which can be discarded
- in seconds. The value defaults to 1800 (30 minutes).
* `read_threads` : The number of concurrent IO reads to allow. The value
defaults to 10.
* `sync` : If set to `false`, then the store does not sync logging operations to