You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:13:24 UTC

svn commit: r961169 - /activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala

Author: chirino
Date: Wed Jul  7 04:13:24 2010
New Revision: 961169

URL: http://svn.apache.org/viewvc?rev=961169&view=rev
Log:
Use the flush callbacks to know when updates are secured to disk.  Clean up methods depends on knowing which updates have been stored.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961169&r1=961168&r2=961169&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:13:24 2010
@@ -84,6 +84,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   @volatile
   var rootBuffer = (new DatabaseRootRecord.Bean()).freeze
 
+  @volatile
+  var storedRootBuffer = (new DatabaseRootRecord.Bean()).freeze
+
 
   val next_batch_counter = new AtomicInteger(0)
   private var batches = new LinkedHashMap[Int, (Location, ListBuffer[Update])]()
@@ -195,12 +198,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
             rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
             rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
-
+            storedRootBuffer = rootBean.freeze
             helper.storeRootBean
 
             true
           } else {
             rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0)
+            storedRootBuffer = rootBuffer;
             false
           }
       }
@@ -981,11 +985,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
 
     // Don't GC files that we will need for recovery..
-    val upto = if (rootBuffer.hasFirstBatchLocation) {
-      Some(rootBuffer.getFirstBatchLocation.getDataFileId)
+
+    // Notice we are using the storedRootBuffer and not the rootBuffer field.
+    // rootBuffer has the latest updates, which they may not survive restart.
+    val upto = if (storedRootBuffer.hasFirstBatchLocation) {
+      Some(storedRootBuffer.getFirstBatchLocation.getDataFileId)
     } else {
-      if (rootBuffer.hasLastUpdateLocation) {
-        Some(rootBuffer.getLastUpdateLocation.getDataFileId)
+      if (storedRootBuffer.hasLastUpdateLocation) {
+        Some(storedRootBuffer.getLastUpdateLocation.getDataFileId)
       } else {
         None
       }
@@ -1077,8 +1084,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
 
     def storeRootBean() = {
-      rootBuffer = rootBean.freeze
+      val frozen = rootBean.freeze
+      rootBuffer = frozen
       _tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
+
+      // Since the index flushes updates async, hook a callback to know when
+      // the update has hit disk.  storedRootBuffer is used by the
+      // cleanup task to know when which data logs are safe to cleanup.
+      _tx.onFlush(^{
+        storedRootBuffer = frozen
+      })
+
     }
 
   }