You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:07:27 UTC
svn commit: r961130 - in /activemq/sandbox/activemq-apollo-actor:
activemq-hawtdb/ activemq-hawtdb/src/main/proto/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
activemq-store/src/test/scala/org/apache/activemq/apollo/store/
Author: chirino
Date: Wed Jul 7 04:07:26 2010
New Revision: 961130
URL: http://svn.apache.org/viewvc?rev=961130&view=rev
Log:
yay hawtdb store test is passing now
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/pom.xml Wed Jul 7 04:07:26 2010
@@ -87,6 +87,14 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-util</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul 7 04:07:26 2010
@@ -85,7 +85,7 @@ message AddQueueEntry {
required int64 queueKey=1;
required int64 queueSeq=2;
required int64 messageKey=3;
- required int32 size=4;
+ optional int32 size=4;
optional bytes attachment=5;
optional int32 redeliveries = 6;
}
@@ -152,29 +152,29 @@ message RemoveStream {
///////////////////////////////////////////////////////////////
-// Index Structures
+// Records Stored used in the Indexes
///////////////////////////////////////////////////////////////
message DatabaseRootRecord {
- required fixed32 state=1;
- required fixed64 lastMessageKey=2;
- required fixed64 firstInProgressBatch=3;
- required fixed64 lastUpdateLocation=4;
-
- required fixed32 dataFileRefIndexPage=5;
- required fixed32 messageKeyIndexPage=6;
- required fixed32 messageRefsIndexPage=7;
- required fixed32 queueIndexPage=8;
- required fixed32 subscriptionIndexPage=10;
- required fixed32 mapIndexPage=11;
+ optional fixed64 firstBatchLocation=1;
+ optional fixed64 lastUpdateLocation=2;
+ optional fixed64 lastMessageKey=3;
+ optional fixed64 lastQueueKey=4;
+
+ optional fixed32 dataFileRefIndexPage=50;
+ optional fixed32 messageKeyIndexPage=51;
+ optional fixed32 messageRefsIndexPage=52;
+ optional fixed32 queueIndexPage=53;
+ optional fixed32 subscriptionIndexPage=54;
+ optional fixed32 mapIndexPage=55;
}
message QueueRootRecord {
- required AddQueue info=1;
- required int64 size=2;
- required int64 count=3;
- required fixed32 entryIndexPage=4;
- required fixed32 trackingIndexPage=5;
+ optional AddQueue info=1;
+ optional int64 size=2;
+ optional int64 count=3;
+ optional fixed32 entryIndexPage=4;
+ optional fixed32 trackingIndexPage=5;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:07:26 2010
@@ -38,12 +38,12 @@ import org.apache.activemq.broker.store.
import org.apache.activemq.broker.store.hawtdb.model._
import org.fusesource.hawtbuf._
import org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
import collection.JavaConversions
import java.util.{TreeSet, HashSet}
import org.fusesource.hawtdb.api._
+import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
object HawtDBClient extends Log {
val BEGIN = -1
@@ -60,7 +60,7 @@ object HawtDBClient extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBClient(hawtDBStore: HawtDBStore) extends Logging {
+class HawtDBClient(hawtDBStore: HawtDBStore) extends DispatchLogging {
import HawtDBClient._
import Helpers._
@@ -144,7 +144,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
journal
}
- def start() = {
+ val schedual_version = new AtomicInteger()
+
+ def start(onComplete:Runnable) = {
lock {
journal = createJournal()
@@ -178,43 +180,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
pageFile.flush()
- recover
-
- // trackingGen.set(rootEntity.getLastMessageTracking() + 1)
-
- // checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
- // public void run() {
- // try {
- // long lastCleanup = System.currentTimeMillis()
- // long lastCheckpoint = System.currentTimeMillis()
- //
- // // Sleep for a short time so we can periodically check
- // // to see if we need to exit this thread.
- // long sleepTime = Math.min(checkpointInterval, 500)
- // while (opened.get()) {
- // Thread.sleep(sleepTime)
- // long now = System.currentTimeMillis()
- // if (now - lastCleanup >= cleanupInterval) {
- // checkpointCleanup(true)
- // lastCleanup = now
- // lastCheckpoint = now
- // } else if (now - lastCheckpoint >= checkpointInterval) {
- // checkpointCleanup(false)
- // lastCheckpoint = now
- // }
- // }
- // } catch (InterruptedException e) {
- // // Looks like someone really wants us to exit this
- // // thread...
- // }
- // }
- // }
- // checkpointThread.start()
+ recover(onComplete)
+ // Schedual periodic jobs.. they keep executing while schedual_version remains the same.
+ schedualCleanup(schedual_version.get())
+ schedualFlush(schedual_version.get())
}
}
def stop() = {
+ // this cancels schedualed jobs...
+ schedual_version.incrementAndGet
+ flush
}
def addQueue(record: QueueRecord) = {
@@ -238,8 +215,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
tx =>
tx.actions.foreach {
case (msg, action) =>
- if (action.store != null) {
- val update: AddMessage.Bean = action.store
+ if (action.messageRecord != null) {
+ val update: AddMessage.Bean = action.messageRecord
batch ::= update
}
action.enqueues.foreach {
@@ -352,8 +329,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = {
try {
- read(location) match {
- case (updateType, data) =>
+ load(location) match {
+ case (updateType, batch, data) =>
Some(expected.cast(decode(location, updateType, data)))
}
} catch {
@@ -402,7 +379,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
baos.writeInt(batch)
frozen.writeUnframed(baos)
- append(baos.toBuffer()) {
+ val buffer = baos.toBuffer()
+ append(buffer) {
location =>
executeStore(batch, update, onComplete, location)
}
@@ -442,6 +420,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
+ def load(location: Location) = {
+ var data = read(location)
+ val editor = data.bigEndianEditor
+ val updateType = editor.readByte()
+ val batch = editor.readInt
+ (updateType, batch, data)
+ }
+
/////////////////////////////////////////////////////////////////////
//
// Methods related to recovery
@@ -449,13 +435,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
/////////////////////////////////////////////////////////////////////
/**
- * Move all the messages that were in the journal into the indexes.
+ * Recovers the journal and rollsback any in progress batches that
+ * were in progress and never committed.
*
* @throws IOException
* @throws IOException
* @throws IllegalStateException
*/
- def recover: Unit = {
+ def recover(onComplete:Runnable): Unit = {
recoveryCounter = 0
lastRecoveryPosition = null
val start = System.currentTimeMillis()
@@ -467,10 +454,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
batch =>
rollback(batch, null)
}
- })
- val end = System.currentTimeMillis()
- info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f))
+ val end = System.currentTimeMillis()
+ info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f))
+ onComplete.run
+ })
}
@@ -482,14 +470,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
// Is this our first incremental recovery pass?
if (lastRecoveryPosition == null) {
- if (databaseRootRecord.hasFirstInProgressBatch) {
+ if (databaseRootRecord.hasFirstBatchLocation) {
// we have to start at the first in progress batch usually...
- nextRecoveryPosition = databaseRootRecord.getFirstInProgressBatch
+ nextRecoveryPosition = databaseRootRecord.getFirstBatchLocation
} else {
// but perhaps there were no batches in progress..
if (databaseRootRecord.hasLastUpdateLocation) {
// then we can just continue from the last update applied to the index
- nextRecoveryPosition = journal.getNextLocation(databaseRootRecord.getLastUpdateLocation)
+ lastRecoveryPosition = databaseRootRecord.getLastUpdateLocation
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
} else {
// no updates in the index?.. start from the first record in the journal.
nextRecoveryPosition = journal.getNextLocation(null)
@@ -533,7 +522,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
recoveryCounter += 1
- databaseRootRecord.setLastUpdateLocation(location)
}
@@ -554,17 +542,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
} finally {
val end = System.currentTimeMillis()
if (end - start > 1000) {
- warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms")
+ warn("Journal append latencey: %,.3f seconds", ((end - start) / 1000.0f))
}
}
}
- def read(location: Location) = {
- var data = journal.read(location)
- val editor = data.bigEndianEditor
- val updateType = editor.readByte()
- (updateType, data)
- }
+ def read(location: Location) = journal.read(location)
/////////////////////////////////////////////////////////////////////
//
@@ -597,7 +580,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
case None =>
// when recovering.. we are more lax due recovery starting
// in the middle of a stream of in progress batches
- assert(!recovering)
+ assert(recovering)
}
if (onComplete != null) {
onComplete.run
@@ -616,7 +599,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
case None =>
// when recovering.. we are more lax due recovery starting
// in the middle of a stream of in progress batches
- assert(!recovering)
+ assert(recovering)
}
if (onComplete != null) {
onComplete.run
@@ -652,7 +635,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
case None =>
// when recovering.. we are more lax due recovery starting
// in the middle of a stream of in progress batches
- assert(!recovering)
+ assert(recovering)
}
}
}
@@ -665,20 +648,20 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def apply(x: AddMessage.Getter): Unit = {
- val key = x.getMessageKey()
- if (key > databaseRootRecord.getLastMessageKey) {
- databaseRootRecord.setLastMessageKey(key)
+ val messageKey = x.getMessageKey()
+ if (messageKey > databaseRootRecord.getLastMessageKey) {
+ databaseRootRecord.setLastMessageKey(messageKey)
}
- val prevLocation = messageKeyIndex.put(key, location)
+ val prevLocation = messageKeyIndex.put(messageKey, location)
if (prevLocation != null) {
// Message existed.. undo the index update we just did. Chances
// are it's a transaction replay.
- messageKeyIndex.put(key, prevLocation)
+ messageKeyIndex.put(messageKey, prevLocation)
if (location == prevLocation) {
- warn("Message replay detected for: %d", key)
+ warn("Message replay detected for: %d", messageKey)
} else {
- error("Message replay with different location for: %d", key)
+ error("Message replay with different location for: %d", messageKey)
}
} else {
val fileId:jl.Integer = location.getDataFileId()
@@ -697,12 +680,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
def apply(x: AddQueue.Getter): Unit = {
- if (queueIndex.get(x.getKey) == null) {
+ val queueKey = x.getKey
+ if (queueIndex.get(queueKey) == null) {
+
+ if (queueKey > databaseRootRecord.getLastQueueKey) {
+ databaseRootRecord.setLastQueueKey(queueKey)
+ }
+
val queueRecord = new QueueRootRecord.Bean
queueRecord.setEntryIndexPage(alloc(QUEUE_ENTRY_INDEX_FACTORY))
queueRecord.setTrackingIndexPage(alloc(QUEUE_TRACKING_INDEX_FACTORY))
queueRecord.setInfo(x)
- queueIndex.put(x.getKey, queueRecord.freeze)
+ queueIndex.put(queueKey, queueRecord.freeze)
}
}
@@ -814,15 +803,22 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
update match {
- case x: AddMessage.Getter => Process(x)
- case x: AddQueueEntry.Getter => Process(x)
- case x: RemoveQueueEntry.Getter => Process(x)
-
- case x: AddQueue.Getter => Process(x)
- case x: RemoveQueue.Getter => Process(x)
-
- case x: AddTrace.Getter => Process(x)
- case x: Purge.Getter => Process(x)
+ case x: AddMessage.Getter =>
+ Process(x)
+ case x: AddQueueEntry.Getter =>
+ Process(x)
+ case x: RemoveQueueEntry.Getter =>
+ Process(x)
+
+ case x: AddQueue.Getter =>
+ Process(x)
+ case x: RemoveQueue.Getter =>
+ Process(x)
+
+ case x: AddTrace.Getter =>
+ Process(x)
+ case x: Purge.Getter =>
+ Process(x)
case x: AddSubscription.Getter =>
case x: RemoveSubscription.Getter =>
@@ -846,12 +842,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
//
/////////////////////////////////////////////////////////////////////
- def schedualFlush(): Unit = {
+ def schedualFlush(version:Int): Unit = {
def try_flush() = {
- if (hawtDBStore.serviceState.isStarted) {
+ if (version == schedual_version.get) {
hawtDBStore.executor_pool {
flush
- schedualFlush
+ schedualFlush(version)
}
}
}
@@ -863,18 +859,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
pageFile.flush
val end = System.currentTimeMillis()
if (end - start > 1000) {
- warn("Index flush took %,.3f seconds" + ((end - start) / 1000.0f))
+ warn("Index flush latency: %,.3f seconds", ((end - start) / 1000.0f))
}
}
- def schedualCleanup(): Unit = {
+ def schedualCleanup(version:Int): Unit = {
def try_cleanup() = {
- if (hawtDBStore.serviceState.isStarted) {
+ if (version == schedual_version.get) {
hawtDBStore.executor_pool {
withTx {tx =>
cleanup(tx)
}
- schedualCleanup
+ schedualCleanup(version)
}
}
}
@@ -899,8 +895,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
// Don't GC files that we will need for recovery..
- val upto = if (databaseRootRecord.hasFirstInProgressBatch) {
- Some(databaseRootRecord.getFirstInProgressBatch.getDataFileId)
+ val upto = if (databaseRootRecord.hasFirstBatchLocation) {
+ Some(databaseRootRecord.getFirstBatchLocation.getDataFileId)
} else {
if (databaseRootRecord.hasLastUpdateLocation) {
Some(databaseRootRecord.getLastUpdateLocation.getDataFileId)
@@ -945,7 +941,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private case class Update(update: TypeCreatable, location: Location)
private class TxHelper(private val _tx: Transaction) {
- lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
+ lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getQueueIndexPage)
lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageKeyIndexPage)
lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
@@ -1014,12 +1010,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
lastUpdate.compareTo(location) < 0
}
- private def updateLocations(tx: Transaction, location: Location): Unit = {
- databaseRootRecord.setLastUpdateLocation(location)
+ private def updateLocations(tx: Transaction, lastUpdate: Location): Unit = {
+ databaseRootRecord.setLastUpdateLocation(lastUpdate)
if (batches.isEmpty) {
- databaseRootRecord.clearFirstInProgressBatch
+ databaseRootRecord.clearFirstBatchLocation
} else {
- databaseRootRecord.setFirstInProgressBatch(batches.head._2._1)
+ databaseRootRecord.setFirstBatchLocation(batches.head._2._1)
}
tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
databaseRootRecord = databaseRootRecord.copy
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:07:26 2010
@@ -24,13 +24,12 @@ import java.util.HashMap
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
import org.apache.activemq.apollo.util.IntCounter
import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
-import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
import collection.{JavaConversions, Seq}
-import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
import org.fusesource.hawtdispatch.ScalaDispatch._
-import ReporterLevel._
+import org.apache.activemq.apollo.broker._
import java.io.File
+import ReporterLevel._
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -59,7 +58,7 @@ object HawtDBStore extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBStore extends Store with BaseService with Logging {
+class HawtDBStore extends Store with BaseService with DispatchLogging {
import HawtDBStore._
override protected def log = HawtDBStore
@@ -95,8 +94,11 @@ class HawtDBStore extends Store with Bas
executor_pool = Executors.newFixedThreadPool(20)
client.config = config
executor_pool {
- client.start
- onCompleted.run
+ client.start(^{
+ next_msg_key.set( client.databaseRootRecord.getLastMessageKey.longValue +1 )
+ next_queue_key.set( client.databaseRootRecord.getLastQueueKey.longValue +1 )
+ onCompleted.run
+ })
}
}
@@ -198,12 +200,12 @@ class HawtDBStore extends Store with Bas
class MessageAction {
var msg= 0L
- var store: MessageRecord = null
+ var messageRecord: MessageRecord = null
var enqueues = ListBuffer[QueueEntryRecord]()
var dequeues = ListBuffer[QueueEntryRecord]()
def tx = HawtDBBatch.this
- def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
+ def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
def cancel() = {
tx.rm(msg)
if( tx.isEmpty ) {
@@ -229,7 +231,7 @@ class HawtDBStore extends Store with Bas
record.key = next_msg_key.incrementAndGet
val action = new MessageAction
action.msg = record.key
- action.store = record
+ action.messageRecord = record
this.synchronized {
actions += record.key -> action
}
@@ -294,7 +296,7 @@ class HawtDBStore extends Store with Bas
}
tx.actions.foreach { case (msg, action) =>
- if( action.store!=null ) {
+ if( action.messageRecord!=null ) {
pendingStores.put(msg, action)
}
action.enqueues.foreach { queueEntry=>
@@ -312,9 +314,9 @@ class HawtDBStore extends Store with Bas
prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
// if the message is not in any queues.. we can gc it..
- if( prevAction.enqueues == Nil && prevAction.store !=null ) {
+ if( prevAction.enqueues == Nil && prevAction.messageRecord !=null ) {
pendingStores.remove(msg)
- prevAction.store = null
+ prevAction.messageRecord = null
}
// Cancel the action if it's now empty
@@ -355,7 +357,7 @@ class HawtDBStore extends Store with Bas
if (tx!=null) {
tx.actions.foreach { case (msg, action) =>
- if( action.store!=null ) {
+ if( action.messageRecord !=null ) {
pendingStores.remove(msg)
}
action.enqueues.foreach { queueEntry=>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul 7 04:07:26 2010
@@ -105,6 +105,7 @@ object Helpers {
implicit def toMessageRecord(pb: AddMessage.Getter): MessageRecord = {
val rc = new MessageRecord
+ rc.key = pb.getMessageKey
rc.protocol = pb.getProtocol
rc.size = pb.getSize
rc.value = pb.getValue
@@ -115,6 +116,7 @@ object Helpers {
implicit def fromMessageRecord(v: MessageRecord): AddMessage.Bean = {
val pb = new AddMessage.Bean
+ pb.setMessageKey(v.key)
pb.setProtocol(v.protocol)
pb.setSize(v.size)
pb.setValue(v.value)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961130&r1=961129&r2=961130&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul 7 04:07:26 2010
@@ -128,6 +128,16 @@ abstract class StoreFunSuiteSupport exte
msgKeys
}
+ test("load stored message") {
+ val A = addQueue("A")
+ val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
+ expect(ascii("message 1").buffer) {
+ rc.get.value
+ }
+ }
+
test("add and list queues") {
val A = addQueue("A")
val B = addQueue("B")
@@ -161,16 +171,6 @@ abstract class StoreFunSuiteSupport exte
}
}
- test("load stored message") {
- val A = addQueue("A")
- val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
- val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
- expect(ascii("message 1").buffer) {
- rc.get.value
- }
- }
-
test("batch completes after a delay") {x}
def x = {
val A = addQueue("A")
@@ -189,7 +189,7 @@ abstract class StoreFunSuiteSupport exte
}
}
- test("flush cancels the completion delay") {
+ test("flush cancels the delay") {
val A = addQueue("A")
var batch = store.createStoreBatch