You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:07:17 UTC
svn commit: r961129 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-cassandra/src/test/scala/org/apac...
Author: chirino
Date: Wed Jul 7 04:07:16 2010
New Revision: 961129
URL: http://svn.apache.org/viewvc?rev=961129&view=rev
Log:
better store tests
Added:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala
- copied, changed from r961128, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:07:16 2010
@@ -137,7 +137,7 @@ class VirtualHost(val broker: Broker) ex
store.getQueueStatus(queueKey) { x =>
x match {
case Some(info)=>
- store.getQueueEntries(queueKey) { entries=>
+ store.listQueueEntries(queueKey) { entries=>
dispatchQueue ^{
val dest = DestinationParser.parse(info.record.name, destination_parser_options)
val queue = new Queue(this, dest)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:07:16 2010
@@ -115,6 +115,15 @@ class CassandraClient() {
}
}
+ def removeQueue(queueKey: Long):Boolean = {
+ withSession {
+ session =>
+ session.remove(schema.entries \ queueKey)
+ session.remove(schema.queue_name \ queueKey)
+ }
+ true
+ }
+
def listQueues: Seq[Long] = {
withSession {
session =>
@@ -137,7 +146,7 @@ class CassandraClient() {
rc.record.key = id
rc.record.name = new AsciiBuffer(x.value)
-// rc.count = session.count( schema.entries \ id )
+ rc.count = session.count( schema.entries \ id )
// TODO
// rc.count =
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:07:16 2010
@@ -34,7 +34,6 @@ import org.fusesource.hawtdispatch.Scala
import ReporterLevel._
object CassandraStore extends Log {
- val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
/**
* Creates a default a configuration object.
@@ -132,55 +131,61 @@ class CassandraStore extends Store with
/**
* Deletes all stored data from the store.
*/
- def purge(cb: =>Unit) = {
+ def purge(callback: =>Unit) = {
executor_pool ^{
client.purge
- cb
+ callback
}
}
- def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
+ def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
val key = next_queue_key.incrementAndGet
record.key = key
executor_pool ^{
client.addQueue(record)
- cb(Some(key))
+ callback(Some(key))
}
}
- def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {
+ def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
executor_pool ^{
- cb( client.getQueueStatus(id) )
+ callback(client.removeQueue(queueKey))
}
}
- def listQueues(cb: (Seq[Long]) => Unit) = {
+ def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
executor_pool ^{
- cb( client.listQueues )
+ callback( client.getQueueStatus(id) )
}
}
- def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {
+ def listQueues(callback: (Seq[Long]) => Unit) = {
executor_pool ^{
- cb( client.loadMessage(id) )
+ callback( client.listQueues )
+ }
+ }
+
+ def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+ executor_pool ^{
+ callback( client.loadMessage(id) )
}
}
- def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {
+ def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
executor_pool ^{
- cb( client.getQueueEntries(id) )
+ callback( client.getQueueEntries(id) )
}
}
- def flushMessage(id: Long)(cb: => Unit) = ^{
+ def flushMessage(id: Long)(callback: => Unit) = ^{
val action: CassandraBatch#MessageAction = pendingStores.get(id)
if( action == null ) {
- cb
+ callback
} else {
val prevDisposer = action.tx.getDisposer
action.tx.setDisposer(^{
- cb
+ callback
if(prevDisposer!=null) {
prevDisposer.run
}
@@ -291,8 +296,12 @@ class CassandraStore extends Store with
val tx_id = next_tx_id.incrementAndGet
tx.txid = tx_id
delayedTransactions.put(tx_id, tx)
- dispatchQueue.dispatchAsync(^{flush(tx_id)})
- dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+
+ if( config.flushDelay > 0 ) {
+ dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+ } else {
+ dispatchQueue.dispatchAsync(^{flush(tx_id)})
+ }
tx.actions.foreach { case (msg, action) =>
if( action.store!=null ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala Wed Jul 7 04:07:16 2010
@@ -16,145 +16,17 @@
*/
package org.apache.activemq.broker.store.cassandra
-import org.fusesource.hawtbuf.AsciiBuffer._
-import org.scalatest.BeforeAndAfterAll
-import org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.TaskTracker
-import org.apache.activemq.apollo.broker.{LoggingTracker, FunSuiteSupport}
-import java.util.concurrent.{TimeUnit, CountDownLatch}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+import org.apache.activemq.broker.store.{Store, StoreFunSuiteSupport}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class CassandraStoreTest extends FunSuiteSupport with CassandraServerMixin {
+class CassandraStoreTest extends StoreFunSuiteSupport with CassandraServerMixin {
- def CB[T](func: (T=>Unit)=>Unit ) = {
- class X {
- var value:T = _
- }
- val rc = new X
- val cd = new CountDownLatch(1)
- def cb(x:T) = {
- rc.value = x
- cd.countDown
- }
- func(cb)
- cd.await
- rc.value
+ def createStore(flushDelay:Long):Store = {
+ val rc = new CassandraStore
+ rc.config.flushDelay = flushDelay
+ rc
}
- var store:CassandraStore=null
-
- override protected def beforeAll() = {
- store = new CassandraStore()
- val tracker = new LoggingTracker("store startup")
- tracker.start(store)
- tracker.await
- }
-
- override protected def afterAll() = {
- val tracker = new LoggingTracker("store stop")
- tracker.stop(store)
- tracker.await
- }
-
-
- def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {
- expect(expected) {
- CB(func)
- }
- }
-
-
- test("add message") {
- addMessage
- }
-
- def addMessage() {
- var queueA = new QueueRecord
- queueA.key =1
- queueA.name = ascii("queue:1")
-
- val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
- queueA.key = rc.get
-
- val expected:Seq[Long] = List(queueA.key)
- expectCB(expected) { cb=>
- store.listQueues(cb)
- }
-
- var tx = store.createStoreBatch
- var message = new MessageRecord
- message.key = 35
- message.protocol = ascii("test-protocol")
- message.value = ascii("test content").buffer
- message.size = message.value.length
- tx.store(message)
-
-
- val disposed = new CountDownLatch(1)
-
- var queueEntry = new QueueEntryRecord
- queueEntry.queueKey = queueA.key
- queueEntry.messageKey = message.key
- queueEntry.queueSeq = 1
-
- tx.enqueue(queueEntry)
- tx.setDisposer(^{ disposed.countDown })
- tx.dispose
-
- // It should not finish disposing right away...
- expect(false) {
- disposed.await(5, TimeUnit.SECONDS)
- }
-
- var flushed = new CountDownLatch(1)
- store.flushMessage(message.key) {
- flushed.countDown
- }
-
- // Should flush quickly now..
- expect(true) {
- flushed.await(1, TimeUnit.SECONDS)
- }
- // Flushing triggers the tx to finish disposing.
- expect(true) {
- disposed.await(1, TimeUnit.SECONDS)
- }
-
- // add another message to the queue..
- tx = store.createStoreBatch
- message = new MessageRecord
- message.key = 36
- message.protocol = ascii("test-protocol")
- message.value = ascii("test content").buffer
- message.size = message.value.length
- tx.store(message)
-
- queueEntry = new QueueEntryRecord
- queueEntry.queueKey = queueA.key
- queueEntry.messageKey = message.key
- queueEntry.queueSeq = 2
-
- tx.enqueue(queueEntry)
-
- flushed = new CountDownLatch(1)
- store.flushMessage(message.key) {
- flushed.countDown
- }
- flushed.await
-
- val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.key)(cb) )
- expect(ascii("queue:1")) {
- qso.get.record.name
- }
- expect(2) {
- qso.get.count
- }
-
- println("xx")
-
- }
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java Wed Jul 7 04:07:16 2010
@@ -18,8 +18,10 @@ package org.apache.activemq.apollo.dto;
import org.codehaus.jackson.annotate.JsonTypeInfo;
+import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.bind.annotation.XmlType;
+import java.io.File;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -29,4 +31,14 @@ import javax.xml.bind.annotation.XmlType
@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
public abstract class StoreDTO {
+ /**
+ * The flush delay is the amount of time in milliseconds that a store
+ * will delay persisting a messaging unit of work in hopes that it will
+ * be invalidated shortly thereafter by another unit of work which
+ * would negate the operation.
+ */
+ @XmlAttribute(name="flush-delay", required=false)
+ public long flushDelay = 100;
+
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:07:16 2010
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.broker.store.hawtdb
+import java.{lang=>jl}
+import java.{util=>ju}
+
import model.{AddQueue, AddQueueEntry, AddMessage}
import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord}
import org.apache.activemq.apollo.dto.HawtDBStoreDTO
@@ -39,6 +42,7 @@ import org.apache.activemq.apollo.broker
import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
import collection.JavaConversions
import java.util.{TreeSet, HashSet}
+
import org.fusesource.hawtdb.api._
object HawtDBClient extends Log {
@@ -70,7 +74,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private var lockFile: LockFile = null
private val trackingGen = new AtomicLong(0)
- private val lockedDatatFiles = new HashSet[java.lang.Integer]()
+ private val lockedDatatFiles = new HashSet[jl.Integer]()
private var recovering = false
private var nextRecoveryPosition: Location = null
@@ -221,6 +225,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
store(update)
}
+ def removeQueue(queueKey: Long):Boolean = {
+ val update = new RemoveQueue.Bean()
+ update.setKey(queueKey)
+ store(update)
+ true
+ }
+
def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
var batch = List[TypeCreatable]()
txs.foreach {
@@ -670,7 +681,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
error("Message replay with different location for: %d", key)
}
} else {
- val fileId:java.lang.Integer = location.getDataFileId()
+ val fileId:jl.Integer = location.getDataFileId()
addAndGet(dataFileRefIndex, fileId, 1)
}
}
@@ -678,7 +689,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def removeMessage(key:Long) = {
val location = messageKeyIndex.remove(key)
if (location != null) {
- val fileId:java.lang.Integer = location.getDataFileId()
+ val fileId:jl.Integer = location.getDataFileId()
addAndGet(dataFileRefIndex, fileId, -1)
} else {
error("Cannot remove message, it did not exist: %d", key)
@@ -698,8 +709,19 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def apply(x: RemoveQueue.Getter): Unit = {
val queueRecord = queueIndex.remove(x.getKey)
if (queueRecord != null) {
- queueEntryIndex(queueRecord).destroy
- queueTrackingIndex(queueRecord).destroy
+ val trackingIndex = queueTrackingIndex(queueRecord)
+ val entryIndex = queueEntryIndex(queueRecord)
+
+ trackingIndex.iterator.map { entry=>
+ val messageKey = entry.getKey
+ if( addAndGet(messageRefsIndex, messageKey, -1) == 0 ) {
+ // message is no longer referenced.. we can remove it..
+ removeMessage(messageKey.longValue)
+ }
+ }
+
+ entryIndex.destroy
+ trackingIndex.destroy
}
}
@@ -725,7 +747,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
queueRecordUpdate.setSize(queueRecord.getSize + x.getSize)
queueIndex.put(queueKey, queueRecordUpdate.freeze)
- addAndGet(messageRefsIndex, new java.lang.Long(messageKey), 1)
+ addAndGet(messageRefsIndex, new jl.Long(messageKey), 1)
} else {
error("Duplicate queue entry seq %d", x.getQueueSeq)
}
@@ -752,8 +774,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
if (existing == null) {
error("Tracking entry not found for message %d", queueEntry.getMessageKey)
}
- if( addAndGet(messageRefsIndex, new java.lang.Long(messageKey), -1) == 0 ) {
- // message is no long referenced.. we can remove it..
+ if( addAndGet(messageRefsIndex, new jl.Long(messageKey), -1) == 0 ) {
+ // message is no longer referenced.. we can remove it..
removeMessage(messageKey)
}
} else {
@@ -869,7 +891,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
import helper._
debug("Cleanup started.")
- val gcCandidateSet = new TreeSet[Integer](journal.getFileMap().keySet())
+ val gcCandidateSet = new TreeSet[jl.Integer](journal.getFileMap().keySet())
// Don't cleanup locked data files
if (lockedDatatFiles != null) {
@@ -929,7 +951,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, databaseRootRecord.getSubscriptionIndexPage)
- def addAndGet[K](index:SortedIndex[K, Integer], key:K, amount:Int):Int = {
+ def addAndGet[K](index:SortedIndex[K, jl.Integer], key:K, amount:Int):Int = {
var counter = index.get(key)
if( counter == null ) {
if( amount!=0 ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:07:16 2010
@@ -121,55 +121,61 @@ class HawtDBStore extends Store with Bas
/**
* Deletes all stored data from the store.
*/
- def purge(cb: =>Unit) = {
+ def purge(callback: =>Unit) = {
executor_pool ^{
client.purge
- cb
+ callback
}
}
- def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
+ def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
val key = next_queue_key.incrementAndGet
record.key = key
executor_pool ^{
client.addQueue(record)
- cb(Some(key))
+ callback(Some(key))
}
}
- def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {
+ def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
executor_pool ^{
- cb( client.getQueueStatus(id) )
+ callback(client.removeQueue(queueKey))
}
}
- def listQueues(cb: (Seq[Long]) => Unit) = {
+ def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
executor_pool ^{
- cb( client.listQueues )
+ callback( client.getQueueStatus(id) )
}
}
- def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {
+ def listQueues(callback: (Seq[Long]) => Unit) = {
executor_pool ^{
- cb( client.loadMessage(id) )
+ callback( client.listQueues )
+ }
+ }
+
+ def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+ executor_pool ^{
+ callback( client.loadMessage(id) )
}
}
- def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {
+ def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
executor_pool ^{
- cb( client.getQueueEntries(id) )
+ callback( client.getQueueEntries(id) )
}
}
- def flushMessage(id: Long)(cb: => Unit) = ^{
+ def flushMessage(id: Long)(callback: => Unit) = ^{
val action: HawtDBBatch#MessageAction = pendingStores.get(id)
if( action == null ) {
- cb
+ callback
} else {
val prevDisposer = action.tx.getDisposer
action.tx.setDisposer(^{
- cb
+ callback
if(prevDisposer!=null) {
prevDisposer.run
}
@@ -280,8 +286,12 @@ class HawtDBStore extends Store with Bas
val tx_id = next_tx_id.incrementAndGet
tx.txid = tx_id
delayedTransactions.put(tx_id, tx)
- dispatchQueue.dispatchAsync(^{flush(tx_id)})
- dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+
+ if( config.flushDelay > 0 ) {
+ dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+ } else {
+ dispatchQueue.dispatchAsync(^{flush(tx_id)})
+ }
tx.actions.foreach { case (msg, action) =>
if( action.store!=null ) {
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala (from r961128, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java&r1=961128&r2=961129&rev=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.scala Wed Jul 7 04:07:16 2010
@@ -14,19 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.dto;
+package org.apache.activemq.broker.store.cassandra
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.XmlSeeAlso;
-import javax.xml.bind.annotation.XmlType;
+import org.apache.activemq.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.broker.store.hawtdb.HawtDBStore
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlType(name = "store-type")
-@XmlSeeAlso({CassandraStoreDTO.class, HawtDBStoreDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
-public abstract class StoreDTO {
+class HawtDBStoreTest extends StoreFunSuiteSupport {
+
+ def createStore(flushDelay:Long):Store = {
+ val rc = new HawtDBStore
+ rc.config.flushDelay = flushDelay
+ rc
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961129&r1=961128&r2=961129&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:07:16 2010
@@ -25,30 +25,38 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.dto.StoreDTO
/**
- * A StoreTransaction is used to perform persistent
- * operations as unit of work.
+ * A store batch is used to perform persistent
+ * operations as a unit of work.
+ *
+ * The batch implements the Retained interface and is
+ * thread safe. Once the batch is no longer retained,
+ * the unit of work is executed.
*
- * The disposer assigned to the store transaction will
- * be executed once all associated persistent operations
- * have been persisted.
+ * The disposer assigned to the batch will
+ * be executed once the unit of work is persisted
+ * or it has been negated by subsequent storage
+ * operations.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait StoreBatch extends Retained {
/**
- * Assigns the delivery a store id if it did not already
- * have one assigned.
+ * Stores a message. Messages a reference counted, so make sure you also
+ * enqueue it to queue if you don't want it to be discarded right away.
+ *
+ * This method auto generates and assigns the key field of the message record and
+ * returns it.
*/
- def store(delivery:MessageRecord):Long
+ def store(message:MessageRecord):Long
/**
- * Adds a delivery to a specified queue at a the specified position in the queue.
+ * Adds a queue entry
*/
def enqueue(entry:QueueEntryRecord)
/**
- * Removes a delivery from a specified queue at a the specified position in the queue.
+ * Removes a queue entry
*/
def dequeue(entry:QueueEntryRecord)
@@ -59,50 +67,65 @@ trait StoreBatch extends Retained {
*/
trait Store extends Service {
+ /**
+ * Creates a store batch which is used to perform persistent
+ * operations as unit of work.
+ */
+ def createStoreBatch():StoreBatch
+
+ /**
+ * Supplies configuration data to the Store. This will be called
+ * before the store is started, but may also occur after the the Store
+ * is started.
+ */
def configure(config: StoreDTO, reporter:Reporter):Unit
/**
- * Deletes all stored data from the store.
+ * Removes all previously stored data.
*/
- def purge(cb: =>Unit):Unit
+ def purge(callback: =>Unit):Unit
/**
- * Stores a queue, calls back with a unquie id for the stored queue.
+ * Adds a queue.
+ *
+ * This method auto generates and assigns the key field of the queue record and
+ * returns it via the callback.
*/
- def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
+ def addQueue(record:QueueRecord)(callback:(Option[Long])=>Unit):Unit
/**
- * Loads the queue information for a given queue id.
+ * Removes a queue. Success is reported via the callback.
*/
- def getQueueStatus(id:Long)(cb:(Option[QueueStatus])=>Unit )
+ def removeQueue(queueKey:Long)(callback:(Boolean)=>Unit):Unit
/**
- * gets a listing of all queues previously added.
+ * Loads the queue information for a given queue key.
*/
- def listQueues(cb: (Seq[Long])=>Unit )
+ def getQueueStatus(queueKey:Long)(callback:(Option[QueueStatus])=>Unit )
+
+ /**
+ * Gets a listing of all queue entry sequences previously added
+ * and reports them to the callback.
+ */
+ def listQueues(callback: (Seq[Long])=>Unit )
/**
* Loads the queue information for a given queue id.
*/
- def getQueueEntries(id:Long)(cb:(Seq[QueueEntryRecord])=>Unit )
+ def listQueueEntries(queueKey:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
/**
* Removes a the delivery associated with the provided from any
* internal buffers/caches. The callback is executed once, the message is
* no longer buffered.
*/
- def flushMessage(id:Long)(cb: =>Unit)
+ def flushMessage(messageKey:Long)(callback: =>Unit)
/**
* Loads a delivery with the associated id from persistent storage.
*/
- def loadMessage(id:Long)(cb:(Option[MessageRecord])=>Unit )
+ def loadMessage(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
- /**
- * Creates a StoreBatch which is used to perform persistent
- * operations as unit of work.
- */
- def createStoreBatch():StoreBatch
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961129&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul 7 04:07:16 2010
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.TaskTracker
+import org.apache.activemq.apollo.broker.{LoggingTracker, FunSuiteSupport}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
+import collection.mutable.ListBuffer
+
+/**
+ * <p>Implements generic testing of Store implementations.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class StoreFunSuiteSupport extends FunSuiteSupport with BeforeAndAfterEach {
+
+ var store:Store = null
+
+ def createStore(flushDelay:Long):Store
+
+ /**
+ * Handy helper to call an async method on the store and wait for
+ * the result of the callback.
+ */
+ def CB[T](func: (T=>Unit)=>Unit ) = {
+ class X {
+ var value:T = _
+ }
+ val rc = new X
+ val cd = new CountDownLatch(1)
+ def cb(x:T) = {
+ rc.value = x
+ cd.countDown
+ }
+ func(cb)
+ cd.await
+ rc.value
+ }
+
+
+ override protected def beforeAll() = {
+ store = createStore(5*1000)
+ val tracker = new LoggingTracker("store startup")
+ tracker.start(store)
+ tracker.await
+ }
+
+ override protected def afterAll() = {
+ val tracker = new LoggingTracker("store stop")
+ tracker.stop(store)
+ tracker.await
+ }
+
+ override protected def beforeEach() = {
+ val tracker = new LoggingTracker("store startup")
+ val task = tracker.task("purge")
+ store.purge(task.run)
+ tracker.await
+ }
+
+ def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {
+ expect(expected) {
+ CB(func)
+ }
+ }
+
+ def addQueue(name:String):Long = {
+ var queueA = new QueueRecord
+ queueA.name = ascii(name)
+ val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
+ expect(true)(rc.isDefined)
+ rc.get
+ }
+
+ def addMessage(batch:StoreBatch, content:String):Long = {
+ var message = new MessageRecord
+ message.protocol = ascii("test-protocol")
+ message.value = ascii(content).buffer
+ message.size = message.value.length
+ batch.store(message)
+ }
+
+
+ def entry(queueKey:Long, queueSeq:Long, messageKey:Long=0) = {
+ var queueEntry = new QueueEntryRecord
+ queueEntry.queueKey = queueKey
+ queueEntry.queueSeq = queueSeq
+ queueEntry.messageKey = messageKey
+ queueEntry
+ }
+
+ def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
+ var batch = store.createStoreBatch
+ var msgKeys = ListBuffer[Long]()
+ var nextSeq = firstSeq
+
+ messages.foreach { message=>
+ val msgKey = addMessage(batch, message)
+ msgKeys += msgKey
+ batch.enqueue(entry(queueKey, nextSeq, msgKey))
+ nextSeq += 1
+ }
+
+ val tracker = new TaskTracker()
+ tracker.release(batch)
+ msgKeys.foreach { msgKey =>
+ store.flushMessage(msgKey) {}
+ }
+ tracker.await
+ msgKeys
+ }
+
+ test("add and list queues") {
+ val A = addQueue("A")
+ val B = addQueue("B")
+ val C = addQueue("C")
+
+ expectCB(List(A,B,C).toSeq) { cb=>
+ store.listQueues(cb)
+ }
+ }
+
+ test("get queue status") {
+ val A = addQueue("my queue name")
+ populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Option[QueueStatus] = CB( cb=> store.getQueueStatus(A)(cb) )
+ expect(ascii("my queue name")) {
+ rc.get.record.name
+ }
+ expect(3) {
+ rc.get.count
+ }
+ }
+
+ test("list queue entries") {
+ val A = addQueue("A")
+ val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A)(cb) )
+ expect(msgKeys.toSeq) {
+ rc.map( _.messageKey )
+ }
+ }
+
+ test("load stored message") {
+ val A = addQueue("A")
+ val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
+ expect(ascii("message 1").buffer) {
+ rc.get.value
+ }
+ }
+
+ test("batch completes after a delay") {x}
+ def x = {
+ val A = addQueue("A")
+ var batch = store.createStoreBatch
+
+ val m1 = addMessage(batch, "message 1")
+ batch.enqueue(entry(A, 1, m1))
+
+ val tracker = new TaskTracker()
+ tracker.release(batch)
+ expect(false) {
+ tracker.await(3, TimeUnit.SECONDS)
+ }
+ expect(true) {
+ tracker.await(3, TimeUnit.SECONDS)
+ }
+ }
+
+ test("flush cancels the completion delay") {
+ val A = addQueue("A")
+ var batch = store.createStoreBatch
+
+ val m1 = addMessage(batch, "message 1")
+ batch.enqueue(entry(A, 1, m1))
+
+ val tracker = new TaskTracker()
+ tracker.release(batch)
+
+ store.flushMessage(m1) {}
+
+ expect(true) {
+ tracker.await(1, TimeUnit.SECONDS)
+ }
+ }
+
+
+}