You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/14 18:27:37 UTC
svn commit: r1092388 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Thu Apr 14 16:27:37 2011
New Revision: 1092388
URL: http://svn.apache.org/viewvc?rev=1092388&view=rev
Log:
Simplify setting up the session refiller.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1092388&r1=1092387&r2=1092388&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Apr 14 16:27:37 2011
@@ -674,7 +674,8 @@ class QueueEntry(val queue:Queue, val se
* Dispatches this entry to the consumers and continues dispatching subsequent
* entries as long as the dispatch results in advancing in their dispatch position.
*/
- def run() = queue.dispatch_queue {
+ def run() = {
+ queue.assert_executing
var next = this;
while( next!=null && next.dispatch) {
next = next.getNext
@@ -1371,7 +1372,7 @@ class Subscription(val queue:Queue, val
assert(pos!=null)
session = consumer.connect(this)
- session.refiller = pos
+ session.refiller = dispatch_queue.runnable { pos.run }
queue.head_entry ::= this
queue.all_subscriptions += consumer -> this
@@ -1431,7 +1432,6 @@ class Subscription(val queue:Queue, val
advanced_size += pos.size
pos = value
- session.refiller = pos
if( tail_parked ) {
tail_parkings += 0
@@ -1450,7 +1450,6 @@ class Subscription(val queue:Queue, val
pos -= this
value ::= this
pos = value
- session.refiller = value
queue.dispatch_queue << value // queue up the entry to get dispatched..
}