You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/21 00:45:35 UTC

svn commit: r1291527 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Router.scala Sink.scala

Author: chirino
Date: Mon Feb 20 23:45:35 2012
New Revision: 1291527

URL: http://svn.apache.org/viewvc?rev=1291527&view=rev
Log:
refill sinks more eagerly to avoid stalls.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1291527&r1=1291526&r2=1291527&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Mon Feb 20 23:45:35 2012
@@ -274,6 +274,7 @@ abstract class DeliveryProducerRoute(rou
   def full = overflow!=null
 
   def offer(delivery:Delivery) = {
+    dispatch_queue.assertExecuting()
     if( full ) {
       false
     } else {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1291527&r1=1291526&r2=1291527&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Mon Feb 20 23:45:35 2012
@@ -16,12 +16,10 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch._
 import java.util.LinkedList
 import org.fusesource.hawtdispatch.transport.Transport
 import collection.mutable.HashSet
-import java.util.concurrent.atomic.AtomicLong
 
 /**
  * <p>
@@ -266,10 +264,9 @@ class CreditWindowFilter[T](val downstre
   }
 
   def credit(byte_credits:Int, delivery_credits:Int) = {
-    val was_full = full
     this.byte_credits += byte_credits
     this.delivery_credits += delivery_credits
-    if( was_full && !full ) {
+    if( !full ) {
       refiller.run()
     }
   }
@@ -413,9 +410,8 @@ class Session[T](val producer_queue:Disp
   private var rejection_handler: (T)=>Unit = _
   
   private def add_credits(value:Int) = {
-    val was_full = _full
     credits += value;
-    if( was_full && !_full ) {
+    if( value > 0 && !_full ) {
       refiller.run
     }
   }
@@ -524,7 +520,6 @@ class QueueSink[T](val sizer:Sizer[T], v
     // When a message is delivered to the consumer, we release
     // used capacity in the outbound queue, and can drain the inbound
     // queue
-    val wasBlocking = full
     size -= amount
     if( !is_empty ) {
       drain