You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/21 23:31:22 UTC
svn commit: r1375803 -
/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
Author: chirino
Date: Tue Aug 21 21:31:22 2012
New Revision: 1375803
URL: http://svn.apache.org/viewvc?rev=1375803&view=rev
Log:
Simplify the scheduling of periodic leveldb tasks.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?rev=1375803&r1=1375802&r2=1375803&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala Tue Aug 21 21:31:22 2012
@@ -101,13 +101,19 @@ class LevelDBStore(val config: LevelDBSt
rc
}
})
- poll_stats
+ schedule_reoccurring(1, TimeUnit.SECONDS) {
+ poll_stats
+ }
+ schedule_reoccurring(10, TimeUnit.SECONDS) {
+ write_executor {
+ client.gc
+ }
+ }
write_executor {
try {
client.start()
next_msg_key.set(client.getLastMessageKey + 1)
next_queue_key.set(client.get_last_queue_key + 1)
- poll_gc
on_completed.run
} catch {
case e: Throwable =>
@@ -143,19 +149,6 @@ class LevelDBStore(val config: LevelDBSt
ss.is_starting || ss.is_started
}
- def poll_gc: Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
- if (keep_polling) {
- gc {
- poll_gc
- }
- }
- }
-
- def gc(onComplete: => Unit) = write_executor {
- client.gc
- onComplete
- }
-
/////////////////////////////////////////////////////////////////////
//
// Implementation of the Store interface
@@ -263,23 +256,12 @@ class LevelDBStore(val config: LevelDBSt
}
def poll_stats: Unit = {
- def displayStats = {
- if (service_state.is_started) {
-
- flush_latency = flush_latency_counter(true)
- message_load_latency = message_load_latency_counter(true)
- // client.metric_journal_append = client.metric_journal_append_counter(true)
- // client.metric_index_update = client.metric_index_update_counter(true)
- close_latency = close_latency_counter(true)
- message_load_batch_size = message_load_batch_size_counter(true)
-
- poll_stats
- }
- }
-
- dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^ {
- displayStats
- })
+ flush_latency = flush_latency_counter(true)
+ message_load_latency = message_load_latency_counter(true)
+ // client.metric_journal_append = client.metric_journal_append_counter(true)
+ // client.metric_index_update = client.metric_index_update_counter(true)
+ close_latency = close_latency_counter(true)
+ message_load_batch_size = message_load_batch_size_counter(true)
}
def get_store_status(callback: (StoreStatusDTO) => Unit) = dispatch_queue {