You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/21 23:31:22 UTC

svn commit: r1375803 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala

Author: chirino
Date: Tue Aug 21 21:31:22 2012
New Revision: 1375803

URL: http://svn.apache.org/viewvc?rev=1375803&view=rev
Log:
Simplify the scheduling of periodic leveldb tasks.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?rev=1375803&r1=1375802&r2=1375803&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala Tue Aug 21 21:31:22 2012
@@ -101,13 +101,19 @@ class LevelDBStore(val config: LevelDBSt
           rc
         }
       })
-      poll_stats
+      schedule_reoccurring(1, TimeUnit.SECONDS) {
+        poll_stats
+      }
+      schedule_reoccurring(10, TimeUnit.SECONDS) {
+        write_executor {
+          client.gc
+        }
+      }
       write_executor {
         try {
           client.start()
           next_msg_key.set(client.getLastMessageKey + 1)
           next_queue_key.set(client.get_last_queue_key + 1)
-          poll_gc
           on_completed.run
         } catch {
           case e: Throwable =>
@@ -143,19 +149,6 @@ class LevelDBStore(val config: LevelDBSt
     ss.is_starting || ss.is_started
   }
 
-  def poll_gc: Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
-    if (keep_polling) {
-      gc {
-        poll_gc
-      }
-    }
-  }
-
-  def gc(onComplete: => Unit) = write_executor {
-    client.gc
-    onComplete
-  }
-
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation of the Store interface
@@ -263,23 +256,12 @@ class LevelDBStore(val config: LevelDBSt
   }
 
   def poll_stats: Unit = {
-    def displayStats = {
-      if (service_state.is_started) {
-
-        flush_latency = flush_latency_counter(true)
-        message_load_latency = message_load_latency_counter(true)
-        //        client.metric_journal_append = client.metric_journal_append_counter(true)
-        //        client.metric_index_update = client.metric_index_update_counter(true)
-        close_latency = close_latency_counter(true)
-        message_load_batch_size = message_load_batch_size_counter(true)
-
-        poll_stats
-      }
-    }
-
-    dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^ {
-      displayStats
-    })
+    flush_latency = flush_latency_counter(true)
+    message_load_latency = message_load_latency_counter(true)
+    //        client.metric_journal_append = client.metric_journal_append_counter(true)
+    //        client.metric_index_update = client.metric_index_update_counter(true)
+    close_latency = close_latency_counter(true)
+    message_load_batch_size = message_load_batch_size_counter(true)
   }
 
   def get_store_status(callback: (StoreStatusDTO) => Unit) = dispatch_queue {