You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:13:10 UTC
svn commit: r961166 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-hawtdb/src/main/scala/org/apache/...
Author: chirino
Date: Wed Jul 7 04:13:10 2010
New Revision: 961166
URL: http://svn.apache.org/viewvc?rev=961166&view=rev
Log:
- Eliminated the use of tombstone entries in the queue list
- Renamed QueueEntryGroup to QueueEntryRange and FlushedGroup to FlushedRange
- Simplified several methods in the QueueEntry interface
- Always start queues
- Updated HawtDB store to use the new JournalListener interfaces
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:13:10 2010
@@ -26,7 +26,8 @@ import org.apache.activemq.broker.store.
import protocol.ProtocolFactory
import java.util.concurrent.TimeUnit
import java.util.{HashSet, Collections, ArrayList, LinkedList}
-import org.apache.activemq.apollo.store.{QueueEntryGroup, QueueEntryRecord, MessageRecord}
+import org.apache.activemq.apollo.store.{QueueEntryRange, QueueEntryRecord, MessageRecord}
+import collection.mutable.ListBuffer
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -87,11 +88,9 @@ class Queue(val host: VirtualHost, val d
var message_seq_counter = 1L
val entries = new LinkedNodeList[QueueEntry]()
- val head_entry = new QueueEntry(this, 0L);
- var tail_entry = new QueueEntry(this, next_message_seq)
-
+ val head_entry = new QueueEntry(this, 0L).head
+ var tail_entry = new QueueEntry(this, next_message_seq).tail
entries.addFirst(head_entry)
- head_entry.tombstone
var loading_size = 0
var flushing_size = 0
@@ -180,31 +179,17 @@ class Queue(val host: VirtualHost, val d
}
if( tune_persistent ) {
- host.store.listQueueEntryGroups(queueKey, tune_entry_group_size) { groups=>
+ host.store.listQueueEntryRanges(queueKey, tune_entry_group_size) { ranges=>
dispatchQueue {
- if( !groups.isEmpty ) {
-
- // adjust the head tombstone.
- head_entry.as_tombstone.count = groups.head.firstSeq
+ if( !ranges.isEmpty ) {
- var last:QueueEntryGroup = null
- groups.foreach { group =>
-
- // if groups were not adjacent.. create tombstone entry.
- if( last!=null && (last.lastSeq+1 != group.firstSeq) ) {
- val entry = new QueueEntry(Queue.this, last.lastSeq+1)
- entry.tombstone.as_tombstone.count = group.firstSeq - (last.lastSeq+1)
- entries.addLast(entry)
- }
-
- val entry = new QueueEntry(Queue.this, group.firstSeq).init(group)
+ ranges.foreach { range =>
+ val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range)
entries.addLast(entry)
- message_seq_counter = group.lastSeq
- enqueue_item_counter += group.count
- enqueue_size_counter += group.size
-
- last = group
+ message_seq_counter = range.lastQueueSeq + 1
+ enqueue_item_counter += range.count
+ enqueue_size_counter += range.size
}
debug("restored: "+enqueue_item_counter)
@@ -297,7 +282,7 @@ class Queue(val host: VirtualHost, val d
if (cur.is_flushed || cur.is_loaded) {
total_items += 1
} else if (cur.is_flushed_group ) {
- total_items += cur.as_flushed_group.items
+ total_items += cur.as_flushed_group.count
}
cur = cur.getNext
@@ -541,21 +526,17 @@ class Queue(val host: VirtualHost, val d
debug("swapping...")
- var entry = entries.getHead
+ var entry = head_entry.getNext
while( entry!=null ) {
- if( entry.as_tombstone == null ) {
-
- val loaded = entry.as_loaded
+ val loaded = entry.as_loaded
- // Keep around prefetched and loaded entries.
- if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
- entry.load
- } else {
- // flush the the others out of memory.
- entry.flush
- }
+ // Keep around prefetched and loaded entries.
+ if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
+ entry.load
+ } else {
+ // flush the the others out of memory.
+ entry.flush
}
-
entry = entry.getNext
}
}
@@ -594,7 +575,7 @@ class Queue(val host: VirtualHost, val d
def collocate(value:DispatchQueue):Unit = {
if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
- println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+ debug(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
this.dispatchQueue.setTargetQueue(value.getTargetQueue)
}
}
@@ -610,7 +591,8 @@ class QueueEntry(val queue:Queue, val se
import QueueEntry._
// Subscriptions waiting to dispatch this entry.
- var subscriptions:List[Subscription] = Nil
+ var parked:List[Subscription] = Nil
+
// The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
// ready for them to get dispatched.
var prefetched = 0
@@ -620,44 +602,56 @@ class QueueEntry(val queue:Queue, val se
def is_prefetched = prefetched>0
+ def head():QueueEntry = {
+ state = new Head
+ this
+ }
+
+ def tail():QueueEntry = {
+ state = new Tail
+ this
+ }
+
def init(delivery:Delivery):QueueEntry = {
- this.state = new Loaded(delivery, false)
+ state = new Loaded(delivery, false)
queue.size += size
this
}
def init(qer:QueueEntryRecord):QueueEntry = {
- this.state = new Flushed(qer.messageKey, qer.size)
+ state = new Flushed(qer.messageKey, qer.size)
this
}
- def init(group:QueueEntryGroup):QueueEntry = {
- val count = (((group.lastSeq+1)-group.firstSeq)).toInt
- val tombstones = count-group.count
- this.state = new FlushedGroup(count, group.size, tombstones)
+ def init(range:QueueEntryRange):QueueEntry = {
+ state = new FlushedRange(range.lastQueueSeq, range.count, range.size)
this
}
- def hasSubs = !(subscriptions == Nil )
+ def hasSubs = !parked.isEmpty
/**
* Dispatches this entry to the consumers and continues dispatching subsequent
- * entries if it has subscriptions which accept the dispatch.
+ * entries as long as the dispatch results in advancing in their dispatch position.
*/
def run() = {
- var next = dispatch()
- while( next!=null ) {
- next = next.dispatch
+ var next = this;
+ while( next!=null && next.dispatch) {
+ next = next.getNext
}
}
- def addSubscriptions(l:List[Subscription]) = {
- subscriptions :::= l
+ def ::=(sub:Subscription) = {
+ parked ::= sub
+ }
+
+ def :::=(l:List[Subscription]) = {
+ parked :::= l
}
- def removeSubscriptions(s:Subscription) = {
- subscriptions = subscriptions.filterNot(_ == s)
+ def -=(s:Subscription) = {
+ parked = parked.filterNot(_ == s)
}
def nextOrTail():QueueEntry = {
@@ -683,7 +677,7 @@ class QueueEntry(val queue:Queue, val se
}
override def toString = {
- "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+subscriptions+"}"
+ "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+parked+"}"
}
/////////////////////////////////////////////////////
@@ -693,11 +687,12 @@ class QueueEntry(val queue:Queue, val se
/////////////////////////////////////////////////////
// What state is it in?
- def as_tombstone = this.state.as_tombstone
- def as_flushed = this.state.as_flushed
- def as_flushed_group = this.state.as_flushed_group
- def as_loaded = this.state.as_loaded
- def as_tail = this.state.as_tail
+ def as_head = state.as_head
+ def as_tail = state.as_tail
+
+ def as_flushed = state.as_flushed
+ def as_flushed_group = state.as_flushed_group
+ def as_loaded = state.as_loaded
def is_tail = this == queue.tail_entry
def is_head = this == queue.head_entry
@@ -705,18 +700,17 @@ class QueueEntry(val queue:Queue, val se
def is_loaded = as_loaded!=null
def is_flushed = as_flushed!=null
def is_flushed_group = as_flushed_group!=null
- def is_tombstone = as_tombstone!=null
// These should not change the current state.
- def size = this.state.size
- def messageKey = this.state.messageKey
+ def size = state.size
+ def messageKey = state.messageKey
def is_flushed_or_flushing = state.is_flushed_or_flushing
- def dispatch():QueueEntry = state.dispatch
+ def dispatch() = state.dispatch
// These methods may cause a change in the current state.
- def flush:QueueEntry = this.state.flush
- def load:QueueEntry = this.state.load
- def tombstone:QueueEntry = this.state.tombstone
+ def flush = state.flush
+ def load = state.load
+ def remove = state.remove
trait EntryState {
@@ -725,18 +719,46 @@ class QueueEntry(val queue:Queue, val se
def as_tail:Tail = null
def as_loaded:Loaded = null
def as_flushed:Flushed = null
- def as_flushed_group:FlushedGroup = null
- def as_tombstone:Tombstone = null
+ def as_flushed_group:FlushedRange = null
+ def as_head:Head = null
- def size:Int
- def dispatch():QueueEntry
- def messageKey:Long
+ /**
+ * Gets the size of this entry in bytes. The head and tail entries always return 0.
+ */
+ def size = 0
+
+ /**
+ * Gets the message key for the entry.
+ * @returns -1 if it is not known.
+ */
+ def messageKey = -1L
+
+ /**
+ * Attempts to dispatch the current entry to the subscriptions positioned at the entry.
+ * @returns true if at least one subscription advanced to the next entry as a result of dispatching.
+ */
+ def dispatch() = false
+
+ /**
+ * @returns true if the entry is either flushed or flushing.
+ */
def is_flushed_or_flushing = false
- def load = entry
+ /**
+ * Triggers the entry to get loaded if it's not already loaded.
+ */
+ def load = {}
- def flush = entry
+ /**
+ * Triggers the entry to get flushed if it's not already flushed.
+ */
+ def flush = {}
+ /**
+ * Takes the current entry out of the prefetch of all subscriptions
+ * which have prefetched the entry. Returns the list of subscriptions which
+ * had prefetched it.
+ */
def prefetch_remove = {
var rc = List[Subscription]()
if( queue.tune_flush_to_store ) {
@@ -744,7 +766,7 @@ class QueueEntry(val queue:Queue, val se
var cur = entry
while( cur!=null && is_prefetched ) {
if( cur.hasSubs ) {
- (cur.subscriptions).foreach { sub =>
+ (cur.parked).foreach { sub =>
if( sub.is_prefetched(entry) ) {
sub.remove_from_prefetch(entry)
rc ::= sub
@@ -758,45 +780,62 @@ class QueueEntry(val queue:Queue, val se
rc
}
- def tombstone = {
-
+ /**
+ * Removes the entry from the queue's linked list of entries. This gets called
+ * as a result of an acquired ack.
+ */
+ def remove = {
+ // take us out of subscription prefetches..
var refill_preftch_list = prefetch_remove
+ // advance subscriptions that were on this entry..
+ parked.foreach(_.advance(next))
+ nextOrTail :::= parked
+ parked = Nil
+ // take the entry of the entries list..
+ unlink
+ // refill the subscription prefetches..
+ refill_preftch_list.foreach( _.refill_prefetch )
+ }
- // if rv and lv are both adjacent tombstones, then this merges the rv
- // tombstone into lv, unlinks rv, and returns lv, otherwise it returns
- // rv.
- def merge(lv:QueueEntry, rv:QueueEntry):QueueEntry = {
- if( lv==null || rv==null) {
- return rv
- }
-
- val lts = lv.state.as_tombstone
- val rts = rv.state.as_tombstone
+ /**
+ * Advances the specified subscriptions to the next entry in
+ * the linked list
+ */
+ def advance(advancing: Seq[Subscription]): Unit = {
+ val nextPos = nextOrTail
+ nextPos :::= advancing.toList
+ advancing.foreach(_.advance(nextPos))
+ }
- if( lts==null || rts==null ) {
- return rv
- }
+ }
- // Sanity check: the the entries are adjacent.. this should
- // always be the case.
- assert( lv.seq + lts.count == rv.seq , "entries are not adjacent.")
+ /**
+ * Used for the head entry. This is the starting point for all new subscriptions.
+ */
+ class Head extends EntryState {
- lts.count += rts.count
- rv.dispatch // moves the subs to the next entry.
- rv.unlink
- return lv
- }
+ override def toString = "head"
+ override def as_head = this
- state = new Tombstone()
- merge(entry, getNext)
- val rc = merge(getPrevious, entry)
+ /**
+ * New subs get parked here at the Head. There is nothing to actually dispatch
+ * in this entry.. just advance the parked subs onto the next entry.
+ */
+ override def dispatch() = {
+ if( parked != Nil ) {
- refill_preftch_list.foreach( _.refill_prefetch )
+ advance(parked)
+ parked = Nil
+ true
- rc.run // dispatch to move the subs to the next entry..
- rc
+ } else {
+ false
+ }
}
+ override def remove = throw new AssertionError("Head entry cannot be removed")
+ override def load = throw new AssertionError("Head entry cannot be loaded")
+ override def flush = throw new AssertionError("Head entry cannot be flushed")
}
/**
@@ -806,30 +845,30 @@ class QueueEntry(val queue:Queue, val se
*/
class Tail extends EntryState {
+ override def toString = "tail"
override def as_tail:Tail = this
- def size = 0
- def messageKey = -1
- def dispatch():QueueEntry = null
-
- override def toString = { "tail" }
+ override def remove = throw new AssertionError("Tail entry cannot be removed")
override def load = throw new AssertionError("Tail entry cannot be loaded")
override def flush = throw new AssertionError("Tail entry cannot be flushed")
}
/**
- * This state is used while a message is loaded in memory. A message must be in this state
- * before it can be dispatched to a consumer. It can transition to Flushed or Tombstone.
+ * The entry is in this state while a message is loaded in memory. A message must be in this state
+ * before it can be dispatched to a subscription.
*/
- class Loaded(val delivery: Delivery, var store_completed:Boolean) extends EntryState {
+ class Loaded(val delivery: Delivery, var stored:Boolean) extends EntryState {
+
+ assert( delivery!=null, "delivery cannot be null")
var acquired = false
- def messageKey = delivery.storeKey
- def size = delivery.size
var flushing = false
- override def toString = { "loaded:{ flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
+ override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
+
+ override def size = delivery.size
+ override def messageKey = delivery.storeKey
override def is_flushed_or_flushing = {
flushing
@@ -846,7 +885,7 @@ class QueueEntry(val queue:Queue, val se
override def flush() = {
if( queue.tune_flush_to_store ) {
- if( store_completed ) {
+ if( stored ) {
flushing=true
flushed
} else {
@@ -882,12 +921,10 @@ class QueueEntry(val queue:Queue, val se
}
}
}
-
- entry
}
def flushed() = {
- store_completed = true
+ stored = true
delivery.uow = null
if( flushing ) {
queue.flushing_size-=size
@@ -901,66 +938,56 @@ class QueueEntry(val queue:Queue, val se
flushing = false
queue.flushing_size-=size
}
- entry
}
- override def tombstone = {
+ override def remove = {
if( flushing ) {
flushing = false
queue.flushing_size-=size
}
queue.size -= size
- super.tombstone
+ super.remove
}
- def dispatch():QueueEntry = {
+ override def dispatch():Boolean = {
// Nothing to dispatch if we don't have subs..
- if( subscriptions==Nil ) {
- // This usualy happens when a new consumer connects, it's not marked as slow but
- // is not at the tail. And this entry is an entry just sent by a producer.
- return nextOrTail
- }
-
- // can't dispatch until the delivery is set.
- if( delivery==null ) {
- // TODO: check to see if this ever happens
- return null
+ if( parked.isEmpty ) {
+ return false
}
- var heldBack:List[Subscription] = Nil
- var advancing:List[Subscription] = Nil
-
+ var heldBack = ListBuffer[Subscription]()
+ var advancing = ListBuffer[Subscription]()
var acquiringSub: Subscription = null
- subscriptions.foreach{ sub=>
+ parked.foreach{ sub=>
if( sub.browser ) {
if (!sub.matches(delivery)) {
// advance: not interested.
- advancing ::= sub
+ advancing += sub
} else {
if (sub.offer(delivery)) {
// advance: accepted...
- advancing ::= sub
+ advancing += sub
} else {
// hold back: flow controlled
- heldBack ::= sub
+ heldBack += sub
}
}
} else {
if( acquired ) {
// advance: another sub already acquired this entry..
- advancing = advancing ::: sub :: Nil
+ advancing += sub
} else {
if (!sub.matches(delivery)) {
// advance: not interested.
- advancing = advancing ::: sub :: Nil
+ advancing += sub
} else {
if( sub.full ) {
// hold back: flow controlled
- heldBack = heldBack ::: sub :: Nil
+ heldBack += sub
} else {
// advance: accepted...
acquiringSub = sub
@@ -983,23 +1010,23 @@ class QueueEntry(val queue:Queue, val se
// the other competing subs get first dibs at the next entry.
if( acquiringSub != null ) {
- // Advancing may need to be held back because the sub's prefetch is full.
- if( acquiringSub.prefetchFull ) {
- advancing = advancing ::: acquiringSub :: Nil
+ // Advancing may need to be held back because the sub's prefetch is full
+ if( acquiringSub.prefetchFull && !acquiringSub.is_prefetched(getNext) ) {
+ heldBack += acquiringSub
} else {
- heldBack = heldBack ::: acquiringSub :: Nil
+ advancing += acquiringSub
}
}
- // The held back subs stay on this entry..
- subscriptions = heldBack
+ if ( advancing.isEmpty ) {
+ return false
+ } else {
- // the advancing subs move on to the next entry...
- if ( advancing!=Nil ) {
+ // The held back subs stay on this entry..
+ parked = heldBack.toList
- val next = nextOrTail
- next.addSubscriptions(advancing)
- advancing.foreach(_.advance(next))
+ // the advancing subs move on to the next entry...
+ advance(advancing)
// flush this entry out if it's not going to be needed soon.
def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
@@ -1007,60 +1034,98 @@ class QueueEntry(val queue:Queue, val se
// then flush out to make space...
flush
}
-
- return next
- } else {
- return null
+ return true
}
}
}
-
/**
- * Entries which have been deleted get put into the Tombstone state. Adjacent entries in the
- * Tombstone state get merged into a single entry.
+ * Loaded entries are moved into the Flushed state to reduce memory usage. Once a Loaded
+ * entry is persisted, it can move into this state. This state only holds onto the
+ * message key so that it can reload the message from the store quickly when needed.
*/
- class Tombstone extends EntryState {
+ class Flushed(override val messageKey:Long, override val size:Int) extends EntryState {
- /** The number of adjacent entries this Tombstone represents. */
- var count = 1L
+ var loading = false
- def size = 0
- def messageKey = -1
+ override def as_flushed = this
- override def as_tombstone = this
+ override def is_flushed_or_flushing = true
- /**
- * Nothing ot dispatch in a Tombstone, move the subscriptions to the next entry.
- */
- def dispatch():QueueEntry = {
- assert(prefetched==0, "tombstones should never be prefetched.")
+ override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
- val next = nextOrTail
- next.addSubscriptions(subscriptions)
- subscriptions.foreach(_.advance(next))
- subscriptions = Nil
- next
- }
+ override def load() = {
+ if( !loading ) {
+// trace("Start entry load of message seq: %s", seq)
+ // start loading it back...
+ loading = true
+ queue.loading_size += size
+ queue.host.store.loadMessage(messageKey) { delivery =>
+ // pass off to a source so it can aggregate multiple
+ // loads to reduce cross thread synchronization
+ if( delivery.isDefined ) {
+ queue.dispatchQueue {
+ queue.store_load_source.merge((this, delivery.get))
+ }
+ } else {
- override def tombstone = throw new AssertionError("Tombstone entry cannot be tombstoned")
- override def toString = { "tombstone:{ count: "+count+"}" }
+ info("Detected store dropped message at seq: %d", seq)
- }
+ // Looks like someone else removed the message from the store.. lets just
+ // tombstone this entry now.
+ queue.dispatchQueue {
+ remove
+ }
+ }
+ }
+ }
+ }
+ def loaded(messageRecord:MessageRecord) = {
+ if( loading ) {
+// debug("Loaded message seq: ", seq )
+ loading = false
+ queue.loading_size -= size
- class FlushedGroup(
- /** The number of adjacent entries this FlushedGroup represents. */
- var count:Long,
- /** size in bytes of the group */
- var size:Int,
- /** The number of tombstone entries in the groups */
- var tombstones:Int ) extends EntryState {
+ val delivery = new Delivery()
+ delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
+ delivery.size = messageRecord.size
+ delivery.storeKey = messageRecord.key
+ queue.size += size
+ state = new Loaded(delivery, true)
+ } else {
+// debug("Ignoring store load of: ", messageKey)
+ }
+ }
- def items = count - tombstones
- def messageKey = -1
+ override def remove = {
+ if( loading ) {
+ loading = false
+ queue.loading_size -= size
+ }
+ super.remove
+ }
+ }
+
+ /**
+ * The FlushedRange state is assigned to an entry that represents a range of flushed entries.
+ *
+ * Even entries that are Flushed can use a significant amount of memory if the queue is holding
+ * thousands of them. Multiple entries in the Flushed state can be combined into a single entry in
+ * the FlushedRange state, thereby conserving even more memory. A FlushedRange entry only tracks
+ * the first and last sequence ids of the range. When the entry needs to be loaded from the range,
+ * it replaces the FlushedRange entry with all the Flushed entries by querying the store for all the
+ * message keys for the entries in the range.
+ */
+ class FlushedRange(
+ /** the last seq id in the range */
+ var last:Long,
+ /** the number of items in the range */
+ var count:Int,
+ /** size in bytes of the range */
+ override val size:Int) extends EntryState {
var loading = false
@@ -1068,51 +1133,34 @@ class QueueEntry(val queue:Queue, val se
override def is_flushed_or_flushing = true
- override def toString = { "flushed_group:{ loading: "+loading+", items: "+items+", size: "+size+"}" }
-
- // Flushed entries can't be dispatched until
- // they get loaded.
- def dispatch():QueueEntry = {
- null
- }
+ override def toString = { "flushed_group:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
override def load() = {
if( !loading ) {
loading = true
- queue.host.store.listQueueEntries(queue.queueKey, seq, seq+count-1) { records =>
+ queue.host.store.listQueueEntries(queue.queueKey, seq, last) { records =>
queue.dispatchQueue {
var item_count=0
var size_count=0
- var last:QueueEntryRecord = null
val tmpList = new LinkedNodeList[QueueEntry]()
records.foreach { record =>
-
- // if entries were not adjacent.. create tombstone entry.
- if( last!=null && (last.queueSeq+1 != record.queueSeq) ) {
- val entry = new QueueEntry(queue, last.queueSeq+1)
- entry.tombstone.as_tombstone.count = record.queueSeq - (last.queueSeq+1)
- tmpList.addLast(entry)
- }
-
val entry = new QueueEntry(queue, record.queueSeq).init(record)
tmpList.addLast(entry)
-
item_count += 1
size_count += record.size
- last = record
}
// we may need to adjust the enqueue count if entries
// were dropped at the store level
- var item_delta = (items - item_count)
+ var item_delta = (count - item_count)
val size_delta: Int = size - size_count
if ( item_delta!=0 || size_delta!=0 ) {
assert(item_delta <= 0)
assert(size_delta <= 0)
- info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, seq+count-1, size_delta)
+ info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, last, size_delta)
queue.enqueue_item_counter += item_delta
queue.enqueue_size_counter += size_delta
}
@@ -1123,103 +1171,21 @@ class QueueEntry(val queue:Queue, val se
val next = getNext
// move the subs to the first entry that we just loaded.
- subscriptions.foreach(_.advance(next))
- next.addSubscriptions(subscriptions)
+ parked.foreach(_.advance(next))
+ next :::= parked
unlink
refill_preftch_list.foreach( _.refill_prefetch )
}
}
}
- entry
}
- override def tombstone = {
- throw new AssertionError("Flush group cannbot be tombstone.");
+ override def remove = {
+ throw new AssertionError("Flushed range cannbot be removed.");
}
}
- /**
- * Entries in the Flushed state are not holding the referenced messages in memory anymore.
- * This state can transition to Loaded or Tombstone.
- *
- */
- class Flushed(val messageKey:Long, val size:Int) extends EntryState {
-
- var loading = false
-
- override def as_flushed = this
-
- override def is_flushed_or_flushing = true
-
- override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
-
- // Flushed entries can't be dispatched until
- // they get loaded.
- def dispatch():QueueEntry = {
- // This dispatch can happen when a subscription is holding onto lots of acquired entries
- // it can't prefetch anymore as it's waiting for ack on those messages to avoid
- // blowing it's memory limits.
- null
- }
-
- override def load() = {
- if( !loading ) {
-// trace("Start entry load of message seq: %s", seq)
- // start loading it back...
- loading = true
- queue.loading_size += size
- queue.host.store.loadMessage(messageKey) { delivery =>
- // pass off to a source so it can aggregate multiple
- // loads to reduce cross thread synchronization
- if( delivery.isDefined ) {
-// debug("Store found message seq: %d", seq)
- queue.store_load_source.merge((this, delivery.get))
- } else {
-
- info("Detected store dropped message at seq: %d", seq)
-
- // Looks like someone else removed the message from the store.. lets just
- // tombstone this entry now.
- queue.dispatchQueue {
- tombstone
- }
- }
- }
- }
- entry
- }
-
- def loaded(messageRecord:MessageRecord) = {
- if( loading ) {
-// debug("Loaded message seq: ", seq )
- loading = false
- queue.loading_size -= size
-
- val delivery = new Delivery()
- delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
- delivery.size = messageRecord.size
- delivery.storeKey = messageRecord.key
-
- queue.size += size
- state = new Loaded(delivery, true)
- } else {
-// debug("Ignoring store load of: ", messageKey)
- }
- }
-
-
- override def tombstone = {
- if( loading ) {
-// debug("Tombstoned, will ignore store load of seq: ", seq)
- loading = false
- queue.loading_size -= size
- }
- super.tombstone
- }
- }
-
-
}
@@ -1248,7 +1214,7 @@ class Subscription(queue:Queue) extends
override def toString = {
def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
- "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+" }"
+ "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+"}"
}
def browser = session.consumer.browser
@@ -1257,7 +1223,7 @@ class Subscription(queue:Queue) extends
pos = queue.head_entry;
session = consumer.connect(this)
session.refiller = pos
- queue.head_entry.addSubscriptions(this :: Nil)
+ queue.head_entry ::= this
if( queue.serviceState.isStarted ) {
// kick off the initial dispatch.
@@ -1267,7 +1233,7 @@ class Subscription(queue:Queue) extends
}
def close() = {
- pos.removeSubscriptions(this)
+ pos.-=(this)
invalidate_prefetch
@@ -1298,7 +1264,7 @@ class Subscription(queue:Queue) extends
assert(value!=null)
// Remove the previous pos from the prefetch counters.
- if( prefetch_tail!=null && !pos.is_tombstone) {
+ if( prefetch_tail!=null && !pos.is_head) {
remove_from_prefetch(pos)
}
advanced_size += pos.size
@@ -1329,7 +1295,7 @@ class Subscription(queue:Queue) extends
// release the prefetch counters...
var cur = pos
while (cur.seq <= prefetch_tail.seq) {
- if (!cur.is_tombstone) {
+ if (!cur.is_head) {
prefetched_size -= cur.size
cur.prefetched -= 1
}
@@ -1344,12 +1310,12 @@ class Subscription(queue:Queue) extends
* Is the specified queue entry prefeteched by this subscription?
*/
def is_prefetched(value:QueueEntry) = {
- prefetch_tail!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
+ prefetch_tail!=null && value!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
}
def add_to_prefetch(entry:QueueEntry):Unit = {
- assert( !entry.is_tombstone, "tombstones should not be prefetched..")
+ assert( !entry.is_head, "tombstones should not be prefetched..")
prefetched_size += entry.size
entry.prefetched += 1
entry.load
@@ -1386,7 +1352,7 @@ class Subscription(queue:Queue) extends
// attempts to fill the prefetch...
var next = next_prefetch_pos
while( !prefetchFull && next!=null ) {
- if( !next.is_tombstone ) {
+ if( !next.is_head ) {
add_to_prefetch(next)
}
next = next.getNext
@@ -1433,7 +1399,7 @@ class Subscription(queue:Queue) extends
// we may now be able to prefetch some messages..
acquired_size -= entry.size
- entry.tombstone // entry size changes to 0
+ entry.remove // entry size changes to 0
refill_prefetch
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:13:10 2010
@@ -268,9 +268,9 @@ class VirtualHost(val broker: Broker) ex
case Some(queueKey) =>
dispatchQueue {
val queue = new Queue(this, dest, queueKey)
+ queue.start()
queues.put(dest.getName, queue)
cb(queue)
- queue.start()
}
case None => // store could not create
cb(null)
@@ -278,6 +278,7 @@ class VirtualHost(val broker: Broker) ex
}
} else {
val queue = new Queue(this, dest)
+ queue.start()
queues.put(dest.getName, queue)
cb(queue)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:13:10 2010
@@ -206,11 +206,11 @@ class CassandraClient() {
}
}
- def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryGroup] = {
+ def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryRange] = {
withSession {
session =>
- var rc = ListBuffer[QueueEntryGroup]()
- var group:QueueEntryGroup = null
+ var rc = ListBuffer[QueueEntryRange]()
+ var group:QueueEntryRange = null
// TODO: this is going to bring back lots of entries.. not good.
session.list(schema.entries \ queueKey).foreach { x=>
@@ -218,10 +218,10 @@ class CassandraClient() {
val record:QueueEntryRecord = x.value
if( group == null ) {
- group = new QueueEntryGroup
- group.firstSeq = record.queueSeq
+ group = new QueueEntryRange
+ group.firstQueueSeq = record.queueSeq
}
- group.lastSeq = record.queueSeq
+ group.lastQueueSeq = record.queueSeq
group.count += 1
group.size += record.size
if( group.count == limit) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:13:10 2010
@@ -195,7 +195,7 @@ class CassandraStore extends Store with
}
- def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
+ def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
blocking {
callback( client.listQueueEntryGroups(queueKey, limit) )
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:13:10 2010
@@ -28,7 +28,7 @@ import org.fusesource.hawtbuf.proto.Mess
import org.fusesource.hawtbuf.proto.PBMessage
import org.apache.activemq.util.LockFile
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
-import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location}
+import org.fusesource.hawtdb.internal.journal.{JournalListener, Journal, Location}
import org.fusesource.hawtdispatch.TaskTracker
import org.fusesource.hawtbuf.AsciiBuffer._
@@ -152,6 +152,20 @@ class HawtDBClient(hawtDBStore: HawtDBSt
journal.setMaxFileLength(config.journalLogSize)
journal.setMaxWriteBatchSize(config.journalBatchSize);
journal.setChecksum(true);
+ journal.setListener( new JournalListener{
+ def synced(writes: Array[Write]) = {
+ var onCompletes = List[Runnable]()
+ withTx { tx=>
+ val helper = new TxHelper(tx)
+ writes.foreach { write=>
+ val func = write.getAttachment.asInstanceOf[(TxHelper, Location)=>List[Runnable]]
+ onCompletes = onCompletes ::: func(helper, write.getLocation)
+ }
+ helper.storeRootBean
+ }
+ onCompletes.foreach( _.run )
+ }
+ })
if( config.archiveDirectory!=null ) {
journal.setDirectoryArchive(config.archiveDirectory)
@@ -169,10 +183,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
pageFileFactory.open()
val initialized = withTx { tx =>
- val helper = new TxHelper(tx)
- import helper._
-
if (!tx.allocator().isAllocated(0)) {
+ val helper = new TxHelper(tx)
+ import helper._
+
val rootPage = tx.alloc()
assert(rootPage == 0)
@@ -181,9 +195,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
- rootBuffer = rootBean.freeze
- tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
+ helper.storeRootBean
+
true
} else {
rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0)
@@ -296,7 +310,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
- def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryGroup] = {
+ def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryRange] = {
withTx { tx =>
val helper = new TxHelper(tx)
import JavaConversions._
@@ -307,15 +321,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
if (queueRecord != null) {
val entryIndex = queueEntryIndex(queueRecord)
- var rc = ListBuffer[QueueEntryGroup]()
- var group:QueueEntryGroup = null
+ var rc = ListBuffer[QueueEntryRange]()
+ var group:QueueEntryRange = null
entryIndex.iterator.foreach { entry =>
if( group == null ) {
- group = new QueueEntryGroup
- group.firstSeq = entry.getKey.longValue
+ group = new QueueEntryRange
+ group.firstQueueSeq = entry.getKey.longValue
}
- group.lastSeq = entry.getKey.longValue
+ group.lastQueueSeq = entry.getKey.longValue
group.count += 1
group.size += entry.getValue.getSize
if( group.count == limit) {
@@ -470,11 +484,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
frozen.writeFramed(baos)
val buffer = baos.toBuffer()
- append(buffer) {
- location =>
- metric_index_update.time {
- executeStore(batch, update, onComplete, location)
- }
+ append(buffer) { (helper, location) =>
+ metric_index_update.time {
+ executeStore(helper, location, batch, update, onComplete)
+ }
}
}
@@ -484,9 +497,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val baos = new DataByteArrayOutputStream(5)
baos.writeByte(BEGIN)
baos.writeInt(batch)
- append(baos.toBuffer) {
- location =>
- executeBegin(batch, location)
+ append(baos.toBuffer) { (helper,location) =>
+ executeBegin(helper, location, batch)
}
}
@@ -496,9 +508,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val baos = new DataByteArrayOutputStream(5)
baos.writeByte(COMMIT)
baos.writeInt(batch)
- append(baos.toBuffer) {
- location =>
- executeCommit(batch, onComplete, location)
+ append(baos.toBuffer) { (helper,location) =>
+ executeCommit(helper, location, batch, onComplete)
}
}
@@ -506,9 +517,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val baos = new DataByteArrayOutputStream(5)
baos.writeByte(ROLLBACK)
baos.writeInt(batch)
- append(baos.toBuffer) {
- location =>
- executeRollback(batch, onComplete, location)
+ append(baos.toBuffer) { (helper,location) =>
+ executeRollback(helper, location, batch, onComplete)
}
}
@@ -606,12 +616,16 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val updateType = editor.readByte()
val batch = editor.readInt()
- updateType match {
- case BEGIN => executeBegin(batch, location)
- case COMMIT => executeCommit(batch, null, location)
- case _ =>
- val update = decode(location, updateType, data)
- executeStore(batch, update, null, location)
+ withTx { tx=>
+ val helper = new TxHelper(tx)
+ updateType match {
+ case BEGIN => executeBegin(helper, location, batch)
+ case COMMIT => executeCommit(helper, location, batch, null)
+ case _ =>
+ val update = decode(location, updateType, data)
+ executeStore(helper, location, batch, update, null)
+ }
+ helper.storeRootBean
}
recoveryCounter += 1
@@ -624,14 +638,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
//
/////////////////////////////////////////////////////////////////////
- private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
+ private def append(data: Buffer)(cb: (TxHelper, Location) => List[Runnable]): Unit = {
metric_journal_append.start { end =>
- journal.write(data, new JournalCallback() {
- def success(location: Location) = {
- end()
- cb(location)
- }
- })
+ def cbintercept(tx:TxHelper,location:Location) = {
+ end()
+ cb(tx, location)
+ }
+ journal.write(data, cbintercept _ )
}
}
@@ -644,74 +657,70 @@ class HawtDBClient(hawtDBStore: HawtDBSt
//
/////////////////////////////////////////////////////////////////////
- private def executeBegin(batch: Int, location: Location): Unit = {
+ private def executeBegin(helper:TxHelper, location: Location, batch: Int):List[Runnable] = {
assert(batches.get(batch).isEmpty)
batches.put(batch, (location, ListBuffer()))
+ Nil
}
- private def executeCommit(batch: Int, onComplete: Runnable, location: Location): Unit = {
+ private def executeCommit(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable):List[Runnable] = {
// apply all the updates in the batch as a single unit of work.
batches.remove(batch) match {
case Some((_, updates)) =>
// When recovering.. we only want to redo updates that committed
// after the last update location.
if (!recovering || isAfterLastUpdateLocation(location)) {
- withTx { tx =>
- val helper = new TxHelper(tx)
- // index the updates
- updates.foreach {
- update =>
- index(helper, update.update, update.location)
- }
- helper.updateLocations(location)
- }
+ // index the updates
+ updates.foreach {
+ update =>
+ index(helper, update.update, update.location)
+ }
+ helper.updateLocations(location)
}
case None =>
// when recovering.. we are more lax due to recovery starting
// in the middle of a stream of in progress batches
assert(recovering)
}
- if (onComplete != null) {
- onComplete.run
+ if(onComplete!=null) {
+ return List(onComplete)
+ } else {
+ Nil
}
}
- private def executeRollback(batch: Int, onComplete: Runnable, location: Location): Unit = {
+ private def executeRollback(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable): List[Runnable] = {
// apply all the updates in the batch as a single unit of work.
batches.remove(batch) match {
case Some((_, _)) =>
if (!recovering || isAfterLastUpdateLocation(location)) {
- withTx { tx =>
- val helper = new TxHelper(tx)
- helper.updateLocations(location)
- }
+ helper.updateLocations(location)
}
case None =>
// when recovering.. we are more lax due to recovery starting
// in the middle of a stream of in progress batches
assert(recovering)
}
- if (onComplete != null) {
- onComplete.run
+ if(onComplete!=null) {
+ return List(onComplete)
+ } else {
+ Nil
}
}
- private def executeStore(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
+ private def executeStore(helper:TxHelper, location: Location, batch: Int, update: TypeCreatable, onComplete: Runnable): List[Runnable] = {
if (batch == -1) {
// update is not part of the batch..
// When recovering.. we only want to redo updates that happen
// after the last update location.
if (!recovering || isAfterLastUpdateLocation(location)) {
- withTx { tx =>
- val helper = new TxHelper(tx)
- index(helper, update, location)
- helper.updateLocations(location)
- }
+ index(helper, update, location)
+ helper.updateLocations(location)
}
- if (onComplete != null) {
- onComplete.run
+ if ( onComplete != null) {
+ return List(onComplete)
}
} else {
@@ -728,6 +737,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
assert(recovering)
}
}
+ return Nil
}
@@ -1064,6 +1074,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
} else {
rootBean.setFirstBatchLocation(batches.head._2._1)
}
+ }
+
+ def storeRootBean() = {
rootBuffer = rootBean.freeze
_tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:13:10 2010
@@ -183,7 +183,7 @@ class HawtDBStore extends Store with Bas
}
}
- def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
+ def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
executor_pool ^{
callback( client.listQueueEntryGroups(queueKey, limit) )
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java Wed Jul 7 04:13:10 2010
@@ -22,8 +22,8 @@ import org.fusesource.hawtbuf.Buffer;
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class QueueEntryRange {
- public long firstSeq;
- public long lastSeq;
+ public long firstQueueSeq;
+ public long lastQueueSeq;
public int count;
public int size;
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:13:10 2010
@@ -74,10 +74,11 @@ trait Store extends Service {
def listQueues(callback: (Seq[Long])=>Unit )
/**
- * Groups all the entries in the specified queue into groups containing limit entries and returns those
- * groups. Allows you to quickly get a rough idea of the items in queue without consuming too much memory.
+ * Groups all the entries in the specified queue into ranges containing up to limit entries
+ * each and returns those ranges. Allows you to incrementally load all the entries in
+ * a queue.
*/
- def listQueueEntryGroups(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryGroup])=>Unit )
+ def listQueueEntryRanges(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryRange])=>Unit )
/**
* Loads all the queue entry records for the given queue id between the first and last provided