Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:06:48 UTC
svn commit: r961128 [1/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-dto/src/main/java/org/apache/activemq/apollo/dto/
activemq-hawtdb/src/main/proto/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
activemq-hawtdb/s...
Author: chirino
Date: Wed Jul 7 04:06:47 2010
New Revision: 961128
URL: http://svn.apache.org/viewvc?rev=961128&view=rev
Log:
making more progress on the hawtdb-store
Added:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerTest.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java Wed Jul 7 04:06:47 2010
@@ -32,8 +32,8 @@ public class HawtDBStoreDTO extends Stor
@XmlAttribute(name="directory", required=false)
public File directory;
- @XmlAttribute(name="checkpoint-interval", required=false)
- public long checkpointInterval = 5 * 1000L;
+ @XmlAttribute(name="index-flush-interval", required=false)
+ public long indexFlushInterval = 5 * 1000L;
@XmlAttribute(name="cleanup-interval", required=false)
public long cleanupInterval = 30 * 1000L;
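The rename keeps the 5 second default but better matches what the timer now does: the client maps its old checkpointInterval accessor onto this field and uses it to drive periodic index flushes (see the HawtDBClient.scala diff below). A minimal usage sketch, assuming the DTO is populated programmatically; the path and values are illustrative only:

    val dto = new HawtDBStoreDTO
    dto.directory = new File("/var/data/hawtdb")  // illustrative location
    dto.indexFlushInterval = 10 * 1000L           // formerly checkpoint-interval
    dto.cleanupInterval = 30 * 1000L              // unchanged 30s default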
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul 7 04:06:47 2010
@@ -154,18 +154,27 @@ message RemoveStream {
///////////////////////////////////////////////////////////////
// Index Structures
///////////////////////////////////////////////////////////////
-message RootRecord {
+message DatabaseRootRecord {
required fixed32 state=1;
required fixed64 lastMessageKey=2;
required fixed64 firstInProgressBatch=3;
required fixed64 lastUpdateLocation=4;
- required fixed32 locationIndexPage=5;
+ required fixed32 dataFileRefIndexPage=5;
required fixed32 messageKeyIndexPage=6;
required fixed32 messageRefsIndexPage=7;
- required fixed32 destinationIndexPage=8;
+ required fixed32 queueIndexPage=8;
required fixed32 subscriptionIndexPage=10;
required fixed32 mapIndexPage=11;
-}
\ No newline at end of file
+}
+
+message QueueRootRecord {
+ required AddQueue info=1;
+ required int64 size=2;
+ required int64 count=3;
+ required fixed32 entryIndexPage=4;
+ required fixed32 trackingIndexPage=5;
+}
+
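The reworked root structures split responsibilities: DatabaseRootRecord keeps store-wide state and the pages of the top-level indexes, while each queue now gets its own QueueRootRecord pointing at two per-queue indexes (entries keyed by queue sequence, plus a message-key tracking index). A sketch of building one with the generated Bean API, mirroring what HawtDBClient.scala below does on AddQueue; the size/count setters are assumed from the int64 fields declared above, and the page values are assumed to come from tx.alloc:

    val queueRecord = new QueueRootRecord.Bean
    queueRecord.setInfo(addQueue)                  // the AddQueue command that created the queue
    queueRecord.setSize(0L)                        // running byte total of enqueued entries
    queueRecord.setCount(0L)                       // running entry count
    queueRecord.setEntryIndexPage(entryPage)       // index: queue seq -> AddQueueEntry
    queueRecord.setTrackingIndexPage(trackingPage) // index: message key -> queue seq
    queueIndex.put(queueKey, queueRecord.freeze)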
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:06:47 2010
@@ -26,9 +26,6 @@ import org.apache.activemq.apollo.store.
import org.fusesource.hawtbuf.proto.MessageBuffer
import org.fusesource.hawtbuf.proto.PBMessage
import org.apache.activemq.util.LockFile
-import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
-import java.util.HashSet
-import collection.mutable.{HashMap, ListBuffer}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location}
import org.fusesource.hawtdispatch.TaskTracker
@@ -39,84 +36,52 @@ import org.apache.activemq.broker.store.
import org.fusesource.hawtbuf._
import org.fusesource.hawtdispatch.ScalaDispatch._
import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
+import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
+import collection.JavaConversions
+import java.util.{TreeSet, HashSet}
+import org.fusesource.hawtdb.api._
object HawtDBClient extends Log {
-
- type PB = PBMessage[_ <: PBMessage[_,_], _ <: MessageBuffer[_,_]]
-
- implicit def toPBMessage(value:TypeCreatable):PB = value.asInstanceOf[PB]
-
val BEGIN = -1
val COMMIT = -2
+ val ROLLBACK = -3
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000
val CLOSED_STATE = 1
val OPEN_STATE = 2
-
- implicit def decodeMessageRecord(pb: AddMessage.Getter): MessageRecord = {
- val rc = new MessageRecord
- rc.protocol = pb.getProtocol
- rc.size = pb.getSize
- rc.value = pb.getValue
- rc.stream = pb.getStreamKey
- rc.expiration = pb.getExpiration
- rc
- }
-
- implicit def encodeMessageRecord(v: MessageRecord): AddMessage.Bean = {
- val pb = new AddMessage.Bean
- pb.setProtocol(v.protocol)
- pb.setSize(v.size)
- pb.setValue(v.value)
- pb.setStreamKey(v.stream)
- pb.setExpiration(v.expiration)
- pb
- }
-
- implicit def decodeQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
- val rc = new QueueEntryRecord
- rc.messageKey = pb.getMessageKey
- rc.attachment = pb.getAttachment
- rc.size = pb.getSize
- rc.redeliveries = pb.getRedeliveries.toShort
- rc
- }
-
- implicit def encodeQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
- val pb = new AddQueueEntry.Bean
- pb.setMessageKey(v.messageKey)
- pb.setAttachment(v.attachment)
- pb.setSize(v.size)
- pb.setRedeliveries(v.redeliveries)
- pb
- }
}
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBClient() extends Logging {
+class HawtDBClient(hawtDBStore: HawtDBStore) extends Logging {
import HawtDBClient._
+ import Helpers._
override def log: Log = HawtDBClient
- val dispatchQueue = createQueue("hawtdb store")
+ def dispatchQueue = hawtDBStore.dispatchQueue
private val pageFileFactory = new TxPageFileFactory()
private var journal: Journal = null
private var lockFile: LockFile = null
- private var nextRecoveryPosition: Location = null
- private var lastRecoveryPosition: Location = null
private val trackingGen = new AtomicLong(0)
+ private val lockedDataFiles = new HashSet[java.lang.Integer]()
- private val journalFilesBeingReplicated = new HashSet[Integer]()
private var recovering = false
+ private var nextRecoveryPosition: Location = null
+ private var lastRecoveryPosition: Location = null
+ private var recoveryCounter = 0
- //protected RootEntity rootEntity = new RootEntity()
+ var databaseRootRecord = new DatabaseRootRecord.Bean
+
+
+ val next_batch_counter = new AtomicInteger(0)
+ private var batches = new LinkedHashMap[Int, (Location, ListBuffer[Update])]()
/////////////////////////////////////////////////////////////////////
//
@@ -128,7 +93,7 @@ class HawtDBClient() extends Logging {
private def journalMaxFileLength = config.journalLogSize
- private def checkpointInterval = config.checkpointInterval
+ private def checkpointInterval = config.indexFlushInterval
private def cleanupInterval = config.cleanupInterval
@@ -139,7 +104,7 @@ class HawtDBClient() extends Logging {
/////////////////////////////////////////////////////////////////////
//
- // Public interface
+ // Public interface used by the HawtDBStore
//
/////////////////////////////////////////////////////////////////////
@@ -168,12 +133,17 @@ class HawtDBClient() extends Logging {
}
}
+ def createJournal() = {
+ val journal = new Journal()
+ journal.setDirectory(directory)
+ journal.setMaxFileLength(config.journalLogSize)
+ journal
+ }
+
def start() = {
lock {
- journal = new Journal()
- journal.setDirectory(directory)
- journal.setMaxFileLength(config.journalLogSize)
+ journal = createJournal()
journal.start
pageFileFactory.setFile(new File(directory, "db"))
@@ -182,14 +152,30 @@ class HawtDBClient() extends Logging {
pageFileFactory.setUseWorkerThread(true)
pageFileFactory.open()
- withTx {tx =>
- if (!tx.allocator().isAllocated(0)) {
- // rootEntity.allocate(tx)
- }
- // rootEntity.load(tx)
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import helper._
+
+ if (!tx.allocator().isAllocated(0)) {
+ val rootPage = tx.alloc()
+ assert(rootPage == 0)
+
+ databaseRootRecord.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY))
+ databaseRootRecord.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY))
+ databaseRootRecord.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
+ databaseRootRecord.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
+ databaseRootRecord.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+
+ tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
+ databaseRootRecord = databaseRootRecord.copy
+ } else {
+ databaseRootRecord = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0).copy
+ }
}
+
pageFile.flush()
- // recover()
+ recover
+
// trackingGen.set(rootEntity.getLastMessageTracking() + 1)
// checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
@@ -227,7 +213,6 @@ class HawtDBClient() extends Logging {
def stop() = {
}
-
def addQueue(record: QueueRecord) = {
val update = new AddQueue.Bean()
update.setKey(record.key)
@@ -236,177 +221,160 @@ class HawtDBClient() extends Logging {
store(update)
}
+ def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
+ var batch = List[TypeCreatable]()
+ txs.foreach {
+ tx =>
+ tx.actions.foreach {
+ case (msg, action) =>
+ if (action.store != null) {
+ val update: AddMessage.Bean = action.store
+ batch ::= update
+ }
+ action.enqueues.foreach {
+ queueEntry =>
+ val update: AddQueueEntry.Bean = queueEntry
+ batch ::= update
+ }
+ action.dequeues.foreach {
+ queueEntry =>
+ val qid = queueEntry.queueKey
+ val seq = queueEntry.queueSeq
+ batch ::= new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
+ }
+ }
+ }
+ store(batch)
+ }
+
def purge() = {
-// withSession {
-// session =>
-// session.list(schema.queue_name).map {
-// x =>
-// val qid: Long = x.name
-// session.remove(schema.entries \ qid)
-// }
-// session.remove(schema.queue_name)
-// session.remove(schema.message_data)
-// }
+ val update = new Purge.Bean()
+ store(update)
}
def listQueues: Seq[Long] = {
- null
-// withSession {
-// session =>
-// session.list(schema.queue_name).map {
-// x =>
-// val id: Long = x.name
-// id
-// }
-// }
- }
-
- def getQueueStatus(id: Long): Option[QueueStatus] = {
- null
-// withSession {
-// session =>
-// session.get(schema.queue_name \ id) match {
-// case Some(x) =>
-//
-// val rc = new QueueStatus
-// rc.record = new QueueRecord
-// rc.record.key = id
-// rc.record.name = new AsciiBuffer(x.value)
-//
-// // rc.count = session.count( schema.entries \ id )
-//
-// // TODO
-// // rc.count =
-// // rc.first =
-// // rc.last =
-//
-// Some(rc)
-// case None =>
-// None
-// }
-// }
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+ queueIndex.iterator.map {
+ entry =>
+ entry.getKey.longValue
+ }.toSeq
+ }
}
+ def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+
+ val queueRecord = queueIndex.get(queueKey)
+ if (queueRecord != null) {
+ val rc = new QueueStatus
+ rc.record = new QueueRecord
+ rc.record.key = queueKey
+ rc.record.name = queueRecord.getInfo.getName
+ rc.record.queueType = queueRecord.getInfo.getQueueType
+ rc.count = queueRecord.getCount.toInt
+ rc.size = queueRecord.getSize
+
+ // TODO
+ // rc.first =
+ // rc.last =
+
+ Some(rc)
+ } else {
+ None
+ }
+ }
+ }
- def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
-// withSession {
-// session =>
-// var operations = List[Operation]()
-// txs.foreach {
-// tx =>
-// tx.actions.foreach {
-// case (msg, action) =>
-// var rc =
-// if (action.store != null) {
-// operations ::= Insert( schema.message_data \ (msg, action.store) )
-// }
-// action.enqueues.foreach {
-// queueEntry =>
-// val qid = queueEntry.queueKey
-// val seq = queueEntry.queueSeq
-// operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
-// }
-// action.dequeues.foreach {
-// queueEntry =>
-// val qid = queueEntry.queueKey
-// val seq = queueEntry.queueSeq
-// operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
-// }
-// }
-// }
-// session.batch(operations)
-// }
- }
-
- def loadMessage(id: Long): Option[MessageRecord] = {
- null
-// withSession {
-// session =>
-// session.get(schema.message_data \ id) match {
-// case Some(x) =>
-// val rc: MessageRecord = x.value
-// rc.key = id
-// Some(rc)
-// case None =>
-// None
-// }
-// }
- }
-
- def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
- null
-// withSession {
-// session =>
-// session.list(schema.entries \ qid).map {
-// x =>
-// val rc: QueueEntryRecord = x.value
-// rc.queueKey = qid
-// rc.queueSeq = x.name
-// rc
-// }
-// }
+
+ def getQueueEntries(queueKey: Long): Seq[QueueEntryRecord] = {
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+
+ val queueRecord = queueIndex.get(queueKey)
+ if (queueRecord != null) {
+ val entryIndex = queueEntryIndex(queueRecord)
+ entryIndex.iterator.map {
+ entry =>
+ val rc: QueueEntryRecord = entry.getValue
+ rc
+ }.toSeq
+ } else {
+ Nil.toSeq
+ }
+ }
}
+ def loadMessage(messageKey: Long): Option[MessageRecord] = {
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+
+ val location = messageKeyIndex.get(messageKey)
+ if (location != null) {
+ load(location, classOf[AddMessage.Getter]) match {
+ case Some(x) =>
+ val messageRecord: MessageRecord = x
+ Some(messageRecord)
+ case None => None
+ }
+ } else {
+ None
+ }
+ }
+ }
+
+
/////////////////////////////////////////////////////////////////////
//
- // Implementation
+ // Batch/Transactional interface to storing/accessing journaled updates.
//
/////////////////////////////////////////////////////////////////////
- private def withTx[T](func: (Transaction) => T) {
- val tx = pageFile.tx
- var ok = false
+ private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = {
try {
- val rc = func(tx)
- ok = true
- rc
- } finally {
- if (ok) {
- tx.commit
- } else {
- tx.rollback
+ read(location) match {
+ case (updateType, data) =>
+ Some(expected.cast(decode(location, updateType, data)))
}
+ } catch {
+ case e: Exception =>
+ debug("Could not load journal record at: %s", location)
+ None
}
}
- val next_batch_counter = new AtomicInteger(0)
-
- // Gets the next batch id.. after a while we may wrap around
- // start producing batch ids from zero
- val next_batch_id = {
- var rc = next_batch_counter.getAndIncrement
- while (rc < 0) {
- // We just wrapped around.. reset the counter to 0
- // Use a CAS operation so that only 1 thread resets the counter
- next_batch_counter.compareAndSet(rc + 1, 0)
- rc = next_batch_counter.getAndIncrement
- }
- rc
- }
-
-
- private def store(updates: List[TypeCreatable]):Unit = {
+ private def store(updates: List[TypeCreatable]): Unit = {
val tracker = new TaskTracker("storing")
- store( updates, tracker.task(updates))
+ store(updates, tracker.task(updates))
tracker.await
}
- private def store(update: TypeCreatable):Unit = {
+ private def store(update: TypeCreatable): Unit = {
val tracker = new TaskTracker("storing")
- store( update, tracker.task(update))
+ store(update, tracker.task(update))
tracker.await
}
- private def store(updates: List[TypeCreatable], onComplete: Runnable):Unit = {
+ private def store(updates: List[TypeCreatable], onComplete: Runnable): Unit = {
val batch = next_batch_id
begin(batch)
- updates.foreach {update =>
- store(batch, update, null)
+ updates.foreach {
+ update =>
+ store(batch, update, null)
}
commit(batch, onComplete)
}
- private def store(update: TypeCreatable, onComplete: Runnable):Unit = store(-1, update, onComplete)
+ private def store(update: TypeCreatable, onComplete: Runnable): Unit = store(-1, update, onComplete)
/**
* All updates are funneled through this method. The updates are logged to
@@ -415,7 +383,7 @@ class HawtDBClient() extends Logging {
*
* @throws IOException
*/
- private def store(batch: Int, update: TypeCreatable, onComplete: Runnable):Unit = {
+ private def store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
val kind = update.asInstanceOf[TypeCreatable]
val frozen = update.freeze
val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
@@ -423,191 +391,615 @@ class HawtDBClient() extends Logging {
baos.writeInt(batch)
frozen.writeUnframed(baos)
- journal(baos.toBuffer()) {location =>
- store(batch, update, onComplete, location)
+ append(baos.toBuffer()) {
+ location =>
+ executeStore(batch, update, onComplete, location)
}
}
-
/**
*/
- private def begin(batch: Int):Unit = {
+ private def begin(batch: Int): Unit = {
val baos = new DataByteArrayOutputStream(5)
baos.writeByte(BEGIN)
baos.writeInt(batch)
- journal(baos.toBuffer) {location =>
- begin(batch, location)
+ append(baos.toBuffer) {
+ location =>
+ executeBegin(batch, location)
}
}
/**
*/
- private def commit(batch: Int, onComplete: Runnable):Unit = {
+ private def commit(batch: Int, onComplete: Runnable): Unit = {
val baos = new DataByteArrayOutputStream(5)
baos.writeByte(COMMIT)
baos.writeInt(batch)
- journal(baos.toBuffer) {location =>
- commit(batch, onComplete, location)
+ append(baos.toBuffer) {
+ location =>
+ executeCommit(batch, onComplete, location)
}
}
- private def journal(data: Buffer)(cb: (Location) => Unit):Unit = {
- val start = System.currentTimeMillis()
- try {
- journal.write(data, new JournalCallback() {
- def success(location: Location) = {
- cb(location)
- }
- })
- } finally {
- val end = System.currentTimeMillis()
- if (end - start > 1000) {
- warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms")
- }
+ private def rollback(batch: Int, onComplete: Runnable): Unit = {
+ val baos = new DataByteArrayOutputStream(5)
+ baos.writeByte(ROLLBACK)
+ baos.writeInt(batch)
+ append(baos.toBuffer) {
+ location =>
+ executeRollback(batch, onComplete, location)
}
}
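Taken together, begin(), commit(), rollback() and the data-record store() above define the journal framing: each record starts with a one byte type code (BEGIN = -1, COMMIT = -2, ROLLBACK = -3, or the update's Type code for data records) followed by a four byte big-endian batch id, with data records then carrying the unframed protobuf body. For example, a committed two-update batch appends four records, and the index is only touched once the COMMIT record is executed:

    [type=BEGIN        ][batch=7]
    [type=AddMessage   ][batch=7][unframed AddMessage bean]
    [type=AddQueueEntry][batch=7][unframed AddQueueEntry bean]
    [type=COMMIT       ][batch=7]

Unbatched updates are written with batch = -1 and are indexed as soon as their own record is processed.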
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Methods related to recovery
+ //
+ /////////////////////////////////////////////////////////////////////
/**
- * Move all the messages that were in the journal into long term storage. We
- * just replay and do a checkpoint.
+ * Move all the messages that were in the journal into the indexes.
*
* @throws IOException
* @throws IOException
* @throws IllegalStateException
*/
- def recover = {
+ def recover: Unit = {
+ recoveryCounter = 0
+ lastRecoveryPosition = null
+ val start = System.currentTimeMillis()
+ incrementalRecover
+
+ store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
+ // Rollback any batches that did not complete.
+ batches.keysIterator.foreach {
+ batch =>
+ rollback(batch, null)
+ }
+ })
+
+ val end = System.currentTimeMillis()
+ info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f))
+ }
+
+
+ /**
+ * incrementally recovers the journal. It can be run again and again
+ * if the journal is being appended to.
+ */
+ def incrementalRecover(): Unit = {
+
+ // Is this our first incremental recovery pass?
+ if (lastRecoveryPosition == null) {
+ if (databaseRootRecord.hasFirstInProgressBatch) {
+ // we have to start at the first in progress batch usually...
+ nextRecoveryPosition = databaseRootRecord.getFirstInProgressBatch
+ } else {
+ // but perhaps there were no batches in progress..
+ if (databaseRootRecord.hasLastUpdateLocation) {
+ // then we can just continue from the last update applied to the index
+ nextRecoveryPosition = journal.getNextLocation(databaseRootRecord.getLastUpdateLocation)
+ } else {
+ // no updates in the index?.. start from the first record in the journal.
+ nextRecoveryPosition = journal.getNextLocation(null)
+ }
+ }
+ } else {
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
+ }
+
try {
- val start = System.currentTimeMillis()
recovering = true
- var location = getRecoveryPosition()
- if (location != null) {
- var counter = 0
- var uow: Transaction = null
- val uowCounter = 0
- while (location != null) {
- import BufferEditor.BIG_ENDIAN._
-
- var data = journal.read(location)
- val updateType = readByte(data)
- val batch = readInt(data)
- updateType match {
- case BEGIN => begin(batch, location)
- case COMMIT => commit(batch, null, location)
- case _ =>
- val update = decode(location, updateType, data)
- store(batch, update, null, location)
- }
- counter += 1
- location = journal.getNextLocation(location)
- }
- val end = System.currentTimeMillis()
- info("Processed %d operations from the journal in %,.3f seconds.", counter, ((end - start) / 1000.0f))
+ // Continue recovering until journal runs out of records.
+ while (nextRecoveryPosition != null) {
+ lastRecoveryPosition = nextRecoveryPosition
+ recover(lastRecoveryPosition)
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
}
- // We may have to undo some index updates.
-// withTx {tx =>
-// recoverIndex(tx)
-// }
} finally {
recovering = false
}
}
- def decode(location:Location, updateType:Int, value:Buffer) = {
- val t = Type.valueOf(updateType);
- if (t == null) {
- throw new IOException("Could not load journal record. Invalid type at location: " + location);
- }
- t.parseUnframed(value).asInstanceOf[TypeCreatable]
+ /**
+ * Recovers the logged record at the specified location.
+ */
+ def recover(location: Location): Unit = {
+ var data = journal.read(location)
+
+ val editor = data.bigEndianEditor
+ val updateType = editor.readByte()
+ val batch = editor.readInt()
+
+ updateType match {
+ case BEGIN => executeBegin(batch, location)
+ case COMMIT => executeCommit(batch, null, location)
+ case _ =>
+ val update = decode(location, updateType, data)
+ executeStore(batch, update, null, location)
+ }
+
+ recoveryCounter += 1
+ databaseRootRecord.setLastUpdateLocation(location)
}
-// def incrementalRecover() = {
-// try {
-// recovering = true
-// if (nextRecoveryPosition == null) {
-// if (lastRecoveryPosition == null) {
-// nextRecoveryPosition = getRecoveryPosition()
-// } else {
-// nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
-// }
-// }
-// while (nextRecoveryPosition != null) {
-// lastRecoveryPosition = nextRecoveryPosition
-// rootEntity.setLastUpdate(lastRecoveryPosition)
-// val message = load(lastRecoveryPosition)
-// val location = lastRecoveryPosition
-//
-// withTx {tx =>
-// updateIndex(tx, message.toType(), (MessageBuffer) message, location)
-// }
-// nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
-// }
-// } finally {
-// recovering = false
-// }
-// }
-
-
- def getRecoveryPosition(): Location = {
-// if (rootEntity.getLastUpdate() != null) {
-// // Start replay at the record after the last one recorded in the
-// // index file.
-// return journal.getNextLocation(rootEntity.getLastUpdate());
-// }
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Methods for Journal access
+ //
+ /////////////////////////////////////////////////////////////////////
- // This loads the first position.
- return journal.getNextLocation(null);
+ private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
+ val start = System.currentTimeMillis()
+ try {
+ journal.write(data, new JournalCallback() {
+ def success(location: Location) = {
+ cb(location)
+ }
+ })
+ } finally {
+ val end = System.currentTimeMillis()
+ if (end - start > 1000) {
+ warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms")
+ }
+ }
}
+ def read(location: Location) = {
+ var data = journal.read(location)
+ val editor = data.bigEndianEditor
+ val updateType = editor.readByte()
+ (updateType, data)
+ }
- private var batches = new HashMap[Int, ListBuffer[Update]]()
- private case class Update(update: TypeCreatable, location: Location)
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Methods that execute updates stored in the journal by indexing them
+ // Used both in normal operation and during recovery.
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ private def executeBegin(batch: Int, location: Location): Unit = {
+ assert(batches.get(batch).isEmpty)
+ batches.put(batch, (location, ListBuffer()))
+ }
+
+ private def executeCommit(batch: Int, onComplete: Runnable, location: Location): Unit = {
+ // apply all the updates in the batch as a single unit of work.
+ batches.remove(batch) match {
+ case Some((_, updates)) =>
+ // When recovering.. we only want to redo updates that committed
+ // after the last update location.
+ if (!recovering || isAfterLastUpdateLocation(location)) {
+ withTx { tx =>
+ // index the updates
+ updates.foreach {
+ update =>
+ index(tx, update.update, update.location)
+ }
+ updateLocations(tx, location)
+ }
+ }
+ case None =>
+ // when recovering.. we are more lax due to recovery starting
+ // in the middle of a stream of in-progress batches
+ assert(!recovering)
+ }
+ if (onComplete != null) {
+ onComplete.run
+ }
+ }
+
+ private def executeRollback(batch: Int, onComplete: Runnable, location: Location): Unit = {
+ // apply all the updates in the batch as a single unit of work.
+ batches.remove(batch) match {
+ case Some((_, _)) =>
+ if (!recovering || isAfterLastUpdateLocation(location)) {
+ withTx { tx =>
+ updateLocations(tx, location)
+ }
+ }
+ case None =>
+ // when recovering.. we are more lax due to recovery starting
+ // in the middle of a stream of in-progress batches
+ assert(!recovering)
+ }
+ if (onComplete != null) {
+ onComplete.run
+ }
+ }
- private def store(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
+ private def executeStore(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
if (batch == -1) {
- // update is not part of the batch.. apply it now.
- withTx {tx =>
- store(tx, update, location)
+ // update is not part of the batch..
+
+ // When recovering.. we only want to redo updates that happen
+ // after the last update location.
+ if (!recovering || isAfterLastUpdateLocation(location)) {
+ withTx { tx =>
+ // index the update now.
+ index(tx, update, location)
+ updateLocations(tx, location)
+ }
}
+
if (onComplete != null) {
onComplete.run
}
} else {
+
+ // only the commit/rollback in a batch can have an onComplete handler
+ assert(onComplete == null)
+
// if the update was part of a batch don't apply till the batch is committed.
batches.get(batch) match {
- case Some(updates)=> updates += Update(update, location)
+ case Some((_, updates)) =>
+ updates += Update(update, location)
case None =>
+ // when recovering.. we are more lax due to recovery starting
+ // in the middle of a stream of in-progress batches
+ assert(!recovering)
}
}
}
- private def begin(batch: Int, location: Location): Unit = {
- assert( batches.get(batch).isEmpty )
- batches.put(batch, ListBuffer())
+
+ private def index(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
+
+ object Process extends TxHelper(tx) {
+ import JavaConversions._
+
+ def apply(x: AddMessage.Getter): Unit = {
+
+ val key = x.getMessageKey()
+ if (key > databaseRootRecord.getLastMessageKey) {
+ databaseRootRecord.setLastMessageKey(key)
+ }
+
+ val prevLocation = messageKeyIndex.put(key, location)
+ if (prevLocation != null) {
+ // Message existed.. undo the index update we just did. Chances
+ // are it's a transaction replay.
+ messageKeyIndex.put(key, prevLocation)
+ if (location == prevLocation) {
+ warn("Message replay detected for: %d", key)
+ } else {
+ error("Message replay with different location for: %d", key)
+ }
+ } else {
+ val fileId:java.lang.Integer = location.getDataFileId()
+ addAndGet(dataFileRefIndex, fileId, 1)
+ }
+ }
+
+ def removeMessage(key:Long) = {
+ val location = messageKeyIndex.remove(key)
+ if (location != null) {
+ val fileId:java.lang.Integer = location.getDataFileId()
+ addAndGet(dataFileRefIndex, fileId, -1)
+ } else {
+ error("Cannot remove message, it did not exist: %d", key)
+ }
+ }
+
+ def apply(x: AddQueue.Getter): Unit = {
+ if (queueIndex.get(x.getKey) == null) {
+ val queueRecord = new QueueRootRecord.Bean
+ queueRecord.setEntryIndexPage(alloc(QUEUE_ENTRY_INDEX_FACTORY))
+ queueRecord.setTrackingIndexPage(alloc(QUEUE_TRACKING_INDEX_FACTORY))
+ queueRecord.setInfo(x)
+ queueIndex.put(x.getKey, queueRecord.freeze)
+ }
+ }
+
+ def apply(x: RemoveQueue.Getter): Unit = {
+ val queueRecord = queueIndex.remove(x.getKey)
+ if (queueRecord != null) {
+ queueEntryIndex(queueRecord).destroy
+ queueTrackingIndex(queueRecord).destroy
+ }
+ }
+
+ def apply(x: AddQueueEntry.Getter): Unit = {
+ val queueKey = x.getQueueKey
+ val queueRecord = queueIndex.get(queueKey)
+ if (queueRecord != null) {
+ val trackingIndex = queueTrackingIndex(queueRecord)
+ val entryIndex = queueEntryIndex(queueRecord)
+
+ // a message can only appear once in a queue (for now).. perhaps we should
+ // relax this constraint.
+ val messageKey = x.getMessageKey
+ val queueSeq = x.getQueueSeq
+
+ val existing = trackingIndex.put(messageKey, queueSeq)
+ if (existing == null) {
+ val previous = entryIndex.put(queueSeq, x.freeze)
+ if (previous == null) {
+
+ val queueRecordUpdate = queueRecord.copy
+ queueRecordUpdate.setCount(queueRecord.getCount + 1)
+ queueRecordUpdate.setSize(queueRecord.getSize + x.getSize)
+ queueIndex.put(queueKey, queueRecordUpdate.freeze)
+
+ addAndGet(messageRefsIndex, new java.lang.Long(messageKey), 1)
+ } else {
+ error("Duplicate queue entry seq %d", x.getQueueSeq)
+ }
+ } else {
+ error("Duplicate queue entry message %d", x.getMessageKey)
+ }
+ } else {
+ error("Queue not found: %d", x.getQueueKey)
+ }
+ }
+
+ def apply(x: RemoveQueueEntry.Getter): Unit = {
+ val queueKey = x.getQueueKey
+ val queueRecord = queueIndex.get(queueKey)
+ if (queueRecord != null) {
+ val trackingIndex = queueTrackingIndex(queueRecord)
+ val entryIndex = queueEntryIndex(queueRecord)
+
+ val queueSeq = x.getQueueSeq
+ val queueEntry = entryIndex.remove(queueSeq)
+ if (queueEntry != null) {
+ val messageKey = queueEntry.getMessageKey
+ val existing = trackingIndex.remove(messageKey)
+ if (existing == null) {
+ error("Tracking entry not found for message %d", queueEntry.getMessageKey)
+ }
+ if( addAndGet(messageRefsIndex, new java.lang.Long(messageKey), -1) == 0 ) {
+ // message is no longer referenced.. we can remove it..
+ removeMessage(messageKey)
+ }
+ } else {
+ error("Queue entry not found for seq %d", x.getQueueSeq)
+ }
+ } else {
+ error("Queue not found: %d", x.getQueueKey)
+ }
+ }
+
+ def apply(x: Purge.Getter): Unit = {
+
+ // Remove all the queues...
+ queueIndex.iterator.map {
+ entry =>
+ entry.getKey
+ }.foreach {
+ key =>
+ apply(new RemoveQueue.Bean().setKey(key.intValue))
+ }
+
+ // Remove stored messages...
+ messageKeyIndex.clear
+ messageRefsIndex.clear
+ dataFileRefIndex.clear
+ databaseRootRecord.setLastMessageKey(0)
+
+ cleanup(tx);
+ info("Store purged.");
+ }
+
+ def apply(x: AddTrace.Getter): Unit = {
+ // trace messages are informational messages in the journal used to log
+ // historical info about store state. They don't update the indexes.
+ }
+ }
+
+ update match {
+ case x: AddMessage.Getter => Process(x)
+ case x: AddQueueEntry.Getter => Process(x)
+ case x: RemoveQueueEntry.Getter => Process(x)
+
+ case x: AddQueue.Getter => Process(x)
+ case x: RemoveQueue.Getter => Process(x)
+
+ case x: AddTrace.Getter => Process(x)
+ case x: Purge.Getter => Process(x)
+
+ case x: AddSubscription.Getter =>
+ case x: RemoveSubscription.Getter =>
+
+ case x: AddMap.Getter =>
+ case x: RemoveMap.Getter =>
+ case x: PutMapEntry.Getter =>
+ case x: RemoveMapEntry.Getter =>
+
+ case x: OpenStream.Getter =>
+ case x: WriteStream.Getter =>
+ case x: CloseStream.Getter =>
+ case x: RemoveStream.Getter =>
+ }
}
- private def commit(batch: Int, onComplete: Runnable, location: Location): Unit = {
- // apply all the updates in the batch as a single unit of work.
- withTx {tx =>
- batches.get(batch) match {
- case Some(updates) =>
- updates.foreach {update =>
- store(tx, update.update, update.location)
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Periodic Maintenance
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ def schedualFlush(): Unit = {
+ def try_flush() = {
+ if (hawtDBStore.serviceState.isStarted) {
+ hawtDBStore.executor_pool {
+ flush
+ schedualFlush
+ }
+ }
+ }
+ dispatchQueue.dispatchAfter(config.indexFlushInterval, TimeUnit.MILLISECONDS, ^ {try_flush})
+ }
+
+ def flush() = {
+ val start = System.currentTimeMillis()
+ pageFile.flush
+ val end = System.currentTimeMillis()
+ if (end - start > 1000) {
+ warn("Index flush took %,.3f seconds" + ((end - start) / 1000.0f))
+ }
+ }
+
+ def schedualCleanup(): Unit = {
+ def try_cleanup() = {
+ if (hawtDBStore.serviceState.isStarted) {
+ hawtDBStore.executor_pool {
+ withTx {tx =>
+ cleanup(tx)
}
- if (onComplete != null) {
- onComplete.run
+ schedualCleanup
+ }
+ }
+ }
+ dispatchQueue.dispatchAfter(config.cleanupInterval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+ }
+
+ /**
+ * @param tx
+ * @throws IOException
+ */
+ def cleanup(tx:Transaction) = {
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+
+ debug("Cleanup started.")
+ val gcCandidateSet = new TreeSet[Integer](journal.getFileMap().keySet())
+
+ // Don't clean up locked data files
+ if (lockedDataFiles != null) {
+ gcCandidateSet.removeAll(lockedDataFiles)
+ }
+
+ // Don't GC files that we will need for recovery..
+ val upto = if (databaseRootRecord.hasFirstInProgressBatch) {
+ Some(databaseRootRecord.getFirstInProgressBatch.getDataFileId)
+ } else {
+ if (databaseRootRecord.hasLastUpdateLocation) {
+ Some(databaseRootRecord.getLastUpdateLocation.getDataFileId)
+ } else {
+ None
+ }
+ }
+
+ upto match {
+ case Some(dataFile) =>
+ var done = false
+ while (!done && !gcCandidateSet.isEmpty()) {
+ val last = gcCandidateSet.last()
+ if (last.intValue >= dataFile) {
+ gcCandidateSet.remove(last)
+ } else {
+ done = true
}
- case None =>
+ }
+
+ case None =>
+ }
+
+ if (!gcCandidateSet.isEmpty()) {
+ dataFileRefIndex.iterator.foreach { entry =>
+ gcCandidateSet.remove(entry.getKey)
+ }
+ if (!gcCandidateSet.isEmpty()) {
+ debug("Cleanup removing the data files: %s", gcCandidateSet)
+ journal.removeDataFiles(gcCandidateSet)
+ }
+ }
+ debug("Cleanup done.")
+ }
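+
+ // Worked example of the GC decision above: given journal files {1,2,3,4,5},
+ // no locked files, a lastUpdateLocation in file 4, and dataFileRefIndex still
+ // referencing file 2, the candidate set shrinks from {1,2,3,4,5} to {1,2,3}
+ // (files at or after the recovery point are kept), then to {1,3} (file 2 is
+ // still referenced), and journal.removeDataFiles removes files 1 and 3.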
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Helper Methods / Classes
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ private case class Update(update: TypeCreatable, location: Location)
+
+ private class TxHelper(private val _tx: Transaction) {
+ lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getQueueIndexPage)
+ lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
+ lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageKeyIndexPage)
+ lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
+ lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, databaseRootRecord.getSubscriptionIndexPage)
+
+ def addAndGet[K](index:SortedIndex[K, Integer], key:K, amount:Int):Int = {
+ var counter = index.get(key)
+ if( counter == null ) {
+ if( amount!=0 ) {
+ index.put(key, amount)
+ }
+ amount
+ } else {
+ val update = counter.intValue + amount
+ if( update == 0 ) {
+ index.remove(key)
+ } else {
+ index.put(key, update)
+ }
+ update
}
}
+
+ def queueEntryIndex(root: QueueRootRecord.Getter) = QUEUE_ENTRY_INDEX_FACTORY.open(_tx, root.getEntryIndexPage)
+
+ def queueTrackingIndex(root: QueueRootRecord.Getter) = QUEUE_TRACKING_INDEX_FACTORY.open(_tx, root.getTrackingIndexPage)
+
+ def alloc(factory: IndexFactory[_, _]) = {
+ val rc = _tx.alloc
+ factory.create(_tx, rc)
+ rc
+ }
}
- private def store(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
+ private def withTx[T](func: (Transaction) => T): T = {
+ val tx = pageFile.tx
+ var ok = false
+ try {
+ val rc = func(tx)
+ ok = true
+ rc
+ } finally {
+ if (ok) {
+ tx.commit
+ } else {
+ tx.rollback
+ }
+ }
+ }
+ // Gets the next batch id.. after a while we may wrap around and start
+ // producing batch ids from zero again
+ def next_batch_id = {
+ var rc = next_batch_counter.getAndIncrement
+ while (rc < 0) {
+ // We just wrapped around.. reset the counter to 0
+ // Use a CAS operation so that only 1 thread resets the counter
+ next_batch_counter.compareAndSet(rc + 1, 0)
+ rc = next_batch_counter.getAndIncrement
+ }
+ rc
}
+ private def isAfterLastUpdateLocation(location: Location) = {
+ val lastUpdate: Location = databaseRootRecord.getLastUpdateLocation
+ lastUpdate.compareTo(location) < 0
+ }
+ private def updateLocations(tx: Transaction, location: Location): Unit = {
+ databaseRootRecord.setLastUpdateLocation(location)
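+ // batches is insertion ordered (a LinkedHashMap), so its head holds the
+ // BEGIN location of the oldest batch still in progress; recovery restarts
+ // from there so uncommitted batches can be re-collected and rolled back.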
+ if (batches.isEmpty) {
+ databaseRootRecord.clearFirstInProgressBatch
+ } else {
+ databaseRootRecord.setFirstInProgressBatch(batches.head._2._1)
+ }
+ tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
+ databaseRootRecord = databaseRootRecord.copy
+ }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:06:47 2010
@@ -74,9 +74,9 @@ class HawtDBStore extends Store with Bas
var next_queue_key = new AtomicLong(0)
var next_msg_key = new AtomicLong(0)
- protected var executor_pool:ExecutorService = _
+ var executor_pool:ExecutorService = _
var config:HawtDBStoreDTO = defaultConfig
- val client = new HawtDBClient
+ val client = new HawtDBClient(this)
def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul 7 04:06:47 2010
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb
+
+import model._
+import model.Type.TypeCreatable
+import org.apache.activemq.apollo.store.{MessageRecord, QueueRecord, QueueEntryRecord}
+import org.fusesource.hawtbuf.codec._
+import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer}
+import java.io.{IOException, DataInput, DataOutput}
+import org.fusesource.hawtdb.internal.journal.{LocationCodec, Location}
+import org.fusesource.hawtdb.api._
+import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessage}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object Helpers {
+
+ val QUEUE_RECORD_CODEC = new VariableCodec[QueueEntryRecord]() {
+ def decode(dataIn: DataInput): QueueEntryRecord = {
+ val rc = new QueueEntryRecord();
+ rc.queueKey = dataIn.readLong();
+ rc.messageKey = dataIn.readLong();
+ rc.size = dataIn.readInt();
+ // if (dataIn.readBoolean()) {
+ // rc.setTte(dataIn.readLong());
+ // }
+ rc.redeliveries = dataIn.readShort();
+ if (dataIn.readBoolean()) {
+ rc.attachment = BUFFER_CODEC.decode(dataIn);
+ }
+ return rc;
+ }
+
+ def encode(value: QueueEntryRecord, dataOut: DataOutput) = {
+ dataOut.writeLong(value.queueKey);
+ dataOut.writeLong(value.messageKey);
+ dataOut.writeInt(value.size);
+ // if (value.getTte() >= 0) {
+ // dataOut.writeBoolean(true);
+ // dataOut.writeLong(value.getTte());
+ // } else {
+ // dataOut.writeBoolean(false);
+ // }
+ dataOut.writeShort(value.redeliveries);
+ if (value.attachment != null) {
+ dataOut.writeBoolean(true);
+ BUFFER_CODEC.encode(value.attachment, dataOut);
+ } else {
+ dataOut.writeBoolean(false);
+ }
+ }
+
+ def estimatedSize(value: QueueEntryRecord) = throw new UnsupportedOperationException()
+ }
+
+ val QUEUE_DESCRIPTOR_CODEC = new VariableCodec[QueueRecord]() {
+ def decode(dataIn: DataInput): QueueRecord = {
+ val record = new QueueRecord();
+ record.queueType = ASCII_BUFFER_CODEC.decode(dataIn);
+ record.name = ASCII_BUFFER_CODEC.decode(dataIn);
+ // if (dataIn.readBoolean()) {
+ // record.parent = ASCII_BUFFER_MARSHALLER.readPayload(dataIn)
+ // record.setPartitionId(dataIn.readInt());
+ // }
+ return record;
+ }
+
+ def encode(value: QueueRecord, dataOut: DataOutput) = {
+ ASCII_BUFFER_CODEC.encode(value.queueType, dataOut);
+ ASCII_BUFFER_CODEC.encode(value.name, dataOut);
+ // if (value.parent != null) {
+ // dataOut.writeBoolean(true);
+ // ASCII_BUFFER_MARSHALLER.writePayload(value.parent, dataOut);
+ // dataOut.writeInt(value.getPartitionKey());
+ // } else {
+ // dataOut.writeBoolean(false);
+ // }
+ }
+
+ def estimatedSize(value: QueueRecord) = throw new UnsupportedOperationException()
+ };
+
+ val ASCII_BUFFER_CODEC = AsciiBufferCodec.INSTANCE;
+ val BUFFER_CODEC = BufferCodec.INSTANCE;
+
+
+ implicit def toMessageRecord(pb: AddMessage.Getter): MessageRecord = {
+ val rc = new MessageRecord
+ rc.protocol = pb.getProtocol
+ rc.size = pb.getSize
+ rc.value = pb.getValue
+ rc.stream = pb.getStreamKey
+ rc.expiration = pb.getExpiration
+ rc
+ }
+
+ implicit def fromMessageRecord(v: MessageRecord): AddMessage.Bean = {
+ val pb = new AddMessage.Bean
+ pb.setProtocol(v.protocol)
+ pb.setSize(v.size)
+ pb.setValue(v.value)
+ pb.setStreamKey(v.stream)
+ pb.setExpiration(v.expiration)
+ pb
+ }
+
+ implicit def toQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
+ val rc = new QueueEntryRecord
+ rc.queueKey = pb.getQueueKey
+ rc.queueSeq = pb.getQueueSeq
+ rc.messageKey = pb.getMessageKey
+ rc.attachment = pb.getAttachment
+ rc.size = pb.getSize
+ rc.redeliveries = pb.getRedeliveries.toShort
+ rc
+ }
+
+ implicit def fromQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
+ val pb = new AddQueueEntry.Bean
+ pb.setQueueKey(v.queueKey)
+ pb.setQueueSeq(v.queueSeq)
+ pb.setMessageKey(v.messageKey)
+ pb.setAttachment(v.attachment)
+ pb.setSize(v.size)
+ pb.setRedeliveries(v.redeliveries)
+ pb
+ }
+
+ implicit def toLocation(value: Long): Location = {
+ val temp = new Buffer(8)
+ val editor = temp.bigEndianEditor
+ editor.writeLong(value)
+ temp.reset
+ new Location(editor.readInt(), editor.readInt())
+ }
+
+ implicit def fromLocation(value: Location):Long = {
+ val temp = new Buffer(8)
+ val editor = temp.bigEndianEditor
+ editor.writeInt(value.getDataFileId)
+ editor.writeInt(value.getOffset)
+ temp.reset
+ editor.readLong
+ }
+
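+ // Packing layout: the data file id occupies the high 32 bits and the offset
+ // the low 32 bits, so packed locations sort in journal order. For example,
+ // Location(dataFileId=3, offset=16) packs to (3L << 32) | 16, i.e.
+ // 0x0000000300000010L, and round-trips back to Location(3, 16).
+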
+ implicit def toAsciiBuffer(value:String):AsciiBuffer = new AsciiBuffer(value)
+ implicit def toUTF8Buffer(value:String):UTF8Buffer = new UTF8Buffer(value)
+
+ type PB = PBMessage[_ <: PBMessage[_, _], _ <: MessageBuffer[_, _]]
+ implicit def toPBMessage(value: TypeCreatable): PB = value.asInstanceOf[PB]
+
+
+ val DATABASE_ROOT_RECORD_ACCESSOR = new CodecPagedAccessor[DatabaseRootRecord.Buffer](DatabaseRootRecord.FRAMED_CODEC);
+
+ def decode(location: Location, updateType: Int, value: Buffer) = {
+ val t = Type.valueOf(updateType);
+ if (t == null) {
+ throw new IOException("Could not load journal record. Invalid type at location: " + location);
+ }
+ t.parseUnframed(value).asInstanceOf[TypeCreatable]
+ }
+
+ //
+ // Index factories...
+ //
+
+ import java.{lang => jl}
+
+ // maps message key -> Journal Location
+ val MESSAGE_KEY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, Location]();
+ MESSAGE_KEY_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+ MESSAGE_KEY_INDEX_FACTORY.setValueCodec(LocationCodec.INSTANCE);
+ MESSAGE_KEY_INDEX_FACTORY.setDeferredEncoding(true);
+
+ // maps Journal Data File Id -> Ref Counter
+ val DATA_FILE_REF_INDEX_FACTORY = new BTreeIndexFactory[jl.Integer, jl.Integer]();
+ DATA_FILE_REF_INDEX_FACTORY.setKeyCodec(VarIntegerCodec.INSTANCE);
+ DATA_FILE_REF_INDEX_FACTORY.setValueCodec(VarIntegerCodec.INSTANCE);
+ DATA_FILE_REF_INDEX_FACTORY.setDeferredEncoding(true);
+
+ // maps message key -> Ref Counter
+ val MESSAGE_REFS_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, jl.Integer]();
+ MESSAGE_REFS_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+ MESSAGE_REFS_INDEX_FACTORY.setValueCodec(VarIntegerCodec.INSTANCE);
+ MESSAGE_REFS_INDEX_FACTORY.setDeferredEncoding(true);
+
+ // maps queue key -> QueueRootRecord
+ val QUEUE_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, QueueRootRecord.Buffer]();
+ QUEUE_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE);
+ QUEUE_INDEX_FACTORY.setValueCodec(QueueRootRecord.FRAMED_CODEC);
+ QUEUE_INDEX_FACTORY.setDeferredEncoding(true);
+
+ // maps queue seq -> AddQueueEntry
+ val QUEUE_ENTRY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, AddQueueEntry.Buffer]();
+ QUEUE_ENTRY_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE);
+ QUEUE_ENTRY_INDEX_FACTORY.setValueCodec(AddQueueEntry.FRAMED_CODEC);
+ QUEUE_ENTRY_INDEX_FACTORY.setDeferredEncoding(true);
+
+ // maps message key -> queue seq
+ val QUEUE_TRACKING_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, jl.Long]();
+ QUEUE_TRACKING_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+ QUEUE_TRACKING_INDEX_FACTORY.setValueCodec(VarLongCodec.INSTANCE);
+ QUEUE_TRACKING_INDEX_FACTORY.setDeferredEncoding(true);
+
+ val SUBSCRIPTIONS_INDEX_FACTORY = new BTreeIndexFactory[AsciiBuffer, AddSubscription.Buffer]();
+ SUBSCRIPTIONS_INDEX_FACTORY.setKeyCodec(AsciiBufferCodec.INSTANCE);
+ SUBSCRIPTIONS_INDEX_FACTORY.setValueCodec(AddSubscription.FRAMED_CODEC);
+ SUBSCRIPTIONS_INDEX_FACTORY.setDeferredEncoding(true);
+
+}
\ No newline at end of file
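Each factory above is the schema for one BTree index in the page file: a key codec, a value codec, and deferred encoding, which lets HawtDB hold values in decoded form until the page is actually written. A sketch of the create/open life cycle these factories are used with, mirroring TxHelper.alloc and the lazy index fields in HawtDBClient.scala (messageKey, location and the surrounding tx are placeholders):

    // once, when the store is first initialized; the page number is then
    // persisted in the DatabaseRootRecord:
    val page = tx.alloc()
    MESSAGE_KEY_INDEX_FACTORY.create(tx, page)

    // on every transaction that touches the index:
    val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(tx, page)
    messageKeyIndex.put(messageKey, location)  // message key -> journal Location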
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala Wed Jul 7 04:06:47 2010
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.broker.store.hawtdb
-import model.RootRecord
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
import org.fusesource.hawtdb.api._
import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory, PBMessage}
@@ -37,37 +36,7 @@ class DestinationEntity {
//}
object RootEntity {
-// val messageKeyIndexFactory = new BTreeIndexFactory[Long, Long]();
-// val locationIndexFactory = new BTreeIndexFactory[Integer, Long]();
-// val messageRefsIndexFactory = new BTreeIndexFactory[Long, Long]();
-// val destinationIndexFactory = new BTreeIndexFactory[Long, DestinationEntity]();
-// val subscriptionIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
-// val mapIndexFactory = new BTreeIndexFactory[AsciiBuffer, Integer]();
-// val mapInstanceIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
-//
-// messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE);
-// messageKeyIndexFactory.setValueCodec(LongCodec.INSTANCE);
-// messageKeyIndexFactory.setDeferredEncoding(true);
-//
-// locationIndexFactory.setKeyCodec(IntegerCodec.INSTANCE);
-// locationIndexFactory.setValueCodec(LongCodec.INSTANCE);
-// locationIndexFactory.setDeferredEncoding(true);
-//
-// messageRefsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
-// messageRefsIndexFactory.setValueCodec(LongCodec.INSTANCE);
-// messageRefsIndexFactory.setDeferredEncoding(true);
-//
-// destinationIndexFactory.setKeyCodec(LongCodec.INSTANCE);
-// destinationIndexFactory.setValueCodec(DestinationEntity.MARSHALLER);
-// destinationIndexFactory.setDeferredEncoding(true);
-//
-// subscriptionIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
-// subscriptionIndexFactory.setValueCodec(Codecs.BUFFER_CODEC);
-// subscriptionIndexFactory.setDeferredEncoding(true);
-//
-// mapIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
-// mapIndexFactory.setValueCodec(IntegerCodec.INSTANCE);
-// mapIndexFactory.setDeferredEncoding(true);
+
//
// val DATA_ENCODER_DECODER = PBEncoderDecoder(RootRecord.FACTORY)
Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java?rev=961128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java Wed Jul 7 04:06:47 2010
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.apollo.store.MessageRecord;
+import org.apache.activemq.apollo.store.QueueRecord;
+import org.apache.activemq.apollo.store.QueueStatus;
+import org.apache.activemq.apollo.store.QueueEntryRecord;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class HawtDBManagerBenchmark extends TestCase {
+
+// private static int PERFORMANCE_SAMPLES = 50;
+// private static boolean SYNC_TO_DISK = true;
+// private static final boolean USE_SHARED_WRITER = true;
+//
+// private HawtDBManager store;
+// private QueueRecord queueId;
+// private AtomicLong queueKey = new AtomicLong(0);
+//
+// protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+// protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+//
+// protected ArrayList<Consumer> consumers = new ArrayList<Consumer>();
+// protected ArrayList<Producer> producers = new ArrayList<Producer>();
+//
+// protected HawtDBManager createStore() {
+// HawtDBManager rc = new HawtDBManager();
+// rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance"));
+// rc.setDeleteAllMessages(true);
+// return rc;
+// }
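+// // setDeleteAllMessages(true) wipes the store on every run; switching it
+// // off would exercise the queue-recovery branch in setUp() below, which
+// // resumes queueKey from the last recovered entry.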
+//
+// private SharedWriter writer = null;
+//
+// private Semaphore enqueuePermits;
+// private Semaphore dequeuePermits;
+//
+// @Override
+// protected void setUp() throws Exception {
+// store = createStore();
+// //store.setDeleteAllMessages(false);
+// store.start();
+//
+// if (USE_SHARED_WRITER) {
+// writer = new SharedWriter();
+// writer.start();
+// }
+//
+// enqueuePermits = new Semaphore(20000000);
+// dequeuePermits = new Semaphore(0);
+//
+// queueId = new QueueRecord();
+// queueId.name = new AsciiBuffer("test");
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.queueAdd(queueId);
+// }
+// }, null);
+//
+// store.execute(new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<QueueStatus> qqrs = session.queueList(queueId, 1);
+// Assert.assertTrue(qqrs.hasNext());
+// QueueStatus qqr = qqrs.next();
+// if (qqr.size > 0) {
+// queueKey.set(qqr.last + 1);
+// System.out.println("Recovered queue: " + qqr.record.name + " with " + qqr.count + " messages");
+// }
+// }
+// }, null);
+// }
+//
+// @Override
+// protected void tearDown() throws Exception {
+// for (Consumer c : consumers) {
+// c.stop();
+// }
+// consumers.clear();
+// for (Producer p : producers) {
+// p.stop();
+// }
+// producers.clear();
+//
+// if (writer != null) {
+// writer.stop();
+// }
+//
+// if (store != null) {
+// store.stop();
+// }
+// }
+//
+// class SharedWriter implements Runnable {
+// LinkedBlockingQueue<SharedQueueOp> queue = new LinkedBlockingQueue<SharedQueueOp>(1000);
+// private Thread thread;
+// private AtomicBoolean stopped = new AtomicBoolean();
+//
+// public void start() {
+// thread = new Thread(this, "Writer");
+// thread.start();
+// }
+//
+// public void stop() throws InterruptedException {
+// stopped.set(true);
+//
+// // Add an op to trigger shutdown:
+// SharedQueueOp op = new SharedQueueOp() {
+// public void run() {
+// }
+// };
+// op.op = new VoidCallback<Exception>() {
+//
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// // intentionally empty: this op exists only to wake the writer thread
+// }
+// };
+//
+// queue.put(op);
+// thread.join();
+// }
+//
+// public void run() {
+// HawtDBSession session = store.getSession();
+// try {
+// LinkedList<Runnable> processed = new LinkedList<Runnable>();
+// while (!stopped.get()) {
+// SharedQueueOp op = queue.take();
+// session.acquireLock();
+// int ops = 0;
+// while (op != null && ops < 1000) {
+// op.op.execute(session);
+// processed.add(op);
+// op = queue.poll();
+// ops++;
+// }
+//
+// session.commit();
+// session.releaseLock();
+//
+// if (SYNC_TO_DISK) {
+// store.flush();
+// }
+//
+// for (Runnable r : processed) {
+// r.run();
+// }
+// processed.clear();
+// }
+//
+// } catch (InterruptedException e) {
+// if (!stopped.get()) {
+// e.printStackTrace();
+// }
+// return;
+// } catch (Exception e) {
+// e.printStackTrace();
+// return;
+// }
+// }
+//
+// public void addOp(SharedQueueOp op) throws InterruptedException {
+// queue.put(op);
+// }
+// }
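+// // A minimal usage sketch for SharedWriter (using only the SharedQueueOp
+// // and VoidCallback shapes shown above). The Runnable half runs strictly
+// // after the batch containing the op has been committed (and flushed when
+// // SYNC_TO_DISK is set):
+// //
+// //   SharedQueueOp op = new SharedQueueOp() {
+// //       public void run() {
+// //           // post-commit work, e.g. bump a rate counter
+// //       }
+// //   };
+// //   op.op = new VoidCallback<Exception>() {
+// //       @Override
+// //       public void run(HawtDBSession session) throws Exception {
+// //           // store work; batched with up to 999 other ops per commit
+// //       }
+// //   };
+// //   writer.addOp(op);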
+//
+// abstract class SharedQueueOp implements Runnable {
+// VoidCallback<Exception> op;
+// }
+//
+// class Producer implements Runnable {
+// private Thread thread;
+// private AtomicBoolean stopped = new AtomicBoolean();
+// private String name;
+// protected final MetricCounter rate = new MetricCounter();
+// private long sleep;
+//
+// public Producer(String name) {
+// this.name = name;
+// }
+//
+// public void start() {
+// rate.name("Producer " + name + " Rate");
+// totalProducerRate.add(rate);
+// thread = new Thread(this, "Producer" + name);
+// thread.start();
+// }
+//
+// public void stop() throws InterruptedException {
+// stopped.set(true);
+// while (enqueuePermits.hasQueuedThreads()) {
+// enqueuePermits.release();
+// }
+// thread.join();
+// }
+//
+// public void run() {
+// try {
+// Buffer buffer = new Buffer(new byte[1024]);
+// for (long i = 0; !stopped.get(); i++) {
+//
+// enqueuePermits.acquire();
+//
+// final MessageRecord messageRecord = new MessageRecord();
+// messageRecord.key = store.allocateStoreTracking();
+// messageRecord.protocol = new AsciiBuffer("encoding");
+// messageRecord.value = buffer;
+// messageRecord.size = buffer.getLength();
+//
+// SharedQueueOp op = new SharedQueueOp() {
+// public void run() {
+// rate.increment();
+// }
+// };
+//
+// op.op = new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// session.messageAdd(messageRecord);
+// QueueEntryRecord queueEntryRecord = new QueueEntryRecord();
+// queueEntryRecord.messageKey = messageRecord.key;
+// queueEntryRecord.queueKey = queueKey.incrementAndGet();
+// queueEntryRecord.size = messageRecord.size;
+// session.queueAddMessage(queueId, queueEntryRecord);
+// dequeuePermits.release();
+// }
+// };
+//
+// if (!USE_SHARED_WRITER) {
+// store.execute(op.op, op);
+//
+// if (SYNC_TO_DISK) {
+// store.flush();
+// }
+//
+// } else {
+// writer.addOp(op);
+// }
+//
+// if (sleep > 0) {
+// Thread.sleep(sleep);
+// }
+// }
+// } catch (InterruptedException e) {
+// if (!stopped.get()) {
+// e.printStackTrace();
+// }
+// return;
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// }
+// }
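+// // Flow control: each producer acquires an enqueuePermit before writing,
+// // so at most 20,000,000 messages can be outstanding, and every stored
+// // message releases a dequeuePermit that the consumers block on; the
+// // consumers return enqueuePermits as they drain records.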
+//
+// class Consumer implements Runnable {
+// private Thread thread;
+// private AtomicBoolean stopped = new AtomicBoolean();
+// protected final MetricCounter rate = new MetricCounter();
+// private String name;
+// private final Semaphore queryWait = new Semaphore(0);
+//
+// public Consumer(String name) {
+// this.name = name;
+// }
+//
+// public void start() {
+// rate.name("Consumer " + name + " Rate");
+// totalConsumerRate.add(rate);
+// thread = new Thread(this, "Consumer " + name);
+// thread.start();
+// }
+//
+// public void stop() throws InterruptedException {
+// stopped.set(true);
+// queryWait.release();
+// thread.join();
+// }
+//
+// public void run() {
+// try {
+// while (!stopped.get()) {
+// final ArrayList<MessageRecord> records = new ArrayList<MessageRecord>(1000);
+// SharedQueueOp op = new SharedQueueOp() {
+// public void run() {
+// rate.increment(records.size());
+// enqueuePermits.release(records.size());
+// queryWait.release();
+// }
+// };
+//
+// op.op = new VoidCallback<Exception>() {
+// @Override
+// public void run(HawtDBSession session) throws Exception {
+// Iterator<QueueEntryRecord> queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000);
+// for (Iterator<QueueEntryRecord> iterator = queueRecords; iterator.hasNext();) {
+// QueueEntryRecord r = iterator.next();
+// records.add(session.messageGetRecord(r.messageKey));
+// session.queueRemoveMessage(queueId, r.queueKey);
+// }
+// }
+// };
+//
+// if (!USE_SHARED_WRITER) {
+// store.execute(op.op, op);
+// if (SYNC_TO_DISK) {
+// store.flush();
+// }
+// } else {
+// writer.addOp(op);
+// }
+//
+// dequeuePermits.acquire();
+// records.clear();
+// }
+// } catch (InterruptedException e) {
+// if (!stopped.get()) {
+// e.printStackTrace();
+// }
+// return;
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// }
+// }
+//
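+// // The test names appear to encode the thread mix: test1_1_0 runs one
+// // producer and no consumers, test1_1_1 one of each, and test10_1_1 ten
+// // producers against a single consumer.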
+// public void test1_1_0() throws Exception {
+// startProducers(1);
+// reportRates();
+// }
+//
+//
+// public void test1_1_1() throws Exception {
+// startProducers(1);
+// startConsumers(1);
+// reportRates();
+// }
+//
+// public void test10_1_1() throws Exception {
+// startProducers(10);
+// startConsumers(1);
+// reportRates();
+// }
+//
+// private void startProducers(int count) {
+// for (int i = 0; i < count; i++) {
+// Producer p = new Producer("" + (i + 1));
+// producers.add(p);
+// p.start();
+// }
+// }
+//
+// private void startConsumers(int count) {
+// for (int i = 0; i < count; i++) {
+// Consumer c = new Consumer("" + (i + 1));
+// consumers.add(c);
+// c.start();
+// }
+// }
+//
+// private void reportRates() throws InterruptedException {
+// System.out.println("Checking rates for test: " + getName());
+// for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+// Period p = new Period();
+// Thread.sleep(1000 * 5);
+// System.out.println(totalProducerRate.getRateSummary(p));
+// System.out.println(totalConsumerRate.getRateSummary(p));
+// totalProducerRate.reset();
+// totalConsumerRate.reset();
+// }
+// }
+
+}
\ No newline at end of file