You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/15 00:31:35 UTC

svn commit: r1446395 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-op...

Author: chirino
Date: Thu Feb 14 23:31:34 2013
New Revision: 1446395

URL: http://svn.apache.org/r1446395
Log:
When messages were being moved to the DLQ they would occasionally get into a bad state.  We now handle the case were a swap out gets canceled much better.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -787,14 +787,11 @@ class AmqpProtocolHandler extends Protoc
   class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) with ProducerSupport {
 
     val key = addresses.toList
-    var is_connected = false
 
     override def send_buffer_size = buffer_size
 
     override def connection = Some(AmqpProtocolHandler.this.connection)
 
-    override def connected() = is_connected = true
-
     override def dispatch_queue = queue
 
     val producer_overflow = new OverflowSink[Delivery](this) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Feb 14 23:31:34 2013
@@ -439,8 +439,9 @@ class Queue(val router: LocalRouter, val
     rc.consumer_count = cur.parked.size
     rc.is_prefetched = cur.prefetched
     rc.state = cur.label
-    if( cur.acquiring_subscription != null ) {
-      rc.acquirer = cur.acquiring_subscription.create_link_dto(false)
+    rc.acquirer = cur.acquiring_subscription match {
+      case sub:Subscription => sub.create_link_dto(false)
+      case _ => null
     }
     rc
   }
@@ -700,9 +701,12 @@ class Queue(val router: LocalRouter, val
         queue_delivery.seq = entry.seq
         entry.init(queue_delivery)
         
-        if( tune_persistent ) {
-          queue_delivery.uow = delivery.uow
+        val uow = if( tune_persistent ) {
+          delivery.uow
+        } else {
+          null
         }
+        queue_delivery.uow = uow
 
         entries.addLast(entry)
         enqueue_item_counter += 1
@@ -713,8 +717,7 @@ class Queue(val router: LocalRouter, val
         enqueue_remaining_take(entry.size)
 
         // Do we need to do a persistent enqueue???
-        val persisted = queue_delivery.uow != null
-        if (persisted) {
+        if (uow != null) {
           entry.state match {
             case state:entry.Loaded => state.store
             case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
@@ -730,7 +733,7 @@ class Queue(val router: LocalRouter, val
         if( entry.isLinked ) {
           if( !consumers_keeping_up_historically  ) {
             entry.swap(true)
-          } else if( entry.as_loaded.is_acquired && persisted) {
+          } else if( entry.is_acquired && uow != null) {
             // If the message as dispatched and it's marked to get persisted anyways,
             // then it's ok if it falls out of memory since we won't need to load it again.
             entry.swap(false)
@@ -742,8 +745,8 @@ class Queue(val router: LocalRouter, val
         }
 
         // release the store batch...
-        if (persisted) {
-          queue_delivery.uow.release
+        if (uow != null) {
+          uow.release
           queue_delivery.uow = null
         }
 
@@ -1070,19 +1073,27 @@ class Queue(val router: LocalRouter, val
     override def connection = None
     override def dispatch_queue = Queue.this.dispatch_queue
   }
+
   var dlq_route:DlqProducerRoute = _
-  
+  var dlq_overflow:OverflowSink[(Delivery, (StoreUOW)=>Unit)] = _
+
   def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit) = {
     assert_executing
     if( config.dlq==null ) {
-      removeFunc(original_uow)
+      removeFunc(null)
     } else {
 
-      def complete(delivery:Delivery) = {
-        delivery.uow = original_uow
-        delivery.ack = (result, uow) => {
-          dispatch_queue {
-            removeFunc(uow)
+      def complete(original_delivery:Delivery) = {
+        assert_executing
+        val delivery = original_delivery.copy()
+        delivery.uow = if(delivery.storeKey == -1) {
+          null
+        } else {
+          if( original_uow == null ) {
+            create_uow
+          } else {
+            original_uow.retain()
+            original_uow
           }
         }
         delivery.expiration=0
@@ -1093,34 +1104,47 @@ class Queue(val router: LocalRouter, val
           router.virtual_host.dispatch_queue {
             val rc = router.connect(dlq_route.addresses, dlq_route, null)
             assert( rc == None ) // Not expecting this to ever fail.
-            dlq_route.dispatch_queue {
-              dlq_route.offer(delivery)
+          }
+
+          dlq_overflow = new OverflowSink[(Delivery, (StoreUOW)=>Unit)](dlq_route.flatMap{ x =>
+            Some(x._1)
+          }) {
+            override protected def onDelivered(value: (Delivery, (StoreUOW) => Unit)) {
+              val (delivery, callback) = value;
+              callback(delivery.uow)
+              if( delivery.uow!=null ) {
+                delivery.uow.release()
+              }
             }
           }
-        } else {
-          dlq_route.offer(delivery)
         }
+        dlq_overflow.offer((delivery, removeFunc))
       }
 
       entry.state match {
         case x:entry.Loaded=>
           if( x.swapping_out ) {
+            x.acquirer = DeadLetterHandler
             x.on_swap_out ::=( ()=> {
-              complete(entry.state.asInstanceOf[entry.Swapped].to_delivery)
+              complete(entry.state match {
+                case state:entry.Swapped=>
+                  state.to_delivery
+                case state:entry.Loaded =>
+                  state.delivery
+                case state => sys.error("Unexpected type: "+state)
+              })
             })
           } else {
-            complete(x.delivery.copy())
+            complete(x.delivery)
           }
         case x:entry.Swapped=>
           complete(x.to_delivery)
         case _ =>
           throw new Exception("Invalid queue entry state, it cannot be DQLed.")
       }
-
     }
   }
-  
-  
+
   def drain_acks = might_unfill {
     val end = System.nanoTime()
     ack_source.getData.foreach {
@@ -1145,7 +1169,7 @@ class Queue(val router: LocalRouter, val
             var limit = dlq_nak_limit
             if( limit>0 && entry.entry.redelivery_count >= limit ) {
               dead_letter(uow, entry.entry) { uow =>
-                entry.ack(uow)
+                entry.remove(uow)
               }
             } else {
               entry.nack

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Thu Feb 14 23:31:34 2013
@@ -72,7 +72,7 @@ class QueueEntry(val queue:Queue, val se
       state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration, 0, null, delivery.sender)
     } else {
       queue.producer_swapped_in += delivery
-      state = new Loaded(delivery, false, queue.producer_swapped_in)
+      state = new Loaded(delivery, delivery.storeKey != -1, queue.producer_swapped_in)
     }
     this
   }
@@ -298,7 +298,7 @@ class QueueEntry(val queue:Queue, val se
      * Is the entry acquired by a subscription.
      */
     def is_acquired = acquiring_subscription!=null
-    def acquiring_subscription:Subscription = null
+    def acquiring_subscription:Acquirer = null
 
     /**
      * @returns true if the entry is either swapped or swapping.
@@ -400,7 +400,7 @@ class QueueEntry(val queue:Queue, val se
 
     assert( delivery!=null, "delivery cannot be null")
 
-    var acquirer:Subscription = _
+    var acquirer:Acquirer = _
     override def acquiring_subscription = acquirer
 
     override def memory_space = space
@@ -471,6 +471,7 @@ class QueueEntry(val queue:Queue, val se
 
           // The storeBatch is only set when called from the messages.offer method
           if( delivery.uow!=null ) {
+            assert( delivery.storeKey != -1 )
             if( asap ) {
               delivery.uow.complete_asap
             }
@@ -519,22 +520,29 @@ class QueueEntry(val queue:Queue, val se
       delivery.uow = null
       if( swapping_out ) {
         swapping_out = false
-        space -= delivery
 
         if( store_wrote_to_disk ) {
+
+          space -= delivery
           queue.swap_out_size_counter += size
           queue.swap_out_item_counter += 1
-        }
 
-        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
-        if( can_combine_with_prev ) {
-          getPrevious.as_swapped_range.combineNext
-        }
-        if( remove_pending ) {
-          state.remove
+          state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
+
+          if( remove_pending ) {
+            state.remove
+          } else {
+
+            if( can_combine_with_prev ) {
+              getPrevious.as_swapped_range.combineNext
+            }
+
+            queue.loaded_items -= 1
+            queue.loaded_size -= size
+          }
+
         } else {
-          queue.loaded_items -= 1
-          queue.loaded_size -= size
+          delivery.storeKey = -1
         }
 
         fire_swap_out_watchers
@@ -720,7 +728,9 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override  val sender:List[DestinationAddress]) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Acquirer, override  val sender:List[DestinationAddress]) extends EntryState {
+
+    assert( message_key!= -1 )
 
     queue.individual_swapped_items += 1
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Thu Feb 14 23:31:34 2013
@@ -216,8 +216,18 @@ abstract class DeliveryProducerRoute(rou
   } else {
     null
   }
+  var is_connected = false
 
   def connected() = defer {
+    is_connected = true
+    if( overflow!=null ) {
+      val t = overflow
+      overflow = null
+      _offer(t)
+      if( refiller!=null && !full ) {
+        refiller.run()
+      }
+    }
     on_connected
   }
 
@@ -258,6 +268,7 @@ abstract class DeliveryProducerRoute(rou
       debug("producer route detaching from consumer.")
       x.close
     }
+    is_connected = false
   }
 
   protected def on_connected = {}
@@ -276,76 +287,89 @@ abstract class DeliveryProducerRoute(rou
 
   def full = overflow!=null
 
-  def offer(delivery:Delivery) = {
+  def offer(delivery:Delivery):Boolean = {
     dispatch_queue.assertExecuting()
     if( full ) {
       false
     } else {
-      last_send = Broker.now
+      if (delivery.uow != null) {
+        delivery.uow.retain()
+      }
+      if ( !is_connected ) {
+        overflow = delivery
+      } else {
+        _offer(delivery)
+      }
+      return true
+    }
+  }
 
-      // Do we need to store the message if we have a matching consumer?
-      var matching_targets = 0
-      val original_ack = delivery.ack
-      val copy = delivery.copy
-
-        if ( original_ack!=null ) {
-        copy.ack = (result, uow)=> {
-          defer {
-            matching_targets -= 1
-            if ( matching_targets<= 0 && copy.ack!=null ) {
-              copy.ack = null
-              if (delivery.uow != null) {
-                delivery.uow.on_complete {
-                  defer {
-                    original_ack(Consumed, null)
-                  }
+  private def _offer(delivery:Delivery):Boolean = {
+    last_send = Broker.now
+
+    // Do we need to store the message if we have a matching consumer?
+    var matching_targets = 0
+    val original_ack = delivery.ack
+    val copy = delivery.copy
+    copy.uow = delivery.uow
+
+    if ( original_ack!=null ) {
+      copy.ack = (result, uow)=> {
+        defer {
+          matching_targets -= 1
+          if ( matching_targets<= 0 && copy.ack!=null ) {
+            copy.ack = null
+            if (delivery.uow != null) {
+              delivery.uow.on_complete {
+                defer {
+                  original_ack(Consumed, null)
                 }
-              } else {
-                original_ack(Consumed, null)
               }
+            } else {
+              original_ack(Consumed, null)
             }
           }
         }
       }
+    }
 
-      if(copy.message!=null) {
-        copy.message.retain
-      }
-      
-      targets.foreach { target=>
+    if(copy.message!=null) {
+      copy.message.retain
+    }
 
-        // only deliver to matching consumers
-        if( target.consumer.matches(copy) ) {
-          matching_targets += 1
-          if ( target.consumer.is_persistent && copy.persistent && store != null) {
+    targets.foreach { target=>
 
-            if (copy.uow == null) {
-              copy.uow = store.create_uow
-            }
+      // only deliver to matching consumers
+      if( target.consumer.matches(copy) ) {
+        matching_targets += 1
+        if ( target.consumer.is_persistent && copy.persistent && store != null) {
 
-            if( copy.storeKey == -1L ) {
-              copy.storeLocator = new AtomicReference[Object]()
-              copy.storeKey = copy.uow.store(copy.createMessageRecord)
-            }
+          if (copy.uow == null) {
+            copy.uow = store.create_uow
           }
 
-          if( !target.offer(copy) ) {
-            overflowSessions ::= target
+          if( copy.storeKey == -1L ) {
+            copy.storeLocator = new AtomicReference[Object]()
+            copy.storeKey = copy.uow.store(copy.createMessageRecord)
           }
         }
-      }
 
-      if ( matching_targets == 0 && original_ack!=null ) {
-        original_ack(Consumed, null)
+        if( !target.offer(copy) ) {
+          overflowSessions ::= target
+        }
       }
+    }
 
-      if( overflowSessions!=Nil ) {
-        overflow = copy
-      } else {
-        release(copy)
-      }
-      true
+    if ( matching_targets == 0 && original_ack!=null ) {
+      original_ack(Consumed, null)
     }
+
+    if( overflowSessions!=Nil ) {
+      overflow = copy
+    } else {
+      release(copy)
+    }
+    true
   }
 
 
@@ -359,19 +383,23 @@ abstract class DeliveryProducerRoute(rou
   }
 
   val drainer = ^{
-    if( overflow!=null ) {
-      val original = overflowSessions;
-      overflowSessions = Nil
-      original.foreach { target=>
-        if( !target.offer(overflow) ) {
-          overflowSessions ::= target
+    if( is_connected ) {
+      if( overflow!=null ) {
+        val original = overflowSessions;
+        overflowSessions = Nil
+        original.foreach { target=>
+          if( !target.offer(overflow) ) {
+            overflowSessions ::= target
+          }
         }
-      }
-      if( overflowSessions==Nil ) {
-        release(overflow)
-        overflow = null
-        if(refiller!=null)
-          refiller.run
+        if( overflowSessions==Nil ) {
+          release(overflow)
+          overflow = null
+          if(refiller!=null)
+            refiller.run
+        }
+      } else if(refiller!=null) {
+        refiller.run
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Thu Feb 14 23:31:34 2013
@@ -23,6 +23,9 @@ import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
 import org.apache.activemq.apollo.dto.QueueConsumerLinkDTO
 
+trait Acquirer
+object DeadLetterHandler extends Acquirer
+
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -35,7 +38,7 @@ object Subscription extends Log
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with Dispatched with StallCheckSupport {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends Acquirer with DeliveryProducer with Dispatched with StallCheckSupport {
   import Subscription._
 
   def dispatch_queue = queue.dispatch_queue
@@ -366,14 +369,18 @@ class Subscription(val queue:Queue, val 
     def ack(uow:StoreUOW):Unit = {
       assert_executing
       if(!isLinked) {
-        debug("Unexpected ack: message seq allready acked: "+entry.seq)
+        debug("Unexpected ack: message seq already acked: "+entry.seq)
         return
       }
 
-      val next = entry.getNext
-
       total_ack_count += 1
       total_ack_size += entry.size
+      remove(uow)
+    }
+
+    def remove(uow:StoreUOW):Unit = {
+      assert_executing
+      val next = entry.getNext
       entry.dequeue(uow)
 
       // removes this entry from the acquired list.

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -863,9 +863,16 @@ case class MqttSession(host_state:HostSt
     override def send_buffer_size = handler.codec.getReadBufferSize
     override def connection = Some(handler.connection)
     override def dispatch_queue = queue
-    refiller = ^{
-      handler.resume_read
+
+    var suspended = false
+
+    refiller = ^ {
+      if( suspended ) {
+        suspended = false
+        handler.resume_read
+      }
     }
+
   }
 
   def on_mqtt_publish(publish:PUBLISH):Unit = {
@@ -907,7 +914,7 @@ case class MqttSession(host_state:HostSt
     }
   }
 
-  def send_via_route(route:DeliveryProducerRoute, publish:PUBLISH):Unit = {
+  def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
     queue.assertExecuting()
 
     def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
@@ -952,6 +959,7 @@ case class MqttSession(host_state:HostSt
       route.offer(delivery)
       if( route.full ) {
         // but once it gets full.. suspend to flow control the producer.
+        route.suspended = true
         handler.get.suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -632,8 +632,13 @@ class OpenwireProtocolHandler extends Pr
 
     override def connection = Some(OpenwireProtocolHandler.this.connection)
     override def dispatch_queue = queue
+    var suspended = false
+
     refiller = ^ {
-      resume_read
+      if( suspended ) {
+        suspended = false
+        resume_read
+      }
     }
   }
 
@@ -645,6 +650,9 @@ class OpenwireProtocolHandler extends Pr
         val addresses = to_destination_dto(msg.getDestination, this)
         val route = OpenwireDeliveryProducerRoute(addresses)
 
+        if( uow!=null ) {
+          uow.retain()
+        }
         // don't process frames until producer is connected...
         suspend_read("connecting producer route")
         host.dispatch_queue {
@@ -660,6 +668,9 @@ class OpenwireProtocolHandler extends Pr
                   send_via_route(route, msg, uow)
                 }
             }
+            if( uow!=null ) {
+              uow.release()
+            }
           }
         }
 
@@ -670,7 +681,7 @@ class OpenwireProtocolHandler extends Pr
     }
   }
 
-  def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
+  def send_via_route(route:OpenwireDeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
@@ -701,6 +712,7 @@ class OpenwireProtocolHandler extends Pr
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more messages
         // until it's not full anymore.
+        route.suspended = true
         suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -1158,10 +1158,13 @@ class StompProtocolHandler extends Proto
     override def connection = Some(StompProtocolHandler.this.connection)
 
     override def dispatch_queue = queue
-
+    var suspended = false
 
     refiller = ^ {
-      resume_read
+      if( suspended ) {
+        resume_read
+        suspended = false
+      }
     }
 
 
@@ -1239,7 +1242,10 @@ class StompProtocolHandler extends Proto
         val trimmed_dest = dest.deepCopy().ascii()
         // create the producer route...
         val route = new StompProducerRoute(trimmed_dest)   // don't process frames until producer is connected...
-        connection.transport.suspendRead
+        suspend_read("Connecting to destination")
+        if( uow !=null ) {
+          uow.retain()
+        }
         host.dispatch_queue {
           val rc = host.router.connect(route.addresses, route, security_context)
           dispatchQueue {
@@ -1254,6 +1260,9 @@ class StompProtocolHandler extends Proto
                   send_via_route(route.addresses, route, frame, uow)
                 }
             }
+            if( uow !=null ) {
+              uow.release()
+            }
           }
         }
 
@@ -1357,7 +1366,7 @@ class StompProtocolHandler extends Proto
     rc
   }
 
-  def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
+  def send_via_route(addresses: Array[SimpleAddress], route:StompProducerRoute, frame:StompFrame, uow:StoreUOW) = {
     var storeBatch:StoreUOW=null
 
     // User might be asking for ack that we have processed the message..
@@ -1402,6 +1411,7 @@ class StompProtocolHandler extends Proto
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more stomp messages
         // until it's not full anymore.
+        route.suspended = true
         suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala Thu Feb 14 23:31:34 2013
@@ -26,14 +26,14 @@ class DeadLetterQueueLoadTest extends St
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
 
   for (i <- 1 to 16 )
-  test("naker."+i) {
+  test("naker.load."+i) {
     connect("1.1")
     val dlq_client = connect("1.1", new StompClient)
-    subscribe("0", "/queue/nacker."+i, "client", false, "", false)
-    subscribe("dlq", "/queue/dlq.nacker."+i, "auto", false, "", false, c=dlq_client)
+    subscribe("0", "/queue/nacker.load."+i, "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker.load."+i, "client", false, "", false, c=dlq_client)
 
     for( j <- 1 to 1000 ) {
-      async_send("/queue/nacker."+i, j)
+      async_send("/queue/nacker.load."+i, j)
       assert_received(j, "0")(false)
       assert_received(j, "0")(false)
       // It should be sent to the DLQ after the 2nd nak