You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:08:25 UTC
svn commit: r961134 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-hawtdb/src/main/scala/org/apache/...
Author: chirino
Date: Wed Jul 7 04:08:24 2010
New Revision: 961134
URL: http://svn.apache.org/viewvc?rev=961134&view=rev
Log:
teaking stores to get better perf when there is not attached consumer
Added:
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala
- copied, changed from r961133, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:08:24 2010
@@ -91,7 +91,7 @@ class Queue(val host: VirtualHost, val d
val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
// sequence numbers.. used to track what's in the store.
- var message_seq_counter = 0L
+ var message_seq_counter = 1L
val headEntry = new QueueEntry(this).tombstone
var tailEntry = new QueueEntry(this)
@@ -121,16 +121,18 @@ class Queue(val host: VirtualHost, val d
def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
this.storeId = storeId
- records.foreach { qer =>
- val entry = new QueueEntry(Queue.this).flushed(qer)
- entries.addLast(entry)
- }
- if( !entries.isEmpty ) {
- message_seq_counter = entries.getTail.seq
- }
- counter = records.size
- enqueue_counter += records.size
- debug("restored: "+records.size )
+ if( !records.isEmpty ) {
+ records.foreach { qer =>
+ val entry = new QueueEntry(Queue.this).flushed(qer)
+ entries.addLast(entry)
+ }
+
+ message_seq_counter = records.last.queueSeq+1
+
+ counter = records.size
+ enqueue_counter += records.size
+ debug("restored: "+records.size )
+ }
} >>: dispatchQueue
object messages extends Sink[Delivery] {
@@ -149,15 +151,9 @@ class Queue(val host: VirtualHost, val d
val entry = tailEntry
tailEntry = new QueueEntry(Queue.this)
- entry.created(next_message_seq, delivery)
-
- if( delivery.ack!=null ) {
- delivery.ack(delivery.storeBatch)
- }
- if (delivery.storeKey != -1) {
- delivery.storeBatch.enqueue(entry.createQueueEntryRecord)
- delivery.storeBatch.release
- }
+ val queueDelivery = delivery.copy
+ entry.created(next_message_seq, queueDelivery)
+ queueDelivery.storeBatch = delivery.storeBatch
size += entry.size
entries.addLast(entry)
@@ -165,6 +161,11 @@ class Queue(val host: VirtualHost, val d
enqueue_counter += 1
enqueue_size += entry.size
+ // Do we need to do a persistent enqueue???
+ if (queueDelivery.storeBatch != null) {
+ queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
+ }
+
var swap_check = false
if( !entry.hasSubs ) {
// we flush the entry out right away if it looks
@@ -192,6 +193,12 @@ class Queue(val host: VirtualHost, val d
})
}
+ // release the store batch...
+ if (queueDelivery.storeBatch != null) {
+ queueDelivery.storeBatch.release
+ queueDelivery.storeBatch = null
+ }
+
true
}
}
@@ -267,19 +274,9 @@ class Queue(val host: VirtualHost, val d
false
} else {
- // Called from the producer thread before the delivery is
- // processed by the queue's thread.. We don't
- // yet know the order of the delivery in the queue.
- if (delivery.storeKey != -1) {
- // If the message has a store id, then this delivery will
- // need a tx to track the store changes.
- if( delivery.storeBatch == null ) {
- delivery.storeBatch = host.store.createStoreBatch
- } else {
- delivery.storeBatch.retain
- }
+ if( delivery.storeBatch!=null ) {
+ delivery.storeBatch.retain
}
-
val rc = session.offer(delivery)
assert(rc, "session should accept since it was not full")
true
@@ -440,7 +437,8 @@ class Queue(val host: VirtualHost, val d
}
def drain_store_flushes() = {
- store_flush_source.getData.foreach { entry =>
+ val data = store_flush_source.getData
+ data.foreach { entry =>
flushingSize -= entry.size
// by the time we get called back, subs my be interested in the entry
@@ -708,9 +706,6 @@ class QueueEntry(val queue:Queue) extend
delivery.storeKey = tx.store(delivery.createMessageRecord)
tx.enqueue(createQueueEntryRecord)
tx.release
- true
- } else {
- false
}
}
@@ -719,10 +714,18 @@ class QueueEntry(val queue:Queue) extend
0
} else {
flushing=true
- store
- queue.host.store.flushMessage(ref) {
- queue.store_flush_source.merge(QueueEntry.this)
+
+ if( delivery.storeBatch!=null ) {
+ delivery.storeBatch.eagerFlush(^{
+ queue.store_flush_source.merge(QueueEntry.this)
+ })
+ } else {
+ store
+ queue.host.store.flushMessage(ref) {
+ queue.store_flush_source.merge(QueueEntry.this)
+ }
}
+
size
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 04:08:24 2010
@@ -182,7 +182,7 @@ class Router(val host:VirtualHost) exten
} >>: dispatchQueue
def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
- val route = new DeliveryProducerRoute(destination, producer) {
+ val route = new DeliveryProducerRoute(this, destination, producer) {
override def on_connected = {
completed(this);
}
@@ -228,7 +228,7 @@ trait Route extends Retained {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class DeliveryProducerRoute(val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
override protected def log = Router
override def dispatchQueue = producer.dispatchQueue
@@ -294,22 +294,43 @@ class DeliveryProducerRoute(val destinat
def full = overflow!=null
- def offer(value:Delivery) = {
+ def offer(delivery:Delivery) = {
if( full ) {
false
} else {
+ if( delivery.message.persistent && router.host.store!=null ) {
+ delivery.storeBatch = router.host.store.createStoreBatch
+ delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+ }
+
targets.foreach { target=>
- if( !target.offer(value) ) {
+ if( !target.offer(delivery) ) {
overflowSessions ::= target
}
}
+
if( overflowSessions!=Nil ) {
- overflow = value
+ overflow = delivery
+ } else {
+ delivered(delivery)
}
true
}
}
+ private def delivered(delivery: Delivery): Unit = {
+ if (delivery.ack != null) {
+ if (delivery.storeBatch != null) {
+ delivery.storeBatch.setDisposer(^ {delivery.ack(null)})
+ } else {
+ delivery.ack(null)
+ }
+ }
+ if (delivery.storeBatch != null) {
+ delivery.storeBatch.release
+ }
+ }
+
val drainer = ^{
if( overflow!=null ) {
val original = overflowSessions;
@@ -320,6 +341,7 @@ class DeliveryProducerRoute(val destinat
}
}
if( overflowSessions==Nil ) {
+ delivered(overflow)
overflow = null
refiller.run
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:08:24 2010
@@ -53,13 +53,13 @@ object VirtualHost extends Log {
rc.enabled = true
rc.hostNames.add("localhost")
- val store = new CassandraStoreDTO
- store.hosts.add("localhost:9160")
+// val store = new CassandraStoreDTO
+// store.hosts.add("localhost:9160")
// val store = new HawtDBStoreDTO
// store.directory = new File("activemq-data")
- rc.store = store
+ rc.store = null
rc
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:08:24 2010
@@ -22,7 +22,6 @@ import com.shorrockin.cascal.session._
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
import java.util.HashMap
-import org.apache.activemq.apollo.util.IntCounter
import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
@@ -31,7 +30,8 @@ import org.apache.activemq.apollo.broker
import com.shorrockin.cascal.utils.Conversions._
import org.fusesource.hawtdispatch.ScalaDispatch._
import ReporterLevel._
-import java.util.concurrent.{ThreadFactory, TimeUnit, Executors, ExecutorService}
+import java.util.concurrent._
+import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
object CassandraStore extends Log {
@@ -71,12 +71,12 @@ class CassandraStore extends Store with
/////////////////////////////////////////////////////////////////////
val dispatchQueue = createQueue("cassandra store")
- var next_queue_key = new AtomicLong(0)
- var next_msg_key = new AtomicLong(0)
+ var next_queue_key = new AtomicLong(1)
+ var next_msg_key = new AtomicLong(1)
val client = new CassandraClient()
var config:CassandraStoreDTO = defaultConfig
- var blocking:BlockingSupport = null
+ var blocking:ThreadPoolExecutor = null
def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
@@ -94,7 +94,13 @@ class CassandraStore extends Store with
protected def _start(onCompleted: Runnable) = {
- blocking = new BlockingSupport
+ blocking = new ThreadPoolExecutor(4, 20, 1, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory(){
+ def newThread(r: Runnable) = {
+ val rc = new Thread(r, "cassandra client")
+ rc.setDaemon(true)
+ rc
+ }
+ })
client.schema = Schema(config.keyspace)
// TODO: move some of this parsing code into validation too.
@@ -109,82 +115,12 @@ class CassandraStore extends Store with
}.toList
client.start
+ schedualDisplayStats
onCompleted.run
}
protected def _stop(onCompleted: Runnable) = {
- blocking.setDisposer(^{
- onCompleted
- })
- blocking.release
- }
-
-
- class BlockingSupport extends BaseRetained {
-
- val name = "cassandra store worker"
- var workerStackSize = 0
- val max_workers = 20
- var executing_workers=0
-
- val dispatchQueue = createQueue()
- val queued_executions = ListBuffer[()=>Unit]()
- var executor_pool = Executors.newCachedThreadPool(new ThreadFactory {
- def newThread(r: Runnable) = {
- val thread = new Thread(null, r, name, workerStackSize)
- thread.setDaemon(true)
- thread
- }
- })
-
- /**
- * executes a blocking function in an async thread.
- */
- def apply( func: =>Unit ):Unit = {
- assertRetained
- dispatchQueue {
- if( executing_workers >= max_workers ) {
- queued_executions += func _
- } else {
- executing_workers += 1
- execute(func _)
- }
- }
- }
-
- private def execute(func: ()=>Unit):Unit = {
- executor_pool {
- try {
- func()
- } finally {
- execute_done
- }
- }
- }
-
- private def execute_done() = ^{
- if ( queued_executions.isEmpty ) {
- executing_workers -= 1
- } else {
- execute(queued_executions.head)
- queued_executions.drop(1)
- }
- if( retained < 1 ) {
- check_pool_disposed
- }
- } >>: dispatchQueue
-
- override def dispose = ^{
- check_pool_disposed
- } >>: dispatchQueue
-
- private def check_pool_disposed = {
- if( executing_workers == 0 ) {
- executor_pool.shutdown
- super.dispose
- }
- }
-
+ blocking.shutdown
}
@@ -193,6 +129,17 @@ class CassandraStore extends Store with
// Implementation of the BrokerDatabase interface
//
/////////////////////////////////////////////////////////////////////
+ val storeLatency = new TimeCounter
+ def schedualDisplayStats:Unit = {
+ def displayStats = {
+ if( serviceState.isStarted ) {
+ val cl = storeLatency.apply(true)
+ info("metrics: store latency: %,.3f ms", cl.avgTime(TimeUnit.MILLISECONDS))
+ schedualDisplayStats
+ }
+ }
+ dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
+ }
/**
* Deletes all stored data from the store.
@@ -200,12 +147,14 @@ class CassandraStore extends Store with
def purge(callback: =>Unit) = {
blocking {
client.purge
+ next_queue_key.set(1)
+ next_msg_key.set(1)
callback
}
}
def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
- val key = next_queue_key.incrementAndGet
+ val key = next_queue_key.getAndIncrement
record.key = key
blocking {
client.addQueue(record)
@@ -249,13 +198,7 @@ class CassandraStore extends Store with
if( action == null ) {
callback
} else {
- val prevDisposer = action.tx.getDisposer
- action.tx.setDisposer(^{
- callback
- if(prevDisposer!=null) {
- prevDisposer.run
- }
- })
+ action.tx.eagerFlush(callback _)
flush(action.tx.txid)
}
@@ -288,8 +231,11 @@ class CassandraStore extends Store with
}
}
+ val txid:Int = next_tx_id.getAndIncrement
var actions = Map[Long, MessageAction]()
- var txid:Int = 0
+
+ var flushListeners = ListBuffer[Runnable]()
+ def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
def rm(msg:Long) = {
actions -= msg
@@ -302,7 +248,7 @@ class CassandraStore extends Store with
}
def store(record: MessageRecord):Long = {
- record.key = next_msg_key.incrementAndGet
+ record.key = next_msg_key.getAndIncrement
val action = new MessageAction
action.msg = record.key
action.store = record
@@ -339,7 +285,11 @@ class CassandraStore extends Store with
transaction_source.merge(this)
}
+
def onPerformed() {
+ flushListeners.foreach { x=>
+ x.run()
+ }
super.dispose
}
}
@@ -354,21 +304,14 @@ class CassandraStore extends Store with
var pendingEnqueues = new HashMap[(Long,Long), CassandraBatch#MessageAction]()
var delayedTransactions = new HashMap[Int, CassandraBatch]()
- var next_tx_id = new IntCounter
+ var next_tx_id = new IntCounter(1)
def drain_transactions = {
transaction_source.getData.foreach { tx =>
- val tx_id = next_tx_id.incrementAndGet
- tx.txid = tx_id
+ val tx_id = tx.txid
delayedTransactions.put(tx_id, tx)
- if( config.flushDelay > 0 ) {
- dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
- } else {
- dispatchQueue.dispatchAsync(^{flush(tx_id)})
- }
-
tx.actions.foreach { case (msg, action) =>
if( action.store!=null ) {
pendingStores.put(msg, action)
@@ -407,6 +350,12 @@ class CassandraStore extends Store with
}
}
+ if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
+ flush(tx_id)
+ } else {
+ dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+ }
+
}
}
@@ -447,15 +396,16 @@ class CassandraStore extends Store with
}
if( !txs.isEmpty ) {
- // suspend so that we don't process more flush requests while
- // we are concurrently executing a flush
- flush_source.suspend
- blocking {
- client.store(txs)
- txs.foreach { x=>
- x.onPerformed
+ storeLatency.start { end =>
+ blocking {
+ client.store(txs)
+ dispatchQueue {
+ end()
+ txs.foreach { x=>
+ x.onPerformed
+ }
+ }
}
- flush_source.resume
}
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:08:24 2010
@@ -156,6 +156,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
pageFileFactory.setDrainOnClose(false)
pageFileFactory.setSync(true)
pageFileFactory.setUseWorkerThread(true)
+ pageFileFactory.setPageSize(512.toShort)
pageFileFactory.open()
withTx { tx =>
@@ -210,25 +211,25 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
- var batch = List[TypeCreatable]()
+ var batch = ListBuffer[TypeCreatable]()
txs.foreach {
tx =>
tx.actions.foreach {
case (msg, action) =>
if (action.messageRecord != null) {
val update: AddMessage.Bean = action.messageRecord
- batch ::= update
+ batch += update
}
action.enqueues.foreach {
queueEntry =>
val update: AddQueueEntry.Bean = queueEntry
- batch ::= update
+ batch += update
}
action.dequeues.foreach {
queueEntry =>
val qid = queueEntry.queueKey
val seq = queueEntry.queueSeq
- batch ::= new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
+ batch += new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
}
}
}
@@ -239,9 +240,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
})
}
- def purge() = {
- val update = new Purge.Bean()
- store(update)
+ def purge(callback: Runnable) = {
+ store(new Purge.Bean(), callback)
}
def listQueues: Seq[Long] = {
@@ -356,7 +356,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
tracker.await
}
- private def store(updates: List[TypeCreatable], onComplete: Runnable): Unit = {
+ private def store(updates: Seq[TypeCreatable], onComplete: Runnable): Unit = {
val batch = next_batch_id
begin(batch)
updates.foreach {
@@ -747,10 +747,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
addAndGet(messageRefsIndex, new jl.Long(messageKey), 1)
} else {
+ // TODO perhaps treat this like an update?
error("Duplicate queue entry seq %d", x.getQueueSeq)
}
} else {
- error("Duplicate queue entry message %d", x.getMessageKey)
+ error("Duplicate queue entry message %d was %d", x.getMessageKey, existing)
}
} else {
error("Queue not found: %d", x.getQueueKey)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:08:24 2010
@@ -70,8 +70,8 @@ class HawtDBStore extends Store with Bas
/////////////////////////////////////////////////////////////////////
val dispatchQueue = createQueue("hawtdb store")
- var next_queue_key = new AtomicLong(0)
- var next_msg_key = new AtomicLong(0)
+ var next_queue_key = new AtomicLong(1)
+ var next_msg_key = new AtomicLong(1)
var executor_pool:ExecutorService = _
var config:HawtDBStoreDTO = defaultConfig
@@ -125,14 +125,15 @@ class HawtDBStore extends Store with Bas
* Deletes all stored data from the store.
*/
def purge(callback: =>Unit) = {
- executor_pool ^{
- client.purge
+ client.purge(^{
+ next_queue_key.set(1)
+ next_msg_key.set(1)
callback
- }
+ })
}
def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
- val key = next_queue_key.incrementAndGet
+ val key = next_queue_key.getAndIncrement
record.key = key
executor_pool ^{
client.addQueue(record)
@@ -171,22 +172,19 @@ class HawtDBStore extends Store with Bas
}
}
- def flushMessage(id: Long)(callback: => Unit) = ^{
+ private def doFlush(id: Long, callback:Runnable) = {
val action: HawtDBBatch#MessageAction = pendingStores.get(id)
if( action == null ) {
- callback
+ callback.run
} else {
- val prevDisposer = action.tx.getDisposer
- action.tx.setDisposer(^{
- callback
- if(prevDisposer!=null) {
- prevDisposer.run
- }
- })
+ action.tx.eagerFlush(callback)
flush(action.tx.txid)
}
-
- } >>: dispatchQueue
+ }
+
+ def flushMessage(id: Long)(callback: => Unit) = dispatchQueue {
+ doFlush(id, ^{ callback } )
+ }
def createStoreBatch() = new HawtDBBatch
@@ -217,8 +215,11 @@ class HawtDBStore extends Store with Bas
}
}
+ val txid:Int = next_tx_id.getAndIncrement
var actions = Map[Long, MessageAction]()
- var txid:Int = 0
+
+ var flushListeners = ListBuffer[Runnable]()
+ def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
def rm(msg:Long) = {
actions -= msg
@@ -231,7 +232,7 @@ class HawtDBStore extends Store with Bas
}
def store(record: MessageRecord):Long = {
- record.key = next_msg_key.incrementAndGet
+ record.key = next_msg_key.getAndIncrement
val action = new MessageAction
action.msg = record.key
action.messageRecord = record
@@ -269,13 +270,13 @@ class HawtDBStore extends Store with Bas
transaction_source.merge(this)
}
- def onPerformed() {
+ def onPerformed() = this.synchronized {
metric_commit_counter += 1
val t = TimeUnit.NANOSECONDS.toMillis(System.nanoTime-dispose_start)
- if( t < 0 ) {
- println("wtf")
- }
metric_commit_latency_counter += t
+ flushListeners.foreach { x=>
+ x.run
+ }
super.dispose
}
}
@@ -302,8 +303,7 @@ class HawtDBStore extends Store with Bas
var pendingEnqueues = new HashMap[(Long,Long), HawtDBBatch#MessageAction]()
var delayedTransactions = new HashMap[Int, HawtDBBatch]()
- var next_tx_id = new IntCounter
-
+ var next_tx_id = new IntCounter(1)
def schedualDisplayStats:Unit = {
val st = System.nanoTime
@@ -329,15 +329,7 @@ class HawtDBStore extends Store with Bas
def drain_transactions = {
transaction_source.getData.foreach { tx =>
- val tx_id = next_tx_id.incrementAndGet
- tx.txid = tx_id
- delayedTransactions.put(tx_id, tx)
-
- if( config.flushDelay > 0 ) {
- dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
- } else {
- dispatchQueue.dispatchAsync(^{flush(tx_id)})
- }
+ delayedTransactions.put(tx.txid, tx)
tx.actions.foreach { case (msg, action) =>
if( action.messageRecord!=null ) {
@@ -380,6 +372,13 @@ class HawtDBStore extends Store with Bas
}
}
+ val tx_id = tx.txid
+ if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
+ flush(tx_id)
+ } else {
+ dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+ }
+
}
}
@@ -402,7 +401,6 @@ class HawtDBStore extends Store with Bas
// Message may be flushed or canceled before the timeout flush event..
// tx may be null in those cases
if (tx!=null) {
-
tx.actions.foreach { case (msg, action) =>
if( action.messageRecord !=null ) {
metric_flushed_message_counter += 1
@@ -422,8 +420,6 @@ class HawtDBStore extends Store with Bas
}
if( !txs.isEmpty ) {
- // suspend so that we don't process more flush requests while
- // we are concurrently executing a flush
client.store(txs)
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul 7 04:08:24 2010
@@ -18,6 +18,8 @@ package org.apache.activemq.apollo.stomp
import org.apache.activemq.transport.TransportFactory
import org.apache.activemq.apollo.broker.{LoggingTracker, Broker}
+import java.io.File
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, HawtDBStoreDTO}
/**
*/
@@ -37,6 +39,14 @@ object StompBroker {
connector.protocol = "stomp"
connector.advertise = uri
+ val store = new CassandraStoreDTO
+ store.hosts.add("localhost:9160")
+
+// val store = new HawtDBStoreDTO
+// store.directory = new File("activemq-data")
+
+ broker.config.virtualHosts.get(0).store = store
+
val tracker = new LoggingTracker("broker startup")
tracker.start(broker)
tracker.await
@@ -48,4 +58,5 @@ object StompBroker {
println("Shutdown complete.")
}
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:08:24 2010
@@ -280,10 +280,12 @@ class StompProtocolHandler extends Proto
delivery.message = message
delivery.size = message.frame.size
- if( message.persistent && host.store!=null ) {
- storeBatch = host.store.createStoreBatch
- delivery.storeBatch = storeBatch
- delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+ // User might be asking for ack that we have prcoessed the message..
+ val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+ if( receipt!=null ) {
+ delivery.ack = { storeTx =>
+ connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ }
}
// routes can always accept at least 1 delivery...
@@ -299,24 +301,7 @@ class StompProtocolHandler extends Proto
// info("Dropping message. No consumers interested in message.")
}
- // User might be asking for ack that we have prcoessed the message..
- val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
- if( receipt!=null ) {
- if( storeBatch==null ) {
- // message was not persistent we can ack back right away..
- connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
- } else {
- // else lets ack back once the persistent operations are processed.
- storeBatch.setDisposer(^{
- connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
- })
- }
- }
- if( storeBatch!=null ) {
- // We can now release the batch as we are done using it..
- storeBatch.release
- }
}
def on_stomp_subscribe(headers:HeaderMap) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:08:24 2010
@@ -60,10 +60,16 @@ trait StoreBatch extends Retained {
*/
def dequeue(entry:QueueEntryRecord)
+
+ /**
+ * Causes the batch to flush eagerly, callback is called once flushed.
+ */
+ def eagerFlush(callback: Runnable)
+
}
/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait Store extends Service {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala Wed Jul 7 04:08:24 2010
@@ -22,7 +22,14 @@ package org.apache.activemq.apollo.util
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class IntCounter(private var value:Int = 0) {
+class IntCounter(private var value:Int = 0) extends MetricProducer[Int] {
+
+ def apply(reset:Boolean):Int = {
+ val rc = value
+ value = 0
+ rc
+ }
+ def clear() = value=0
def get() = value
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Wed Jul 7 04:08:24 2010
@@ -22,7 +22,14 @@ package org.apache.activemq.apollo.util
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class LongCounter(private var value:Long = 0) {
+class LongCounter(private var value:Long = 0) extends MetricProducer[Long] {
+
+ def apply(reset:Boolean):Long = {
+ val rc = value
+ value = 0
+ rc
+ }
+ def clear() = value=0
def get() = value
Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala (from r961133, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala&r1=961133&r2=961134&rev=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala Wed Jul 7 04:08:24 2010
@@ -17,28 +17,21 @@
package org.apache.activemq.apollo.util
/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * The MetricProducer trait is implemented by objects which
+ * measure metrics.
*/
-class IntCounter(private var value:Int = 0) {
+trait MetricProducer[T] extends ((Boolean) => T) {
- def get() = value
+ /**
+ * The function which produces captures metric value and
+ * optionally clears it.
+ */
+ def apply(clear: Boolean):T
- def incrementAndGet() = addAndGet(1)
- def decrementAndGet() = addAndGet(-1)
- def addAndGet(amount:Int) = {
- value+=amount
- value
- }
+ /**
+ * Clears the metric value.
+ */
+ def clear()
+}
- def getAndIncrement() = getAndAdd(1)
- def getAndDecrement() = getAndAdd(-11)
- def getAndAdd(amount:Int) = {
- val rc = value
- value+=amount
- rc
- }
-}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala?rev=961134&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala Wed Jul 7 04:08:24 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * <p>A Timer collects time durations and produces a TimingMetric.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TimeCounter extends MetricProducer[TimeMetric] {
+
+ private var maximum = Math.MIN_LONG
+ private var minimum = Math.MAX_LONG
+ private var total = 0L
+ private var count = 0
+
+ def apply(reset: Boolean):TimeMetric = {
+ val rc = TimeMetric(count, total, minimum, maximum)
+ if (reset) {
+ clear()
+ }
+ rc
+ }
+
+ def clear() = {
+ maximum = Math.MIN_INT
+ minimum = Math.MAX_INT
+ total = 0L
+ count = 0
+ }
+
+ /**
+ * Adds a duration to our current Timing.
+ */
+ def +=(value: Long): Unit = {
+ if (value > -1) {
+ maximum = value max maximum
+ minimum = value min minimum
+ total += value
+ count += 1
+ }
+ }
+
+ /**
+ *
+ */
+ def time[T](func: => T): T = {
+ val startTime = System.nanoTime
+ try {
+ func
+ } finally {
+ this += System.nanoTime - startTime
+ }
+ }
+
+ /**
+ *
+ */
+ def start[T](func: ( ()=>Unit )=> T): T = {
+ val startTime = System.nanoTime
+ def endFunc():Unit = {
+ val end = System.nanoTime
+ this += System.nanoTime - startTime
+ }
+ func(endFunc)
+ }
+}
+
+case class TimeMetric(count:Int, total:Long, minimum:Long, maximum:Long) {
+ def maxTime(unit:TimeUnit) = (maximum).toFloat / unit.toNanos(1)
+ def minTime(unit:TimeUnit) = (minimum).toFloat / unit.toNanos(1)
+ def totalTime(unit:TimeUnit) = (total).toFloat / unit.toNanos(1)
+ def avgTime(unit:TimeUnit) = if( count==0 ) 0f else totalTime(unit) / count
+ def avgFrequency(unit:TimeUnit) = 1.toFloat / avgTime(unit)
+}
\ No newline at end of file