You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:13:24 UTC
svn commit: r961169 -
/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
Author: chirino
Date: Wed Jul 7 04:13:24 2010
New Revision: 961169
URL: http://svn.apache.org/viewvc?rev=961169&view=rev
Log:
Use the flush callbacks to know when updates are secured to disk. Clean up methods depends on knowing which updates have been stored.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961169&r1=961168&r2=961169&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:13:24 2010
@@ -84,6 +84,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
@volatile
var rootBuffer = (new DatabaseRootRecord.Bean()).freeze
+ @volatile
+ var storedRootBuffer = (new DatabaseRootRecord.Bean()).freeze
+
val next_batch_counter = new AtomicInteger(0)
private var batches = new LinkedHashMap[Int, (Location, ListBuffer[Update])]()
@@ -195,12 +198,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
-
+ storedRootBuffer = rootBean.freeze
helper.storeRootBean
true
} else {
rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0)
+ storedRootBuffer = rootBuffer;
false
}
}
@@ -981,11 +985,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
// Don't GC files that we will need for recovery..
- val upto = if (rootBuffer.hasFirstBatchLocation) {
- Some(rootBuffer.getFirstBatchLocation.getDataFileId)
+
+ // Notice we are using the storedRootBuffer and not the rootBuffer field.
+ // rootBuffer has the latest updates, which they may not survive restart.
+ val upto = if (storedRootBuffer.hasFirstBatchLocation) {
+ Some(storedRootBuffer.getFirstBatchLocation.getDataFileId)
} else {
- if (rootBuffer.hasLastUpdateLocation) {
- Some(rootBuffer.getLastUpdateLocation.getDataFileId)
+ if (storedRootBuffer.hasLastUpdateLocation) {
+ Some(storedRootBuffer.getLastUpdateLocation.getDataFileId)
} else {
None
}
@@ -1077,8 +1084,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
def storeRootBean() = {
- rootBuffer = rootBean.freeze
+ val frozen = rootBean.freeze
+ rootBuffer = frozen
_tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
+
+ // Since the index flushes updates async, hook a callback to know when
+ // the update has hit disk. storedRootBuffer is used by the
+ // cleanup task to know when which data logs are safe to cleanup.
+ _tx.onFlush(^{
+ storedRootBuffer = frozen
+ })
+
}
}