You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/14 18:27:37 UTC

svn commit: r1092388 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Thu Apr 14 16:27:37 2011
New Revision: 1092388

URL: http://svn.apache.org/viewvc?rev=1092388&view=rev
Log:
Simplify setting up the session refiller.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1092388&r1=1092387&r2=1092388&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Apr 14 16:27:37 2011
@@ -674,7 +674,8 @@ class QueueEntry(val queue:Queue, val se
    * Dispatches this entry to the consumers and continues dispatching subsequent
    * entries as long as the dispatch results in advancing in their dispatch position.
    */
-  def run() = queue.dispatch_queue {
+  def run() = {
+    queue.assert_executing
     var next = this;
     while( next!=null && next.dispatch) {
       next = next.getNext
@@ -1371,7 +1372,7 @@ class Subscription(val queue:Queue, val 
     assert(pos!=null)
 
     session = consumer.connect(this)
-    session.refiller = pos
+    session.refiller = dispatch_queue.runnable { pos.run }
     queue.head_entry ::= this
 
     queue.all_subscriptions += consumer -> this
@@ -1431,7 +1432,6 @@ class Subscription(val queue:Queue, val 
     advanced_size += pos.size
 
     pos = value
-    session.refiller = pos
 
     if( tail_parked ) {
       tail_parkings += 0
@@ -1450,7 +1450,6 @@ class Subscription(val queue:Queue, val 
     pos -= this
     value ::= this
     pos = value
-    session.refiller = value
     queue.dispatch_queue << value // queue up the entry to get dispatched..
   }