You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:00 UTC
svn commit: r961147 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
activemq-store/src/test/scala/org/apache/activem...
Author: chirino
Date: Wed Jul 7 04:10:59 2010
New Revision: 961147
URL: http://svn.apache.org/viewvc?rev=961147&view=rev
Log:
- Queue: Better prefetch handling.
- HawtDB Store: Better recovery and root record handling
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:10:59 2010
@@ -197,21 +197,14 @@ class Queue(val host: VirtualHost, val d
queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
}
-// var haveQuickConsumer = false
-// fastSubs.foreach{ sub=>
-// if( sub.pos.seq < entry.seq ) {
-// haveQuickConsumer = true
-// }
-// }
- def haveQuickConsumer = fastSubs.find( sub=> !sub.slow && sub.pos.seq <= entry.seq ).isDefined
+ def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= entry.seq ).isDefined
var dispatched = false
if( entry.prefetched > 0 || haveQuickConsumer ) {
// try to dispatch it directly...
entry.dispatch
} else {
-// println("flush: "+delivery.message.getProperty("color"))
// we flush the entry out right away if it looks
// it wont be needed.
entry.flush
@@ -228,22 +221,26 @@ class Queue(val host: VirtualHost, val d
}
}
+
+ var checkCounter = 0
def schedualSlowConsumerCheck:Unit = {
def slowConsumerCheck = {
if( retained > 0 ) {
+ checkCounter += 1
// target tune_min_subscription_rate / sec
val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
var idleConsumerCount = 0
-
-// if( consumerSubs.isEmpty ) {
+// Handy for periodically looking at the dispatch state...
+//
+// if( !consumerSubs.isEmpty && (checkCounter%100)==0 ) {
// println("using "+size+" out of "+capacity+" buffer space.");
// var cur = entries.getHead
// while( cur!=null ) {
-// if( cur.asLoaded!=null ) {
+// if( cur.asLoaded!=null || cur.hasSubs || cur.prefetched>0 ) {
// println(" => "+cur)
// }
// cur = cur.getNext
@@ -291,6 +288,20 @@ class Queue(val host: VirtualHost, val d
}
}
+ // flush tail entries that are still loaded but which have no fast subs that can process them.
+ var cur = entries.getTail
+ while( cur!=null ) {
+ def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= cur.seq ).isDefined
+ if( !cur.hasSubs && cur.prefetched==0 && cur.asFlushed==null && !haveQuickConsumer ) {
+ // then flush out to make space...
+ cur.flush
+ cur = cur.getPrevious
+ } else {
+ cur = null
+ }
+ }
+
+
// Trigger a swap if we have slow consumers and we are full..
if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
swap
@@ -339,8 +350,6 @@ class Queue(val host: VirtualHost, val d
entry.unlink
ack(entry.value, tx)
}
-
-// println("acked... full: "+messages.full)
messages.refiller.run
}
@@ -512,10 +521,13 @@ class Queue(val host: VirtualHost, val d
object QueueEntry extends Sizer[QueueEntry] {
+
def size(value: QueueEntry): Int = value.size
}
-class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
+class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable with DispatchLogging {
+ override protected def log = Queue
+
import QueueEntry._
var competing:List[Subscription] = Nil
@@ -567,11 +579,14 @@ class QueueEntry(val queue:Queue, val se
// remove from prefetch counters..
var cur = this;
- while( prefetched > 0 ) {
+ while( cur!=null && prefetched > 0 ) {
if( cur.hasSubs ) {
(cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(cur) ) { sub.removePrefetch(cur) } }
}
cur = cur.getPrevious
+ if( cur == null ) {
+ error("illegal prefetch state detected.")
+ }
}
this.value = new Tombstone()
@@ -749,6 +764,13 @@ class QueueEntry(val queue:Queue, val se
// loads to reduce cross thread synchronization
if( delivery.isDefined ) {
queue.store_load_source.merge((QueueEntry.this, delivery.get))
+ } else {
+ // Looks like someone else removed the message from the store.. lets just
+ // tombstone this entry now.
+ queue.dispatchQueue {
+ debug("Detected store drop of message key: %d", ref)
+ tombstone
+ }
}
}
}
@@ -873,7 +895,7 @@ class QueueEntry(val queue:Queue, val se
p.addCompeting(competingFastSubs)
// flush this entry out if it's not going to be needed soon.
- def haveQuickConsumer = queue.fastSubs.find( sub=> !sub.slow && sub.pos.seq <= seq ).isDefined
+ def haveQuickConsumer = queue.fastSubs.find( sub=> sub.pos.seq <= seq ).isDefined
if( !hasSubs && prefetched==0 && !aquired && !haveQuickConsumer ) {
// then flush out to make space...
flush
@@ -947,9 +969,9 @@ class Subscription(queue:Queue) extends
}
def removePrefetch(value:QueueEntry):Unit = {
-// trace("prefetch rm: "+value.seq)
value.prefetched -= 1
prefetchSize -= value.size
+
fillPrefetch()
}
@@ -961,7 +983,6 @@ class Subscription(queue:Queue) extends
}
def addPrefetch(value:QueueEntry):Unit = {
-// trace("prefetch add: "+value.seq)
prefetchSize += value.size
value.prefetched += 1
value.load
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:10:59 2010
@@ -44,6 +44,7 @@ import java.util.{TreeSet, HashSet}
import org.fusesource.hawtdb.api._
import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
+import org.apache.activemq.apollo.util.TimeCounter
object HawtDBClient extends Log {
val BEGIN = -1
@@ -81,7 +82,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private var lastRecoveryPosition: Location = null
private var recoveryCounter = 0
- var databaseRootRecord = new DatabaseRootRecord.Bean
+ @volatile
+ var rootBuffer = (new DatabaseRootRecord.Bean()).freeze
val next_batch_counter = new AtomicInteger(0)
@@ -167,17 +169,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val rootPage = tx.alloc()
assert(rootPage == 0)
- databaseRootRecord.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY))
- databaseRootRecord.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY))
- databaseRootRecord.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
- databaseRootRecord.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
- databaseRootRecord.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+ rootBean.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY))
+ rootBean.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY))
+ rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
+ rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
+ rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+ rootBuffer = rootBean.freeze
- tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
- databaseRootRecord = databaseRootRecord.copy
+ tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
true
} else {
- databaseRootRecord = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0).copy
+ rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0)
false
}
}
@@ -306,23 +308,35 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
+ val metric_load_from_index = new TimeCounter
+ val metric_load_from_journal = new TimeCounter
+
def loadMessage(messageKey: Long): Option[MessageRecord] = {
- withTx { tx =>
+ metric_load_from_index.start { end =>
+ withTx { tx =>
+ val idxPage = rootBuffer.getMessageKeyIndexPage
+
val helper = new TxHelper(tx)
import JavaConversions._
import helper._
val location = messageKeyIndex.get(messageKey)
+ end()
+
if (location != null) {
- load(location, classOf[AddMessage.Getter]) match {
- case Some(x) =>
- val messageRecord: MessageRecord = x
- Some(messageRecord)
- case None => None
+ metric_load_from_journal.time {
+ load(location, classOf[AddMessage.Getter]) match {
+ case Some(x) =>
+ val messageRecord: MessageRecord = x
+ Some(messageRecord)
+ case None => None
+ }
}
} else {
+ debug("Message not indexed. Journal location could not be determined for message: %s", messageKey)
None
}
+ }
}
}
@@ -336,12 +350,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = {
try {
load(location) match {
- case (updateType, batch, data) =>
- Some(expected.cast(decode(location, updateType, data)))
+ case (updateType, batch, data) =>
+ val decoded = expected.cast(decode(location, updateType, data))
+ val rc = Some(decoded)
+ rc
}
} catch {
- case e: Exception =>
- debug("Could not load journal record at: %s", location)
+ case e: Throwable =>
+ debug(e, "Could not load journal record at: %s", location)
None
}
}
@@ -368,10 +384,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private def _store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
val kind = update.asInstanceOf[TypeCreatable]
val frozen = update.freeze
- val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
+ val baos = new DataByteArrayOutputStream(frozen.serializedSizeFramed + 5)
baos.writeByte(kind.toType().getNumber())
baos.writeInt(batch)
- frozen.writeUnframed(baos)
+ frozen.writeFramed(baos)
val buffer = baos.toBuffer()
append(buffer) {
@@ -464,14 +480,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
// Is this our first incremental recovery pass?
if (lastRecoveryPosition == null) {
- if (databaseRootRecord.hasFirstBatchLocation) {
+ if (rootBuffer.hasFirstBatchLocation) {
// we have to start at the first in progress batch usually...
- nextRecoveryPosition = databaseRootRecord.getFirstBatchLocation
+ nextRecoveryPosition = rootBuffer.getFirstBatchLocation
} else {
// but perhaps there were no batches in progress..
- if (databaseRootRecord.hasLastUpdateLocation) {
+ if (rootBuffer.hasLastUpdateLocation) {
// then we can just continue from the last update applied to the index
- lastRecoveryPosition = databaseRootRecord.getLastUpdateLocation
+ lastRecoveryPosition = rootBuffer.getLastUpdateLocation
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
} else {
// no updates in the index?.. start from the first record in the journal.
@@ -533,16 +549,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
})
}
- /**
- */
- def benchmarkLatency[R](func: (String=>Unit)=>R ):R = {
- val start = System.nanoTime
- func { label=>
- var end = System.nanoTime
- warn("latencey: %s is %,.3f ms", label, ( (end - start).toFloat / TimeUnit.MILLISECONDS.toNanos(1)))
- }
- }
-
def read(location: Location) = journal.read(location)
/////////////////////////////////////////////////////////////////////
@@ -565,12 +571,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
// after the last update location.
if (!recovering || isAfterLastUpdateLocation(location)) {
withTx { tx =>
- // index the updates
+ val helper = new TxHelper(tx)
+ // index the updates
updates.foreach {
update =>
- index(tx, update.update, update.location)
+ index(helper, update.update, update.location)
}
- updateLocations(tx, location)
+ helper.updateLocations(location)
}
}
case None =>
@@ -589,7 +596,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
case Some((_, _)) =>
if (!recovering || isAfterLastUpdateLocation(location)) {
withTx { tx =>
- updateLocations(tx, location)
+ val helper = new TxHelper(tx)
+ helper.updateLocations(location)
}
}
case None =>
@@ -610,9 +618,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
// after the last update location.
if (!recovering || isAfterLastUpdateLocation(location)) {
withTx { tx =>
- // index the update now.
- index(tx, update, location)
- updateLocations(tx, location)
+ val helper = new TxHelper(tx)
+ index(helper, update, location)
+ helper.updateLocations(location)
}
}
@@ -637,9 +645,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
- private def index(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
-
- val helper = new TxHelper(tx)
+ private def index(helper:TxHelper, update: TypeCreatable, location: Location): Unit = {
import JavaConversions._
import helper._
@@ -649,7 +655,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val fileId:jl.Integer = location.getDataFileId()
addAndGet(dataFileRefIndex, fileId, -1)
} else {
- error("Cannot remove message, it did not exist: %d", key)
+ if( !recovering ) {
+ error("Cannot remove message, it did not exist: %d", key)
+ }
}
}
@@ -677,8 +685,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
case x: AddMessage.Getter =>
val messageKey = x.getMessageKey()
- if (messageKey > databaseRootRecord.getLastMessageKey) {
- databaseRootRecord.setLastMessageKey(messageKey)
+ if (messageKey > rootBean.getLastMessageKey) {
+ rootBean.setLastMessageKey(messageKey)
}
val prevLocation = messageKeyIndex.put(messageKey, location)
@@ -743,26 +751,33 @@ class HawtDBClient(hawtDBStore: HawtDBSt
if (queueEntry != null) {
val messageKey = queueEntry.getMessageKey
val existing = trackingIndex.remove(messageKey)
- if (existing == null) {
- error("Tracking entry not found for message %d", queueEntry.getMessageKey)
- }
- if( addAndGet(messageRefsIndex, new jl.Long(messageKey), -1) == 0 ) {
- // message is no longer referenced.. we can remove it..
- removeMessage(messageKey)
+ if (existing != null) {
+ if( addAndGet(messageRefsIndex, new jl.Long(messageKey), -1) == 0 ) {
+ // message is no longer referenced.. we can remove it..
+ removeMessage(messageKey)
+ }
+ } else {
+ if( !recovering ) {
+ error("Tracking entry not found for message %d", queueEntry.getMessageKey)
+ }
}
} else {
- error("Queue entry not found for seq %d", x.getQueueSeq)
+ if( !recovering ) {
+ error("Queue entry not found for seq %d", x.getQueueSeq)
+ }
}
} else {
- error("Queue not found: %d", x.getQueueKey)
+ if( !recovering ) {
+ error("Queue not found: %d", x.getQueueKey)
+ }
}
case x: AddQueue.Getter =>
val queueKey = x.getKey
if (queueIndex.get(queueKey) == null) {
- if (queueKey > databaseRootRecord.getLastQueueKey) {
- databaseRootRecord.setLastQueueKey(queueKey)
+ if (queueKey > rootBean.getLastQueueKey) {
+ rootBean.setLastQueueKey(queueKey)
}
val queueRecord = new QueueRootRecord.Bean
@@ -790,9 +805,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
messageKeyIndex.clear
messageRefsIndex.clear
dataFileRefIndex.clear
- databaseRootRecord.setLastMessageKey(0)
+ rootBean.setLastMessageKey(0)
- cleanup(tx);
+ cleanup(_tx);
info("Store purged.");
case x: AddSubscription.Getter =>
@@ -870,11 +885,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
// Don't GC files that we will need for recovery..
- val upto = if (databaseRootRecord.hasFirstBatchLocation) {
- Some(databaseRootRecord.getFirstBatchLocation.getDataFileId)
+ val upto = if (rootBuffer.hasFirstBatchLocation) {
+ Some(rootBuffer.getFirstBatchLocation.getDataFileId)
} else {
- if (databaseRootRecord.hasLastUpdateLocation) {
- Some(databaseRootRecord.getLastUpdateLocation.getDataFileId)
+ if (rootBuffer.hasLastUpdateLocation) {
+ Some(rootBuffer.getLastUpdateLocation.getDataFileId)
} else {
None
}
@@ -915,12 +930,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private case class Update(update: TypeCreatable, location: Location)
- private class TxHelper(private val _tx: Transaction) {
- lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getQueueIndexPage)
- lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
- lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageKeyIndexPage)
- lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
- lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, databaseRootRecord.getSubscriptionIndexPage)
+ private class TxHelper(val _tx: Transaction) {
+ lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, rootBuffer.getQueueIndexPage)
+ lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, rootBuffer.getDataFileRefIndexPage)
+ lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, rootBuffer.getMessageKeyIndexPage)
+ lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, rootBuffer.getMessageRefsIndexPage)
+ lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, rootBuffer.getSubscriptionIndexPage)
def addAndGet[K](index:SortedIndex[K, jl.Integer], key:K, amount:Int):Int = {
var counter = index.get(key)
@@ -949,6 +964,24 @@ class HawtDBClient(hawtDBStore: HawtDBSt
factory.create(_tx, rc)
rc
}
+
+ val rootBean = rootBuffer.copy
+
+ def lastUpdateLocation(location:Location) = {
+ rootBean.setLastUpdateLocation(location)
+ }
+
+ def updateLocations(lastUpdate: Location): Unit = {
+ rootBean.setLastUpdateLocation(lastUpdate)
+ if (batches.isEmpty) {
+ rootBean.clearFirstBatchLocation
+ } else {
+ rootBean.setFirstBatchLocation(batches.head._2._1)
+ }
+ rootBuffer = rootBean.freeze
+ _tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
+ }
+
}
private def withTx[T](func: (Transaction) => T): T = {
@@ -981,18 +1014,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
private def isAfterLastUpdateLocation(location: Location) = {
- val lastUpdate: Location = databaseRootRecord.getLastUpdateLocation
+ val lastUpdate: Location = rootBuffer.getLastUpdateLocation
lastUpdate.compareTo(location) < 0
}
- private def updateLocations(tx: Transaction, lastUpdate: Location): Unit = {
- databaseRootRecord.setLastUpdateLocation(lastUpdate)
- if (batches.isEmpty) {
- databaseRootRecord.clearFirstBatchLocation
- } else {
- databaseRootRecord.setFirstBatchLocation(batches.head._2._1)
- }
- tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
- databaseRootRecord = databaseRootRecord.copy
- }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:10:59 2010
@@ -91,7 +91,7 @@ class HawtDBStore extends Store with Bas
}
protected def _start(onCompleted: Runnable) = {
- executor_pool = new ThreadPoolExecutor(4, 20, 1, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory(){
+ executor_pool = Executors.newFixedThreadPool(20, new ThreadFactory(){
def newThread(r: Runnable) = {
val rc = new Thread(r, "hawtdb store client")
rc.setDaemon(true)
@@ -102,8 +102,8 @@ class HawtDBStore extends Store with Bas
schedualDisplayStats
executor_pool {
client.start(^{
- next_msg_key.set( client.databaseRootRecord.getLastMessageKey.longValue +1 )
- next_queue_key.set( client.databaseRootRecord.getLastQueueKey.longValue +1 )
+ next_msg_key.set( client.rootBuffer.getLastMessageKey.longValue +1 )
+ next_queue_key.set( client.rootBuffer.getLastQueueKey.longValue +1 )
onCompleted.run
})
}
@@ -162,11 +162,11 @@ class HawtDBStore extends Store with Bas
def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
executor_pool ^{
- callback( client.loadMessage(id) )
+ val rc = client.loadMessage(id)
+ callback( rc )
}
}
-
def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
executor_pool ^{
callback( client.getQueueEntries(id) )
@@ -319,9 +319,30 @@ class HawtDBStore extends Store with Bas
def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
+ val m1 = rate(ss._1,es._1)
+ val m2 = rate(ss._2,es._2)
+ val m3 = rate(ss._3,es._3)
+ val m4 = rate(ss._4,es._4)
+
+ if( m1>0f || m2>0f || m3>0f || m4>0f ) {
+ info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
+ m1, m2, m3, m3, avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
+ }
+
+
+ def display(name:String, counter:TimeCounter) {
+ var t = counter.apply(true)
+ if( t.count > 0 ) {
+ info("%s latency in ms: avg: %,.3f, max: %,.3f, min: %,.3f", name, t.avgTime(TimeUnit.MILLISECONDS), t.maxTime(TimeUnit.MILLISECONDS), t.minTime(TimeUnit.MILLISECONDS))
+ }
+ }
+
+// display("total msg load", loadMessageTimer)
+// display("index read", client.indexLoad)
+// display("toal journal load", client.journalLoad)
+// display("journal read", client.journalRead)
+// display("journal decode", client.journalDecode)
- info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
- rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
schedualDisplayStats
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul 7 04:10:59 2010
@@ -178,7 +178,7 @@ object Helpers {
if (t == null) {
throw new IOException("Could not load journal record. Invalid type at location: " + location);
}
- t.parseUnframed(value).asInstanceOf[TypeCreatable]
+ t.parseFramed(value).asInstanceOf[TypeCreatable]
}
//
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul 7 04:10:59 2010
@@ -144,20 +144,28 @@ abstract class StoreBenchmarkSupport ext
msgKeys
}
- test("store enqueue latencey") {
+ test("store enqueue and load latencey") {
val A = addQueue("A")
- var seq = 0
+ var messageKeys = storeMessages(A)
+ loadMessages(A, messageKeys)
+ }
+
+ def storeMessages(queue:Long) = {
+
+ var seq = 0L
+ var messageKeys = ListBuffer[Long]()
val content = payload("message\n", 1024)
- val metric = benchmark {
+ var metric = benchmarkCount(100000) {
seq += 1
var batch = store.createStoreBatch
val message = addMessage(batch, content)
- batch.enqueue(entry(A, seq, message))
+ messageKeys += message
+ batch.enqueue(entry(queue, seq, message))
val latch = new CountDownLatch(1)
- batch.setDisposer(^{cd(latch)} )
+ batch.setDisposer(^{latch.countDown} )
batch.release
store.flushMessage(message) {}
@@ -167,12 +175,27 @@ abstract class StoreBenchmarkSupport ext
println("enqueue metrics: "+metric)
println("enqueue latency is: "+metric.latency(TimeUnit.MILLISECONDS)+" ms")
println("enqueue rate is: "+metric.rate(TimeUnit.SECONDS)+" enqueues/s")
+ messageKeys.toList
}
- def cd(latch:CountDownLatch) = {
- latch.countDown
- }
+ def loadMessages(queue:Long, messageKeys: List[Long]) = {
+
+ var keys = messageKeys.toList
+ val metric = benchmarkCount(keys.size) {
+ val latch = new CountDownLatch(1)
+ store.loadMessage(keys.head) { msg=>
+ assert(msg.isDefined, "message key not found: "+keys.head)
+ latch.countDown
+ }
+ latch.await
+ keys = keys.drop(1)
+ }
+
+ println("load metrics: "+metric)
+ println("load latency is: "+metric.latency(TimeUnit.MILLISECONDS)+" ms")
+ println("load rate is: "+metric.rate(TimeUnit.SECONDS)+" loads/s")
+ }
case class Metric(count:Long, duration:Long) {
def latency(unit:TimeUnit) = {
@@ -183,14 +206,20 @@ abstract class StoreBenchmarkSupport ext
}
}
- def benchmark(func: =>Unit ) = {
+ def benchmarkFor(duration:Int)(func: =>Unit ) = {
val counter = new AtomicLong()
val done = new AtomicBoolean()
+ val warmup = new AtomicBoolean(true)
+
var startT = 0L
var endT = 0L
val thread = new Thread("benchmarked task") {
+
override def run = {
+ while(warmup.get) {
+ func
+ }
startT = System.nanoTime();
while(!done.get) {
func
@@ -201,12 +230,24 @@ abstract class StoreBenchmarkSupport ext
}
thread.start()
- Thread.sleep(1000*30)
+
+ Thread.sleep(1000*5)
+ warmup.set(false)
+ Thread.sleep(1000*duration)
done.set(true)
thread.join
Metric(counter.get, endT-startT)
}
-
+ def benchmarkCount(iterations:Int)(func: =>Unit ) = {
+ val startT = System.nanoTime();
+ var i = 0
+ while( i < iterations) {
+ func
+ i += 1
+ }
+ val endT = System.nanoTime();
+ Metric(iterations, endT-startT)
+ }
}