You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:12:40 UTC
svn commit: r961163 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-hawtdb/src/main/scala/org/apache/...
Author: chirino
Date: Wed Jul 7 04:12:40 2010
New Revision: 961163
URL: http://svn.apache.org/viewvc?rev=961163&view=rev
Log:
A group of Flushed messages can now be tracked as a single Queue entry. Right now only used on restart when the initial queue contents are loaded from the store. Drastically reduced memory consumption needed for managing queues with a large number of entries. Updated store interfaces so to allow loading in groups. Renamed StoreBatch to StoreUOW
Added:
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:12:40 2010
@@ -20,7 +20,7 @@ import _root_.org.apache.activemq.filter
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
-import org.apache.activemq.broker.store.StoreBatch
+import org.apache.activemq.broker.store.StoreUOW
import org.apache.activemq.apollo.store.MessageRecord
import protocol.ProtocolFactory
@@ -149,13 +149,13 @@ class Delivery extends BaseRetained {
/**
* The transaction the delivery is participating in.
*/
- var storeBatch:StoreBatch = null
+ var uow:StoreUOW = null
/**
* Set if the producer requires an ack to be sent back. Consumer
* should execute once the message is processed.
*/
- var ack:(StoreBatch)=>Unit = null
+ var ack:(StoreUOW)=>Unit = null
def copy() = (new Delivery).set(this)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:12:40 2010
@@ -22,11 +22,11 @@ import collection.{SortedMap}
import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
import org.apache.activemq.util.TreeMap.TreeEntry
import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
-import org.apache.activemq.broker.store.{StoreBatch}
+import org.apache.activemq.broker.store.{StoreUOW}
import protocol.ProtocolFactory
-import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
import java.util.concurrent.TimeUnit
import java.util.{HashSet, Collections, ArrayList, LinkedList}
+import org.apache.activemq.apollo.store.{QueueEntryGroup, QueueEntryRecord, MessageRecord}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -57,7 +57,7 @@ object Queue extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val host: VirtualHost, val destination: Destination) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination, val queueKey: Long = -1L) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
override protected def log = Queue
import Queue._
@@ -77,7 +77,7 @@ class Queue(val host: VirtualHost, val d
})
- val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, StoreBatch)](), dispatchQueue)
+ val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, StoreUOW)](), dispatchQueue)
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
@@ -95,7 +95,7 @@ class Queue(val host: VirtualHost, val d
var loading_size = 0
var flushing_size = 0
- var store_id: Long = -1L
+
//
// Tuning options.
@@ -123,15 +123,23 @@ class Queue(val host: VirtualHost, val d
var tune_slow_check_interval = 200L
/**
+ * Should this queue persistently store it's entries?
+ */
+ def tune_persistent = host.store !=null
+
+ /**
* Should messages be flushed or swapped out of memory if
* no consumers need the message?
*/
def tune_flush_to_store = tune_persistent
/**
- * Should this queue persistently store it's entries?
+ * The number max number of flushed queue entries to load
+ * for the store at a time. Not that Flushed entires are just
+ * reference pointers to the actual messages. When not loaded,
+ * the batch is referenced as sequence range to conserve memory.
*/
- def tune_persistent = host.store !=null
+ def tune_entry_group_size = 10000
/**
* The number of intervals that a consumer must not meeting the subscription rate before it is
@@ -146,31 +154,72 @@ class Queue(val host: VirtualHost, val d
var nack_item_counter = 0L
var nack_size_counter = 0L
+ var flushed_items = 0L
+
def queue_size = enqueue_size_counter - dequeue_size_counter
def queue_items = enqueue_item_counter - dequeue_item_counter
private var capacity = tune_producer_buffer
var size = 0
- schedual_slow_consumer_check
+ protected def _start(onCompleted: Runnable) = {
- def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
- this.store_id = storeId
- if( !records.isEmpty ) {
+ def completed: Unit = {
+ // by the time this is run, consumers and producers may have already joined.
+ onCompleted.run
+ display_stats
+ schedual_slow_consumer_check
+ // wake up the producers to fill us up...
+ if (messages.refiller != null) {
+ messages.refiller.run
+ }
+
+ // kick of dispatching to the consumers.
+ all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
+ dispatchQueue << head_entry
+ }
+
+ if( tune_persistent ) {
+ host.store.listQueueEntryGroups(queueKey, tune_entry_group_size) { groups=>
+ dispatchQueue {
+ if( !groups.isEmpty ) {
+
+ // adjust the head tombstone.
+ head_entry.as_tombstone.count = groups.head.firstSeq
+
+ var last:QueueEntryGroup = null
+ groups.foreach { group =>
+
+ // if groups were not adjacent.. create tombstone entry.
+ if( last!=null && (last.lastSeq+1 != group.firstSeq) ) {
+ val entry = new QueueEntry(Queue.this, last.lastSeq+1)
+ entry.tombstone.as_tombstone.count = group.firstSeq - (last.lastSeq+1)
+ entries.addLast(entry)
+ }
- // adjust the head tombstone.
- head_entry.as_tombstone.count = records.head.queueSeq
+ val entry = new QueueEntry(Queue.this, group.firstSeq).init(group)
+ entries.addLast(entry)
- records.foreach { qer =>
- val entry = new QueueEntry(Queue.this,qer.queueSeq).init(qer)
- entries.addLast(entry)
- }
+ message_seq_counter = group.lastSeq
+ enqueue_item_counter += group.count
+ enqueue_size_counter += group.size
- message_seq_counter = records.last.queueSeq+1
- enqueue_item_counter += records.size
- debug("restored: "+records.size )
+ last = group
+ }
+
+ debug("restored: "+enqueue_item_counter)
+ }
+ completed
+ }
+ }
+ } else {
+ completed
}
- } >>: dispatchQueue
+ }
+
+ protected def _stop(onCompleted: Runnable) = {
+ throw new AssertionError("Not implemented.");
+ }
def addCapacity(amount:Int) = {
capacity += amount
@@ -180,10 +229,7 @@ class Queue(val host: VirtualHost, val d
var refiller: Runnable = null
- def full = if(size >= capacity)
- true
- else
- false
+ def full = (size >= capacity) || !serviceState.isStarted
def offer(delivery: Delivery): Boolean = {
if (full) {
@@ -196,7 +242,7 @@ class Queue(val host: VirtualHost, val d
entry.init(queueDelivery)
if( tune_persistent ) {
- queueDelivery.storeBatch = delivery.storeBatch
+ queueDelivery.uow = delivery.uow
}
entries.addLast(entry)
@@ -204,11 +250,10 @@ class Queue(val host: VirtualHost, val d
enqueue_size_counter += entry.size
// Do we need to do a persistent enqueue???
- if (queueDelivery.storeBatch != null) {
- queueDelivery.storeBatch.enqueue(entry.toQueueEntryRecord)
+ if (queueDelivery.uow != null) {
+ entry.as_loaded.store
}
-
def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= entry.seq ).isDefined
var dispatched = false
@@ -222,9 +267,9 @@ class Queue(val host: VirtualHost, val d
}
// release the store batch...
- if (queueDelivery.storeBatch != null) {
- queueDelivery.storeBatch.release
- queueDelivery.storeBatch = null
+ if (queueDelivery.uow != null) {
+ queueDelivery.uow.release
+ queueDelivery.uow = null
}
true
@@ -244,20 +289,29 @@ class Queue(val host: VirtualHost, val d
var total_items = 0L
var total_size = 0L
while (cur != null) {
- if (cur.is_loaded || cur.hasSubs || cur.is_prefetched) {
+ if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_group ) {
info(" => " + cur)
}
+
+ total_size += cur.size
if (cur.is_flushed || cur.is_loaded) {
total_items += 1
- total_size += cur.size
+ } else if (cur.is_flushed_group ) {
+ total_items += cur.as_flushed_group.items
}
+
cur = cur.getNext
}
info("tail: " + tail_entry)
// sanitiy checks..
- assert(total_items == queue_items)
- assert(total_size == queue_size)
+ if(total_items != queue_items) {
+ warn("queue_items mismatch, found %d, expected %d", total_size, queue_items)
+ }
+ if(total_size != queue_size) {
+ warn("queue_size mismatch, found %d, expected %d", total_size, queue_size)
+
+ }
}
def schedual_slow_consumer_check:Unit = {
@@ -269,14 +323,14 @@ class Queue(val host: VirtualHost, val d
check_counter += 1
if( (check_counter%10)==0 ) {
-// display_stats
+ display_stats
}
-// if( (check_counter%100)==0 ) {
+ if( (check_counter%25)==0 ) {
// if (!all_subscriptions.isEmpty) {
// display_active_entries
// }
-// }
+ }
// target tune_min_subscription_rate / sec
val slow_cursor_delta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
@@ -413,8 +467,8 @@ class Queue(val host: VirtualHost, val d
false
} else {
- if( tune_persistent && delivery.storeBatch!=null ) {
- delivery.storeBatch.retain
+ if( tune_persistent && delivery.uow!=null ) {
+ delivery.uow.retain
}
val rc = session.offer(delivery)
assert(rc, "session should accept since it was not full")
@@ -567,7 +621,7 @@ class QueueEntry(val queue:Queue, val se
def is_prefetched = prefetched>0
def init(delivery:Delivery):QueueEntry = {
- this.state = new Loaded(delivery)
+ this.state = new Loaded(delivery, false)
queue.size += size
this
}
@@ -577,6 +631,13 @@ class QueueEntry(val queue:Queue, val se
this
}
+ def init(group:QueueEntryGroup):QueueEntry = {
+ val count = (((group.lastSeq+1)-group.firstSeq)).toInt
+ val tombstones = count-group.count
+ this.state = new FlushedGroup(count, group.size, tombstones)
+ this
+ }
+
def hasSubs = !(subscriptions == Nil )
/**
@@ -614,7 +675,7 @@ class QueueEntry(val queue:Queue, val se
def toQueueEntryRecord = {
val qer = new QueueEntryRecord
- qer.queueKey = queue.store_id
+ qer.queueKey = queue.queueKey
qer.queueSeq = seq
qer.messageKey = state.messageKey
qer.size = state.size
@@ -634,6 +695,7 @@ class QueueEntry(val queue:Queue, val se
// What state is it in?
def as_tombstone = this.state.as_tombstone
def as_flushed = this.state.as_flushed
+ def as_flushed_group = this.state.as_flushed_group
def as_loaded = this.state.as_loaded
def as_tail = this.state.as_tail
@@ -642,6 +704,7 @@ class QueueEntry(val queue:Queue, val se
def is_loaded = as_loaded!=null
def is_flushed = as_flushed!=null
+ def is_flushed_group = as_flushed_group!=null
def is_tombstone = as_tombstone!=null
// These should not change the current state.
@@ -662,6 +725,7 @@ class QueueEntry(val queue:Queue, val se
def as_tail:Tail = null
def as_loaded:Loaded = null
def as_flushed:Flushed = null
+ def as_flushed_group:FlushedGroup = null
def as_tombstone:Tombstone = null
def size:Int
@@ -673,9 +737,8 @@ class QueueEntry(val queue:Queue, val se
def flush = entry
- def tombstone = {
-
- var refill_preftch_list = List[Subscription]()
+ def prefetch_remove = {
+ var rc = List[Subscription]()
if( queue.tune_flush_to_store ) {
// Update the prefetch counter to reflect that this entry is no longer being prefetched.
var cur = entry
@@ -684,7 +747,7 @@ class QueueEntry(val queue:Queue, val se
(cur.subscriptions).foreach { sub =>
if( sub.is_prefetched(entry) ) {
sub.remove_from_prefetch(entry)
- refill_preftch_list ::= sub
+ rc ::= sub
}
}
}
@@ -692,6 +755,12 @@ class QueueEntry(val queue:Queue, val se
}
assert(!is_prefetched, "entry should not be prefetched.")
}
+ rc
+ }
+
+ def tombstone = {
+
+ var refill_preftch_list = prefetch_remove
// if rv and lv are both adjacent tombstones, then this merges the rv
// tombstone into lv, unlinks rv, and returns lv, otherwise it returns
@@ -753,7 +822,7 @@ class QueueEntry(val queue:Queue, val se
* This state is used while a message is loaded in memory. A message must be in this state
* before it can be dispatched to a consumer. It can transition to Flushed or Tombstone.
*/
- class Loaded(val delivery: Delivery) extends EntryState {
+ class Loaded(val delivery: Delivery, var store_completed:Boolean) extends EntryState {
var acquired = false
def messageKey = delivery.storeKey
@@ -768,34 +837,58 @@ class QueueEntry(val queue:Queue, val se
override def as_loaded = this
- def store() = {
- if( delivery.storeKey == -1 ) {
- val tx = queue.host.store.createStoreBatch
- delivery.storeKey = tx.store(delivery.createMessageRecord)
- tx.enqueue(toQueueEntryRecord)
- tx.release
- }
+ def store = {
+ delivery.uow.enqueue(toQueueEntryRecord)
+ delivery.uow.onComplete(^{
+ queue.store_flush_source.merge(this)
+ })
}
override def flush() = {
- if( queue.tune_flush_to_store && !flushing ) {
- flushing=true
- queue.flushing_size+=size
- if( delivery.storeBatch!=null ) {
- delivery.storeBatch.eagerFlush(^{
- queue.store_flush_source.merge(this)
- })
+ if( queue.tune_flush_to_store ) {
+ if( store_completed ) {
+ flushing=true
+ flushed
} else {
- store
- queue.host.store.flushMessage(messageKey) {
- queue.store_flush_source.merge(this)
+ if( !flushing ) {
+ flushing=true
+ queue.flushing_size+=size
+
+ // The storeBatch is only set when called from the messages.offer method
+ if( delivery.uow!=null ) {
+ delivery.uow.completeASAP
+ } else {
+
+ // Are swapping out a non-persistent message?
+ if( delivery.storeKey == -1 ) {
+
+ delivery.uow = queue.host.store.createStoreUOW
+ val uow = delivery.uow
+ delivery.storeKey = uow.store(delivery.createMessageRecord)
+ store
+ uow.completeASAP
+ uow.release
+ delivery.uow = null
+
+ } else {
+
+ queue.host.store.flushMessage(messageKey) {
+ queue.store_flush_source.merge(this)
+ }
+
+ }
+
+ }
}
}
}
+
entry
}
def flushed() = {
+ store_completed = true
+ delivery.uow = null
if( flushing ) {
queue.flushing_size-=size
queue.size -= size
@@ -955,13 +1048,101 @@ class QueueEntry(val queue:Queue, val se
}
+
+ class FlushedGroup(
+ /** The number of adjacent entries this FlushedGroup represents. */
+ var count:Long,
+ /** size in bytes of the group */
+ var size:Int,
+ /** The number of tombstone entries in the groups */
+ var tombstones:Int ) extends EntryState {
+
+
+ def items = count - tombstones
+
+ def messageKey = -1
+
+ var loading = false
+
+ override def as_flushed_group = this
+
+ override def is_flushed_or_flushing = true
+
+ override def toString = { "flushed_group:{ loading: "+loading+", items: "+items+", size: "+size+"}" }
+
+ // Flushed entries can't be dispatched until
+ // they get loaded.
+ def dispatch():QueueEntry = {
+ null
+ }
+
+ override def load() = {
+ if( !loading ) {
+ loading = true
+ queue.host.store.listQueueEntries(queue.queueKey, seq, seq+count-1) { records =>
+ queue.dispatchQueue {
+
+ var item_count=0
+ var size_count=0
+ var last:QueueEntryRecord = null
+
+ val tmpList = new LinkedNodeList[QueueEntry]()
+ records.foreach { record =>
+
+ // if entries were not adjacent.. create tombstone entry.
+ if( last!=null && (last.queueSeq+1 != record.queueSeq) ) {
+ val entry = new QueueEntry(queue, last.queueSeq+1)
+ entry.tombstone.as_tombstone.count = record.queueSeq - (last.queueSeq+1)
+ tmpList.addLast(entry)
+ }
+
+ val entry = new QueueEntry(queue, record.queueSeq).init(record)
+ tmpList.addLast(entry)
+
+ item_count += 1
+ size_count += record.size
+ last = record
+ }
+
+ // we may need to adjust the enqueue count if entries
+ // were dropped at the store level
+ var item_delta = (items - item_count)
+ val size_delta: Int = size - size_count
+
+ if ( item_delta!=0 || size_delta!=0 ) {
+ assert(item_delta <= 0)
+ assert(size_delta <= 0)
+ info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, seq+count-1, size_delta)
+ queue.enqueue_item_counter += item_delta
+ queue.enqueue_size_counter += size_delta
+ }
+
+ var refill_preftch_list = prefetch_remove
+
+ linkAfter(tmpList)
+ val next = getNext
+
+ // move the subs to the first entry that we just loaded.
+ subscriptions.foreach(_.advance(next))
+ next.addSubscriptions(subscriptions)
+
+ unlink
+ refill_preftch_list.foreach( _.refill_prefetch )
+ }
+ }
+ }
+ entry
+ }
+
+ override def tombstone = {
+ throw new AssertionError("Flush group cannbot be tombstone.");
+ }
+ }
+
/**
* Entries in the Flushed state are not holding the referenced messages in memory anymore.
* This state can transition to Loaded or Tombstone.
*
- * TODO: Add a new FlushedList state which can be used to merge multiple
- * adjacent Flushed entries into a single FlushedList state. This would allow us
- * to support queues with millions of flushed entries without much memory impact.
*/
class Flushed(val messageKey:Long, val size:Int) extends EntryState {
@@ -1021,7 +1202,7 @@ class QueueEntry(val queue:Queue, val se
delivery.storeKey = messageRecord.key
queue.size += size
- state = new Loaded(delivery)
+ state = new Loaded(delivery, true)
} else {
// debug("Ignoring store load of: ", messageKey)
}
@@ -1037,6 +1218,9 @@ class QueueEntry(val queue:Queue, val se
super.tombstone
}
}
+
+
+
}
@@ -1073,12 +1257,13 @@ class Subscription(queue:Queue) extends
pos = queue.head_entry;
session = consumer.connect(this)
session.refiller = pos
-
queue.head_entry.addSubscriptions(this :: Nil)
- refill_prefetch
- // kick off the initial dispatch.
- queue.dispatchQueue << queue.head_entry
+ if( queue.serviceState.isStarted ) {
+ // kick off the initial dispatch.
+ refill_prefetch
+ queue.dispatchQueue << queue.head_entry
+ }
}
def close() = {
@@ -1223,11 +1408,11 @@ class Subscription(queue:Queue) extends
acquired.addLast(this)
acquired_size += entry.size
- def ack(sb:StoreBatch) = {
+ def ack(sb:StoreUOW) = {
if (entry.messageKey != -1) {
val storeBatch = if( sb == null ) {
- queue.host.store.createStoreBatch
+ queue.host.store.createStoreUOW
} else {
sb
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 04:12:40 2010
@@ -308,8 +308,8 @@ class DeliveryProducerRoute(val router:R
if( target.consumer.matches(delivery) ) {
if( storeOnMatch ) {
- delivery.storeBatch = router.host.store.createStoreBatch
- delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+ delivery.uow = router.host.store.createStoreUOW
+ delivery.storeKey = delivery.uow.store(delivery.createMessageRecord)
storeOnMatch = false
}
@@ -330,14 +330,14 @@ class DeliveryProducerRoute(val router:R
private def delivered(delivery: Delivery): Unit = {
if (delivery.ack != null) {
- if (delivery.storeBatch != null) {
- delivery.storeBatch.setDisposer(^ {delivery.ack(null)})
+ if (delivery.uow != null) {
+ delivery.uow.setDisposer(^ {delivery.ack(null)})
} else {
delivery.ack(null)
}
}
- if (delivery.storeBatch != null) {
- delivery.storeBatch.release
+ if (delivery.uow != null) {
+ delivery.uow.release
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:12:40 2010
@@ -146,14 +146,13 @@ class VirtualHost(val broker: Broker) ex
store.getQueueStatus(queueKey) { x =>
x match {
case Some(info)=>
- store.listQueueEntries(queueKey) { entries=>
- dispatchQueue ^{
- val dest = DestinationParser.parse(info.record.name, destination_parser_options)
- val queue = new Queue(this, dest)
- queue.restore(queueKey, entries)
- queues.put(dest.getName, queue)
- task.run
- }
+
+ dispatchQueue ^{
+ val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+ val queue = new Queue(this, dest, queueKey)
+ queue.start
+ queues.put(dest.getName, queue)
+ task.run
}
case _ =>
task.run
@@ -267,11 +266,11 @@ class VirtualHost(val broker: Broker) ex
store.addQueue(record) { rc =>
rc match {
case Some(queueKey) =>
- dispatchQueue ^ {
- val queue = new Queue(this, dest)
- queue.restore(queueKey, Nil)
+ dispatchQueue {
+ val queue = new Queue(this, dest, queueKey)
queues.put(dest.getName, queue)
cb(queue)
+ queue.start()
}
case None => // store could not create
cb(null)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:12:40 2010
@@ -22,7 +22,8 @@ import com.shorrockin.cascal.utils.Conve
import java.util.{HashMap}
import org.fusesource.hawtbuf.AsciiBuffer._
import org.fusesource.hawtbuf.{AsciiBuffer, DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
+import org.apache.activemq.apollo.store._
+import collection.mutable.ListBuffer
/**
*
@@ -161,7 +162,7 @@ class CassandraClient() {
}
- def store(txs:Seq[CassandraStore#CassandraBatch]) {
+ def store(txs:Seq[CassandraStore#CassandraUOW]) {
withSession {
session =>
var operations = List[Operation]()
@@ -205,12 +206,43 @@ class CassandraClient() {
}
}
- def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
+ def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryGroup] = {
withSession {
session =>
- session.list(schema.entries \ qid).map { x=>
+ var rc = ListBuffer[QueueEntryGroup]()
+ var group:QueueEntryGroup = null
+
+ // TODO: this is going to bring back lots of entries.. not good.
+ session.list(schema.entries \ queueKey).foreach { x=>
+
+ val record:QueueEntryRecord = x.value
+
+ if( group == null ) {
+ group = new QueueEntryGroup
+ group.firstSeq = record.queueSeq
+ }
+ group.lastSeq = record.queueSeq
+ group.count += 1
+ group.size += record.size
+ if( group.count == limit) {
+ rc += group
+ group = null
+ }
+ }
+
+ if( group!=null ) {
+ rc += group
+ }
+ rc
+ }
+ }
+
+ def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+ withSession {
+ session =>
+ session.list(schema.entries \ queueKey, RangePredicate(firstSeq, lastSeq)).map { x=>
val rc:QueueEntryRecord = x.value
- rc.queueKey = qid
+ rc.queueKey = queueKey
rc.queueSeq = x.name
rc
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:12:40 2010
@@ -16,13 +16,12 @@
*/
package org.apache.activemq.broker.store.cassandra
-import org.apache.activemq.broker.store.{StoreBatch, Store}
+import org.apache.activemq.broker.store.{StoreUOW, Store}
import org.fusesource.hawtdispatch.BaseRetained
import com.shorrockin.cascal.session._
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
import java.util.HashMap
-import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
import collection.{JavaConversions, Seq}
@@ -32,6 +31,7 @@ import org.fusesource.hawtdispatch.Scala
import ReporterLevel._
import java.util.concurrent._
import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
+import org.apache.activemq.apollo.store._
object CassandraStore extends Log {
@@ -195,24 +195,31 @@ class CassandraStore extends Store with
}
- def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+ def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
blocking {
- callback( client.getQueueEntries(id) )
+ callback( client.listQueueEntryGroups(queueKey, limit) )
+ }
+ }
+
+
+ def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+ blocking {
+ callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
}
}
def flushMessage(id: Long)(callback: => Unit) = ^{
- val action: CassandraBatch#MessageAction = pendingStores.get(id)
+ val action: CassandraUOW#MessageAction = pendingStores.get(id)
if( action == null ) {
callback
} else {
- action.tx.eagerFlush(callback _)
- flush(action.tx.txid)
+ action.uow.onComplete(callback _)
+ flush(action.uow.uow_id)
}
} >>: dispatchQueue
- def createStoreBatch() = new CassandraBatch
+ def createStoreUOW() = new CassandraUOW
/////////////////////////////////////////////////////////////////////
@@ -220,7 +227,7 @@ class CassandraStore extends Store with
// Implementation of the StoreBatch interface
//
/////////////////////////////////////////////////////////////////////
- class CassandraBatch extends BaseRetained with StoreBatch {
+ class CassandraUOW extends BaseRetained with StoreUOW {
class MessageAction {
@@ -229,22 +236,29 @@ class CassandraStore extends Store with
var enqueues = ListBuffer[QueueEntryRecord]()
var dequeues = ListBuffer[QueueEntryRecord]()
- def tx = CassandraBatch.this
+ def uow = CassandraUOW.this
def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
def cancel() = {
- tx.rm(msg)
- if( tx.isEmpty ) {
- tx.cancel
+ uow.rm(msg)
+ if( uow.isEmpty ) {
+ uow.cancel
}
}
}
- val txid:Int = next_tx_id.getAndIncrement
+ val uow_id:Int = next_uow_id.getAndIncrement
var actions = Map[Long, MessageAction]()
var flushing= false
- var flushListeners = ListBuffer[Runnable]()
- def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
+ var completeListeners = ListBuffer[Runnable]()
+
+ def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+
+ var disableDelay = false
+ def completeASAP() = this.synchronized { disableDelay=true }
+
+ def delayable = !disableDelay
+
def rm(msg:Long) = {
actions -= msg
@@ -252,7 +266,7 @@ class CassandraStore extends Store with
def isEmpty = actions.isEmpty
def cancel = {
- delayedTransactions.remove(txid)
+ delayedUOWs.remove(uow_id)
onPerformed
}
@@ -298,12 +312,12 @@ class CassandraStore extends Store with
}
override def dispose = {
- transaction_source.merge(this)
+ uow_source.merge(this)
}
def onPerformed() {
- flushListeners.foreach { x=>
+ completeListeners.foreach { x=>
x.run()
}
super.dispose
@@ -312,29 +326,29 @@ class CassandraStore extends Store with
def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
- val transaction_source = createSource(new ListEventAggregator[CassandraBatch](), dispatchQueue)
- transaction_source.setEventHandler(^{drain_transactions});
- transaction_source.resume
-
- var pendingStores = new HashMap[Long, CassandraBatch#MessageAction]()
- var pendingEnqueues = new HashMap[(Long,Long), CassandraBatch#MessageAction]()
- var delayedTransactions = new HashMap[Int, CassandraBatch]()
+ val uow_source = createSource(new ListEventAggregator[CassandraUOW](), dispatchQueue)
+ uow_source.setEventHandler(^{drain_uows});
+ uow_source.resume
+
+ var pendingStores = new HashMap[Long, CassandraUOW#MessageAction]()
+ var pendingEnqueues = new HashMap[(Long,Long), CassandraUOW#MessageAction]()
+ var delayedUOWs = new HashMap[Int, CassandraUOW]()
- var next_tx_id = new IntCounter(1)
+ var next_uow_id = new IntCounter(1)
- def drain_transactions = {
- transaction_source.getData.foreach { tx =>
+ def drain_uows = {
+ uow_source.getData.foreach { uow =>
- val tx_id = tx.txid
- delayedTransactions.put(tx_id, tx)
+ val uow_id = uow.uow_id
+ delayedUOWs.put(uow_id, uow)
- tx.actions.foreach { case (msg, action) =>
+ uow.actions.foreach { case (msg, action) =>
// dequeues can cancel out previous enqueues
action.dequeues.foreach { currentDequeue=>
val currentKey = key(currentDequeue)
- val prevAction:CassandraBatch#MessageAction = pendingEnqueues.remove(currentKey)
- if( prevAction!=null && !prevAction.tx.flushing ) {
+ val prevAction:CassandraUOW#MessageAction = pendingEnqueues.remove(currentKey)
+ if( prevAction!=null && !prevAction.uow.flushing ) {
// yay we can cancel out a previous enqueue
prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
@@ -359,17 +373,17 @@ class CassandraStore extends Store with
}
}
- if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
- flush(tx_id)
+ if( !uow.completeListeners.isEmpty || config.flushDelay <= 0 ) {
+ flush(uow_id)
} else {
- dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+ dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(uow_id)})
}
}
}
- def flush(tx_id:Int) = {
- flush_source.merge(tx_id)
+ def flush(uow_id:Int) = {
+ flush_source.merge(uow_id)
}
val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
@@ -382,27 +396,27 @@ class CassandraStore extends Store with
return
}
- val txs = flush_source.getData.flatMap{ tx_id =>
- val tx = delayedTransactions.remove(tx_id)
+ val uows = flush_source.getData.flatMap{ uow_id =>
+ val uow = delayedUOWs.remove(uow_id)
// Message may be flushed or canceled before the timeout flush event..
- // tx may be null in those cases
- if (tx!=null) {
- tx.flushing = true
- Some(tx)
+ // uow may be null in those cases
+ if (uow!=null) {
+ uow.flushing = true
+ Some(uow)
} else {
None
}
}
- if( !txs.isEmpty ) {
+ if( !uows.isEmpty ) {
storeLatency.start { end =>
blocking {
- client.store(txs)
+ client.store(uows)
dispatchQueue {
end()
- txs.foreach { tx=>
+ uows.foreach { uow=>
- tx.actions.foreach { case (msg, action) =>
+ uow.actions.foreach { case (msg, action) =>
if( action.store!=null ) {
pendingStores.remove(msg)
}
@@ -412,7 +426,7 @@ class CassandraStore extends Store with
}
}
- tx.onPerformed
+ uow.onPerformed
}
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:12:40 2010
@@ -20,12 +20,10 @@ import java.{lang=>jl}
import java.{util=>ju}
import model.{AddQueue, AddQueueEntry, AddMessage}
-import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord}
import org.apache.activemq.apollo.dto.HawtDBStoreDTO
import java.io.File
import java.io.IOException
import java.util.concurrent.TimeUnit
-import org.apache.activemq.apollo.store.QueueRecord
import org.fusesource.hawtbuf.proto.MessageBuffer
import org.fusesource.hawtbuf.proto.PBMessage
import org.apache.activemq.util.LockFile
@@ -40,11 +38,12 @@ import org.fusesource.hawtbuf._
import org.fusesource.hawtdispatch.ScalaDispatch._
import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
import collection.JavaConversions
-import java.util.{TreeSet, HashSet}
+import ju.{TreeSet, HashSet}
import org.fusesource.hawtdb.api._
import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
import org.apache.activemq.apollo.util.TimeCounter
+import org.apache.activemq.apollo.store._
object HawtDBClient extends Log {
val BEGIN = -1
@@ -134,35 +133,38 @@ class HawtDBClient(hawtDBStore: HawtDBSt
func
} else {
info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.")
- dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {lock(func _)})
+ dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {
+ hawtDBStore.executor_pool {
+ lock(func _)
+ }
+ })
}
}
}
- def createJournal() = {
- val journal = new Journal()
- journal.setDirectory(directory)
- journal.setMaxFileLength(config.journalLogSize)
- journal
- }
-
val schedual_version = new AtomicInteger()
def start(onComplete:Runnable) = {
lock {
- journal = createJournal()
+ journal = new Journal()
+ journal.setDirectory(directory)
+ journal.setMaxFileLength(config.journalLogSize)
+ journal.setMaxWriteBatchSize(config.journalBatchSize);
+ journal.setChecksum(true);
+
+ if( config.archiveDirectory!=null ) {
+ journal.setDirectoryArchive(config.archiveDirectory)
+ journal.setArchiveDataLogs(true)
+ }
journal.start
pageFileFactory.setFile(new File(directory, "db"))
pageFileFactory.setDrainOnClose(false)
pageFileFactory.setSync(true)
pageFileFactory.setUseWorkerThread(true)
- pageFileFactory.setPageSize(512.toShort)
-
- // Empirically found (using profiler) that a cached BTree page retains
- // about 4000 bytes of mem ON 64 bit platform.
- pageFileFactory.setCacheSize((1024*1024*20)/4000 );
+ pageFileFactory.setPageSize(config.indexPageSize)
+ pageFileFactory.setCacheSize(config.indexCacheSize);
pageFileFactory.open()
@@ -222,7 +224,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
_store(update, callback)
}
- def store(txs: Seq[HawtDBStore#HawtDBBatch], callback:Runnable) {
+ def store(txs: Seq[HawtDBStore#HawtDBUOW], callback:Runnable) {
var batch = ListBuffer[TypeCreatable]()
txs.foreach {
tx =>
@@ -239,9 +241,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
action.dequeues.foreach {
queueEntry =>
- val qid = queueEntry.queueKey
- val seq = queueEntry.queueSeq
- batch += new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
+ val queueKey = queueEntry.queueKey
+ val queueSeq = queueEntry.queueSeq
+ batch += new RemoveQueueEntry.Bean().setQueueKey(queueKey).setQueueSeq(queueSeq)
}
}
}
@@ -254,15 +256,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
def listQueues: Seq[Long] = {
+ val rc = ListBuffer[Long]()
withTx { tx =>
- val helper = new TxHelper(tx)
- import JavaConversions._
- import helper._
- queueIndex.iterator.map {
- entry =>
- entry.getKey.longValue
- }.toSeq
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+
+ queueIndex.iterator.foreach { entry =>
+ rc += entry.getKey.longValue
+ }
}
+ rc
}
def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
@@ -292,27 +296,69 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
-
- def getQueueEntries(queueKey: Long): Seq[QueueEntryRecord] = {
+ def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryGroup] = {
withTx { tx =>
val helper = new TxHelper(tx)
import JavaConversions._
import helper._
+ import Predicates._
val queueRecord = queueIndex.get(queueKey)
if (queueRecord != null) {
val entryIndex = queueEntryIndex(queueRecord)
- entryIndex.iterator.map {
- entry =>
- val rc: QueueEntryRecord = entry.getValue
- rc
- }.toSeq
+
+ var rc = ListBuffer[QueueEntryGroup]()
+ var group:QueueEntryGroup = null
+
+ entryIndex.iterator.foreach { entry =>
+ if( group == null ) {
+ group = new QueueEntryGroup
+ group.firstSeq = entry.getKey.longValue
+ }
+ group.lastSeq = entry.getKey.longValue
+ group.count += 1
+ group.size += entry.getValue.getSize
+ if( group.count == limit) {
+ rc += group
+ group = null
+ }
+ }
+
+ if( group!=null ) {
+ rc += group
+ }
+ rc
} else {
- Nil.toSeq
+ null
}
}
}
+ def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+ var rc = ListBuffer[QueueEntryRecord]()
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+ import Predicates._
+
+ val queueRecord = queueIndex.get(queueKey)
+ if (queueRecord != null) {
+ val entryIndex = queueEntryIndex(queueRecord)
+
+ val where = and(gte(new jl.Long(firstSeq)), lte(new jl.Long(lastSeq)))
+ entryIndex.iterator( where ).foreach {
+ entry =>
+ val record: QueueEntryRecord = entry.getValue
+ rc += record
+ }
+ } else {
+ rc = null
+ }
+ }
+ rc
+ }
+
val metric_load_from_index = new TimeCounter
val metric_load_from_journal = new TimeCounter
@@ -405,6 +451,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private def _store(update: TypeCreatable, onComplete: Runnable): Unit = _store(-1, update, onComplete)
+ val metric_journal_append = new TimeCounter
+ val metric_index_update = new TimeCounter
+
/**
* All updated are are funneled through this method. The updates are logged to
* the journal and then the indexes are update. onFlush will be called back once
@@ -423,7 +472,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val buffer = baos.toBuffer()
append(buffer) {
location =>
- executeStore(batch, update, onComplete, location)
+ metric_index_update.time {
+ executeStore(batch, update, onComplete, location)
+ }
}
}
@@ -489,6 +540,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val start = System.currentTimeMillis()
incrementalRecover
+
_store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
// Rollback any batches that did not complete.
batches.keysIterator.foreach {
@@ -573,11 +625,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
/////////////////////////////////////////////////////////////////////
private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
- journal.write(data, new JournalCallback() {
- def success(location: Location) = {
- cb(location)
- }
- })
+ metric_journal_append.start { end =>
+ journal.write(data, new JournalCallback() {
+ def success(location: Location) = {
+ end()
+ cb(location)
+ }
+ })
+ }
}
def read(location: Location) = journal.read(location)
@@ -698,7 +753,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val trackingIndex = queueTrackingIndex(queueRecord)
val entryIndex = queueEntryIndex(queueRecord)
- trackingIndex.iterator.map { entry=>
+ trackingIndex.iterator.foreach { entry=>
val messageKey = entry.getKey
if( addAndGet(messageRefsIndex, messageKey, -1) == 0 ) {
// message is no longer referenced.. we can remove it..
@@ -827,7 +882,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
case x: Purge.Getter =>
// Remove all the queues...
- val queueKeys = queueIndex.iterator.map( _.getKey ).toList
+ val queueKeys = queueIndex.iterator.map( _.getKey )
queueKeys.foreach { key =>
removeQueue(key.longValue)
}
@@ -1028,6 +1083,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
} else {
tx.rollback
}
+ tx.close
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:12:40 2010
@@ -16,20 +16,20 @@
*/
package org.apache.activemq.broker.store.hawtdb
-import org.apache.activemq.broker.store.{StoreBatch, Store}
+import org.apache.activemq.broker.store.{StoreUOW, Store}
import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
import java.util.HashMap
-import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
import collection.{JavaConversions, Seq}
import org.fusesource.hawtdispatch.ScalaDispatch._
import org.apache.activemq.apollo.broker._
import java.io.File
import ReporterLevel._
-import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
import java.util.concurrent._
+import org.apache.activemq.apollo.store._
+import org.apache.activemq.apollo.util.{IntMetricCounter, TimeCounter, IntCounter}
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -138,6 +138,7 @@ class HawtDBStore extends Store with Bas
})
}
+
def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
val key = next_queue_key.getAndIncrement
record.key = key
@@ -148,9 +149,9 @@ class HawtDBStore extends Store with Bas
client.removeQueue(queueKey,^{ callback(true) })
}
- def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
+ def getQueueStatus(queueKey: Long)(callback: (Option[QueueStatus]) => Unit) = {
executor_pool ^{
- callback( client.getQueueStatus(id) )
+ callback( client.getQueueStatus(queueKey) )
}
}
@@ -165,35 +166,46 @@ class HawtDBStore extends Store with Bas
load_source.resume
- def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
- load_source.merge((id, callback))
+ def loadMessage(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+ message_load_latency.start { end=>
+ load_source.merge((messageKey, { (result)=>
+ end()
+ callback(result)
+ }))
+ }
}
def drain_loads = {
var data = load_source.getData
+ message_load_batch_size += data.size
executor_pool ^{
client.loadMessages(data)
}
}
- def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+ def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
executor_pool ^{
- callback( client.getQueueEntries(id) )
+ callback( client.listQueueEntryGroups(queueKey, limit) )
}
}
- def flushMessage(id: Long)(cb: => Unit) = dispatchQueue {
- val action: HawtDBBatch#MessageAction = pendingStores.get(id)
+ def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+ executor_pool ^{
+ callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
+ }
+ }
+
+ def flushMessage(messageKey: Long)(cb: => Unit) = dispatchQueue {
+ val action: HawtDBUOW#MessageAction = pendingStores.get(messageKey)
if( action == null ) {
-// println("flush due to not found: "+id)
cb
} else {
- action.tx.eagerFlush(^{ cb })
- flush(action.tx.txid)
+ action.uow.onComplete(^{ cb })
+ flush(action.uow.uow_id)
}
}
- def createStoreBatch() = new HawtDBBatch
+ def createStoreUOW() = new HawtDBUOW
/////////////////////////////////////////////////////////////////////
@@ -201,7 +213,7 @@ class HawtDBStore extends Store with Bas
// Implementation of the StoreBatch interface
//
/////////////////////////////////////////////////////////////////////
- class HawtDBBatch extends BaseRetained with StoreBatch {
+ class HawtDBUOW extends BaseRetained with StoreUOW {
var dispose_start:Long = 0
var flushing = false;
@@ -213,29 +225,37 @@ class HawtDBStore extends Store with Bas
var enqueues = ListBuffer[QueueEntryRecord]()
var dequeues = ListBuffer[QueueEntryRecord]()
- def tx = HawtDBBatch.this
+ def uow = HawtDBUOW.this
def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+
def cancel() = {
- tx.rm(msg)
- if( tx.isEmpty ) {
- tx.cancel
- }
+ uow.rm(msg)
}
}
- val txid:Int = next_tx_id.getAndIncrement
+ val uow_id:Int = next_batch_id.getAndIncrement
var actions = Map[Long, MessageAction]()
- var flushListeners = ListBuffer[Runnable]()
- def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
+ var completeListeners = ListBuffer[Runnable]()
+ var disableDelay = false
+
+ def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+
+ def completeASAP() = this.synchronized { disableDelay=true }
+
+ var delayable_actions = 0
+
+ def delayable = !disableDelay && delayable_actions>0 && config.flushDelay>=0
def rm(msg:Long) = {
actions -= msg
+ if( actions.isEmpty ) {
+ cancel
+ }
}
- def isEmpty = actions.isEmpty
def cancel = {
- delayedTransactions.remove(txid)
+ delayedUOWs.remove(uow_id)
onPerformed
}
@@ -250,7 +270,7 @@ class HawtDBStore extends Store with Bas
dispatchQueue {
pendingStores.put(record.key, action)
}
-
+ delayable_actions += 1
record.key
}
@@ -266,13 +286,14 @@ class HawtDBStore extends Store with Bas
}
def enqueue(entry: QueueEntryRecord) = {
-
- this.synchronized {
+ val a = this.synchronized {
val a = action(entry.messageKey)
a.enqueues += entry
- dispatchQueue {
- pendingEnqueues.put(key(entry), a)
- }
+ delayable_actions += 1
+ a
+ }
+ dispatchQueue {
+ pending_enqueues.put(key(entry), a)
}
}
@@ -285,14 +306,12 @@ class HawtDBStore extends Store with Bas
override def dispose = {
dispose_start = System.nanoTime
- transaction_source.merge(this)
+ uow_source.merge(this)
}
def onPerformed() = this.synchronized {
- metric_commit_counter += 1
- val t = TimeUnit.NANOSECONDS.toMillis(System.nanoTime-dispose_start)
- metric_commit_latency_counter += t
- flushListeners.foreach { x=>
+ commit_latency += System.nanoTime-dispose_start
+ completeListeners.foreach { x=>
x.run
}
super.dispose
@@ -303,9 +322,10 @@ class HawtDBStore extends Store with Bas
var metric_canceled_enqueue_counter:Long = 0
var metric_flushed_message_counter:Long = 0
var metric_flushed_enqueue_counter:Long = 0
- var metric_commit_counter:Long = 0
- var metric_commit_latency_counter:Long = 0
+ val commit_latency = new TimeCounter
+ val message_load_batch_size = new IntMetricCounter
+ val message_load_latency = new TimeCounter
var canceled_add_message:Long = 0
var canceled_enqueue:Long = 0
@@ -313,27 +333,23 @@ class HawtDBStore extends Store with Bas
def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
- val transaction_source = createSource(new ListEventAggregator[HawtDBBatch](), dispatchQueue)
- transaction_source.setEventHandler(^{drain_transactions});
- transaction_source.resume
-
- var pendingStores = new HashMap[Long, HawtDBBatch#MessageAction]()
- var pendingEnqueues = new HashMap[(Long,Long), HawtDBBatch#MessageAction]()
- var delayedTransactions = new HashMap[Int, HawtDBBatch]()
+ val uow_source = createSource(new ListEventAggregator[HawtDBUOW](), dispatchQueue)
+ uow_source.setEventHandler(^{drain_uows});
+ uow_source.resume
+
+ var pendingStores = new HashMap[Long, HawtDBUOW#MessageAction]()
+ var pending_enqueues = new HashMap[(Long,Long), HawtDBUOW#MessageAction]()
+ var delayedUOWs = new HashMap[Int, HawtDBUOW]()
- var next_tx_id = new IntCounter(1)
+ var next_batch_id = new IntCounter(1)
def schedualDisplayStats:Unit = {
val st = System.nanoTime
- val ss = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
+ val ss = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter)
def displayStats = {
if( serviceState.isStarted ) {
val et = System.nanoTime
- val es = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
-
- val commits = es._5-ss._5
- var avgCommitLatency = if (commits!=0) (es._6 - ss._6).toFloat / commits else 0f
-
+ val es = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter)
def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
val m1 = rate(ss._1,es._1)
@@ -342,23 +358,29 @@ class HawtDBStore extends Store with Bas
val m4 = rate(ss._4,es._4)
if( m1>0f || m2>0f || m3>0f || m4>0f ) {
- info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
- m1, m2, m3, m3, avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
+ info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }",
+ m1, m2, m3, m4 )
}
-
- def display(name:String, counter:TimeCounter) {
+ def displayTime(name:String, counter:TimeCounter) = {
var t = counter.apply(true)
if( t.count > 0 ) {
- info("%s latency in ms: avg: %,.3f, max: %,.3f, min: %,.3f", name, t.avgTime(TimeUnit.MILLISECONDS), t.maxTime(TimeUnit.MILLISECONDS), t.minTime(TimeUnit.MILLISECONDS))
+ info("%s latency in ms: avg: %,.3f, min: %,.3f, max: %,.3f", name, t.avgTime(TimeUnit.MILLISECONDS), t.minTime(TimeUnit.MILLISECONDS), t.maxTime(TimeUnit.MILLISECONDS))
+ }
+ }
+ def displayInt(name:String, counter:IntMetricCounter) = {
+ var t = counter.apply(true)
+ if( t.count > 0 ) {
+ info("%s: avg: %,.3f, min: %d, max: %d", name, t.avg, t.min, t.max )
}
}
-// display("total msg load", loadMessageTimer)
-// display("index read", client.indexLoad)
-// display("toal journal load", client.journalLoad)
-// display("journal read", client.journalRead)
-// display("journal decode", client.journalDecode)
+ displayTime("commit", commit_latency)
+ displayTime("store", store_latency)
+ displayTime("message load", message_load_latency)
+ displayTime("journal append", client.metric_journal_append)
+ displayTime("index update", client.metric_index_update)
+ displayInt("load batch size", message_load_batch_size)
schedualDisplayStats
}
@@ -366,34 +388,43 @@ class HawtDBStore extends Store with Bas
dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
}
- def drain_transactions = {
- transaction_source.getData.foreach { tx =>
+ def drain_uows = {
+ uow_source.getData.foreach { uow =>
- delayedTransactions.put(tx.txid, tx)
+ delayedUOWs.put(uow.uow_id, uow)
- tx.actions.foreach { case (msg, action) =>
+ uow.actions.foreach { case (msg, action) =>
// dequeues can cancel out previous enqueues
action.dequeues.foreach { currentDequeue=>
val currentKey = key(currentDequeue)
- val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
- if( prevAction!=null && !prevAction.tx.flushing ) {
+ val prev_action:HawtDBUOW#MessageAction = pending_enqueues.remove(currentKey)
+
+ def prev_batch = prev_action.uow
+
+ if( prev_action!=null && !prev_batch.flushing ) {
+
+ prev_batch.delayable_actions -= 1
metric_canceled_enqueue_counter += 1
// yay we can cancel out a previous enqueue
- prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
+ prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
// if the message is not in any queues.. we can gc it..
- if( prevAction.enqueues == Nil && prevAction.messageRecord !=null ) {
+ if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
pendingStores.remove(msg)
- prevAction.messageRecord = null
+ prev_action.messageRecord = null
+ prev_batch.delayable_actions -= 1
metric_canceled_message_counter += 1
}
// Cancel the action if it's now empty
- if( prevAction.isEmpty ) {
- prevAction.cancel()
+ if( prev_action.isEmpty ) {
+ prev_action.cancel()
+ } else if( !prev_batch.delayable ) {
+ // flush it if there is no point in delyaing anymore
+ flush(prev_batch.uow_id)
}
// since we canceled out the previous enqueue.. now cancel out the action
@@ -405,25 +436,25 @@ class HawtDBStore extends Store with Bas
}
}
- val tx_id = tx.txid
- if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
- flush(tx_id)
+ val batch_id = uow.uow_id
+ if( uow.delayable ) {
+ dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
} else {
- dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+ flush(batch_id)
}
}
}
- def flush(tx_id:Int) = {
- flush_source.merge(tx_id)
+ def flush(batch_id:Int) = {
+ flush_source.merge(batch_id)
}
val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
flush_source.setEventHandler(^{drain_flushes});
flush_source.resume
- val storeLatency = new TimeCounter
+ val store_latency = new TimeCounter
def drain_flushes:Unit = {
@@ -431,29 +462,29 @@ class HawtDBStore extends Store with Bas
return
}
- val txs = flush_source.getData.flatMap{ tx_id =>
+ val uows = flush_source.getData.flatMap{ uow_id =>
- val tx = delayedTransactions.remove(tx_id)
+ val uow = delayedUOWs.remove(uow_id)
// Message may be flushed or canceled before the timeout flush event..
- // tx may be null in those cases
- if (tx!=null) {
- tx.flushing = true
- Some(tx)
+ // uow may be null in those cases
+ if (uow!=null) {
+ uow.flushing = true
+ Some(uow)
} else {
None
}
}
- if( !txs.isEmpty ) {
- storeLatency.start { end=>
+ if( !uows.isEmpty ) {
+ store_latency.start { end=>
executor_pool {
- client.store(txs, ^{
+ client.store(uows, ^{
dispatchQueue {
end()
- txs.foreach { tx=>
+ uows.foreach { uow=>
- tx.actions.foreach { case (msg, action) =>
+ uow.actions.foreach { case (msg, action) =>
if( action.messageRecord !=null ) {
metric_flushed_message_counter += 1
pendingStores.remove(msg)
@@ -461,10 +492,10 @@ class HawtDBStore extends Store with Bas
action.enqueues.foreach { queueEntry=>
metric_flushed_enqueue_counter += 1
val k = key(queueEntry)
- pendingEnqueues.remove(k)
+ pending_enqueues.remove(k)
}
}
- tx.onPerformed
+ uow.onPerformed
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:12:40 2010
@@ -29,7 +29,7 @@ import Stomp._
import BufferConversions._
import StompFrameConstants._
import java.io.IOException
-import org.apache.activemq.broker.store.StoreBatch
+import org.apache.activemq.broker.store.StoreUOW
import org.apache.activemq.selector.SelectorParser
import org.apache.activemq.filter.{BooleanExpression, FilterException}
@@ -160,7 +160,7 @@ class StompProtocolHandler extends Proto
var host:VirtualHost = null
private def queue = connection.dispatchQueue
- var pendingAcks = HashMap[AsciiBuffer, (StoreBatch)=>Unit]()
+ var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
override def onTransportConnected() = {
session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame)
@@ -283,7 +283,7 @@ class StompProtocolHandler extends Proto
}
def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
- var storeBatch:StoreBatch=null
+ var storeBatch:StoreUOW=null
// User might be asking for ack that we have prcoessed the message..
val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java?rev=961163&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java Wed Jul 7 04:12:40 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.store;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueEntryGroup {
+ public long firstSeq;
+ public long lastSeq;
+ public int count;
+ public int size;
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:12:40 2010
@@ -16,68 +16,26 @@
*/
package org.apache.activemq.broker.store
-import _root_.java.lang.{String}
-import org.fusesource.hawtbuf._
import org.apache.activemq.Service
-import org.fusesource.hawtdispatch.{Retained}
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.broker.Reporter
import org.apache.activemq.apollo.dto.StoreDTO
/**
- * A store batch is used to perform persistent
- * operations as a unit of work.
- *
- * The batch implements the Retained interface and is
- * thread safe. Once the batch is no longer retained,
- * the unit of work is executed.
+ * <p>
+ * The Store is service which offers asynchronous persistence services
+ * to a Broker.
+ * </p>
*
- * The disposer assigned to the batch will
- * be executed once the unit of work is persisted
- * or it has been negated by subsequent storage
- * operations.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait StoreBatch extends Retained {
-
- /**
- * Stores a message. Messages a reference counted, so make sure you also
- * enqueue it to queue if you don't want it to be discarded right away.
- *
- * This method auto generates and assigns the key field of the message record and
- * returns it.
- */
- def store(message:MessageRecord):Long
-
- /**
- * Adds a queue entry
- */
- def enqueue(entry:QueueEntryRecord)
-
- /**
- * Removes a queue entry
- */
- def dequeue(entry:QueueEntryRecord)
-
-
- /**
- * Causes the batch to flush eagerly, callback is called once flushed.
- */
- def eagerFlush(callback: Runnable)
-
-}
-
-/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait Store extends Service {
/**
- * Creates a store batch which is used to perform persistent
+ * Creates a store uow which is used to perform persistent
* operations as unit of work.
*/
- def createStoreBatch():StoreBatch
+ def createStoreUOW():StoreUOW
/**
* Supplies configuration data to the Store. This will be called
@@ -116,9 +74,16 @@ trait Store extends Service {
def listQueues(callback: (Seq[Long])=>Unit )
/**
- * Loads the queue information for a given queue id.
+ * Groups all the entries in the specified queue into groups containing limit entries and returns those
+ * groups. Allows you to quickly get a rough idea of the items in queue without consuming too much memory.
+ */
+ def listQueueEntryGroups(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryGroup])=>Unit )
+
+ /**
+ * Loads all the queue entry records for the given queue id between the first and last provided
+ * queue sequences (inclusive).
*/
- def listQueueEntries(queueKey:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
+ def listQueueEntries(queueKey:Long, firstSeq:Long, lastSeq:Long)(callback:(Seq[QueueEntryRecord])=>Unit )
/**
* Removes a the delivery associated with the provided from any
Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala?rev=961163&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreUOW.scala Wed Jul 7 04:12:40 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store
+
+import org.fusesource.hawtdispatch.{Retained}
+import org.apache.activemq.apollo.store._
+
+/**
+ * A store uow is used to perform persistent
+ * operations as a unit of work.
+ *
+ * The uow implements the Retained interface and is
+ * thread safe. Once the uow is no longer retained,
+ * the unit of work is executed.
+ *
+ * The disposer assigned to the uow will
+ * be executed once the unit of work is persisted
+ * or it has been negated by subsequent storage
+ * operations.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait StoreUOW extends Retained {
+
+ /**
+ * Stores a message. Messages a reference counted, so make sure you also
+ * enqueue it to queue if you don't want it to be discarded right away.
+ *
+ * This method auto generates and assigns the key field of the message record and
+ * returns it.
+ */
+ def store(message:MessageRecord):Long
+
+ /**
+ * Adds a queue entry
+ */
+ def enqueue(entry:QueueEntryRecord)
+
+ /**
+ * Removes a queue entry
+ */
+ def dequeue(entry:QueueEntryRecord)
+
+ /**
+ * Marks this uow as needing to be completed
+ * as soon as possible. If not called, the Store
+ * implementation may delay completing the uow in
+ * the hopes that a subsequent uow will cancel negate
+ * all it operations and thus avoid the cost of the
+ * persistence operations.
+ */
+ def completeASAP()
+
+ /**
+ * The specified callback is executed once the UOW
+ * is completed.
+ */
+ def onComplete(callback: Runnable)
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul 7 04:12:40 2010
@@ -91,7 +91,7 @@ abstract class StoreBenchmarkSupport ext
rc.get
}
- def addMessage(batch:StoreBatch, content:String):Long = {
+ def addMessage(batch:StoreUOW, content:String):Long = {
var message = new MessageRecord
message.protocol = ascii("test-protocol")
message.value = ascii(content).buffer
@@ -124,7 +124,7 @@ abstract class StoreBenchmarkSupport ext
}
def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
- var batch = store.createStoreBatch
+ var batch = store.createStoreUOW
var msgKeys = ListBuffer[Long]()
var nextSeq = firstSeq
@@ -159,7 +159,7 @@ abstract class StoreBenchmarkSupport ext
var metric = benchmarkCount(100000) {
seq += 1
- var batch = store.createStoreBatch
+ var batch = store.createStoreUOW
val message = addMessage(batch, content)
messageKeys += message
batch.enqueue(entry(queue, seq, message))
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961163&r1=961162&r2=961163&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul 7 04:12:40 2010
@@ -90,7 +90,7 @@ abstract class StoreFunSuiteSupport exte
rc.get
}
- def addMessage(batch:StoreBatch, content:String):Long = {
+ def addMessage(batch:StoreUOW, content:String):Long = {
var message = new MessageRecord
message.protocol = ascii("test-protocol")
message.value = ascii(content).buffer
@@ -108,7 +108,7 @@ abstract class StoreFunSuiteSupport exte
}
def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
- var batch = store.createStoreBatch
+ var batch = store.createStoreUOW
var msgKeys = ListBuffer[Long]()
var nextSeq = firstSeq
@@ -165,7 +165,7 @@ abstract class StoreFunSuiteSupport exte
val A = addQueue("A")
val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
- val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A)(cb) )
+ val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A,msgKeys.head, msgKeys.last)(cb) )
expect(msgKeys.toSeq) {
rc.map( _.messageKey )
}
@@ -174,7 +174,7 @@ abstract class StoreFunSuiteSupport exte
test("batch completes after a delay") {x}
def x = {
val A = addQueue("A")
- var batch = store.createStoreBatch
+ var batch = store.createStoreUOW
val m1 = addMessage(batch, "message 1")
batch.enqueue(entry(A, 1, m1))
@@ -191,7 +191,7 @@ abstract class StoreFunSuiteSupport exte
test("flush cancels the delay") {
val A = addQueue("A")
- var batch = store.createStoreBatch
+ var batch = store.createStoreUOW
val m1 = addMessage(batch, "message 1")
batch.enqueue(entry(A, 1, m1))