You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/15 14:19:36 UTC

svn commit: r1446566 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-op...

Author: chirino
Date: Fri Feb 15 13:19:35 2013
New Revision: 1446566

URL: http://svn.apache.org/r1446566
Log:
Revert "When messages were being moved to the DLQ they would occasionally get into a bad state.  We now handle the case where a swap out gets canceled much better."
The reverted change introduced some regressions.

This reverts commit 3da03ce728687d683a9e12249aa730fafdb5075c.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -787,11 +787,14 @@ class AmqpProtocolHandler extends Protoc
   class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) with ProducerSupport {
 
     val key = addresses.toList
+    var is_connected = false
 
     override def send_buffer_size = buffer_size
 
     override def connection = Some(AmqpProtocolHandler.this.connection)
 
+    override def connected() = is_connected = true
+
     override def dispatch_queue = queue
 
     val producer_overflow = new OverflowSink[Delivery](this) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Feb 15 13:19:35 2013
@@ -439,9 +439,8 @@ class Queue(val router: LocalRouter, val
     rc.consumer_count = cur.parked.size
     rc.is_prefetched = cur.prefetched
     rc.state = cur.label
-    rc.acquirer = cur.acquiring_subscription match {
-      case sub:Subscription => sub.create_link_dto(false)
-      case _ => null
+    if( cur.acquiring_subscription != null ) {
+      rc.acquirer = cur.acquiring_subscription.create_link_dto(false)
     }
     rc
   }
@@ -701,12 +700,9 @@ class Queue(val router: LocalRouter, val
         queue_delivery.seq = entry.seq
         entry.init(queue_delivery)
         
-        val uow = if( tune_persistent ) {
-          delivery.uow
-        } else {
-          null
+        if( tune_persistent ) {
+          queue_delivery.uow = delivery.uow
         }
-        queue_delivery.uow = uow
 
         entries.addLast(entry)
         enqueue_item_counter += 1
@@ -717,7 +713,8 @@ class Queue(val router: LocalRouter, val
         enqueue_remaining_take(entry.size)
 
         // Do we need to do a persistent enqueue???
-        if (uow != null) {
+        val persisted = queue_delivery.uow != null
+        if (persisted) {
           entry.state match {
             case state:entry.Loaded => state.store
             case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
@@ -733,7 +730,7 @@ class Queue(val router: LocalRouter, val
         if( entry.isLinked ) {
           if( !consumers_keeping_up_historically  ) {
             entry.swap(true)
-          } else if( entry.is_acquired && uow != null) {
+          } else if( entry.as_loaded.is_acquired && persisted) {
             // If the message as dispatched and it's marked to get persisted anyways,
             // then it's ok if it falls out of memory since we won't need to load it again.
             entry.swap(false)
@@ -745,8 +742,8 @@ class Queue(val router: LocalRouter, val
         }
 
         // release the store batch...
-        if (uow != null) {
-          uow.release
+        if (persisted) {
+          queue_delivery.uow.release
           queue_delivery.uow = null
         }
 
@@ -1073,27 +1070,19 @@ class Queue(val router: LocalRouter, val
     override def connection = None
     override def dispatch_queue = Queue.this.dispatch_queue
   }
-
   var dlq_route:DlqProducerRoute = _
-  var dlq_overflow:OverflowSink[(Delivery, (StoreUOW)=>Unit)] = _
-
+  
   def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit) = {
     assert_executing
     if( config.dlq==null ) {
-      removeFunc(null)
+      removeFunc(original_uow)
     } else {
 
-      def complete(original_delivery:Delivery) = {
-        assert_executing
-        val delivery = original_delivery.copy()
-        delivery.uow = if(delivery.storeKey == -1) {
-          null
-        } else {
-          if( original_uow == null ) {
-            create_uow
-          } else {
-            original_uow.retain()
-            original_uow
+      def complete(delivery:Delivery) = {
+        delivery.uow = original_uow
+        delivery.ack = (result, uow) => {
+          dispatch_queue {
+            removeFunc(uow)
           }
         }
         delivery.expiration=0
@@ -1104,47 +1093,34 @@ class Queue(val router: LocalRouter, val
           router.virtual_host.dispatch_queue {
             val rc = router.connect(dlq_route.addresses, dlq_route, null)
             assert( rc == None ) // Not expecting this to ever fail.
-          }
-
-          dlq_overflow = new OverflowSink[(Delivery, (StoreUOW)=>Unit)](dlq_route.flatMap{ x =>
-            Some(x._1)
-          }) {
-            override protected def onDelivered(value: (Delivery, (StoreUOW) => Unit)) {
-              val (delivery, callback) = value;
-              callback(delivery.uow)
-              if( delivery.uow!=null ) {
-                delivery.uow.release()
-              }
+            dlq_route.dispatch_queue {
+              dlq_route.offer(delivery)
             }
           }
+        } else {
+          dlq_route.offer(delivery)
         }
-        dlq_overflow.offer((delivery, removeFunc))
       }
 
       entry.state match {
         case x:entry.Loaded=>
           if( x.swapping_out ) {
-            x.acquirer = DeadLetterHandler
             x.on_swap_out ::=( ()=> {
-              complete(entry.state match {
-                case state:entry.Swapped=>
-                  state.to_delivery
-                case state:entry.Loaded =>
-                  state.delivery
-                case state => sys.error("Unexpected type: "+state)
-              })
+              complete(entry.state.asInstanceOf[entry.Swapped].to_delivery)
             })
           } else {
-            complete(x.delivery)
+            complete(x.delivery.copy())
           }
         case x:entry.Swapped=>
           complete(x.to_delivery)
         case _ =>
           throw new Exception("Invalid queue entry state, it cannot be DQLed.")
       }
+
     }
   }
-
+  
+  
   def drain_acks = might_unfill {
     val end = System.nanoTime()
     ack_source.getData.foreach {
@@ -1169,7 +1145,7 @@ class Queue(val router: LocalRouter, val
             var limit = dlq_nak_limit
             if( limit>0 && entry.entry.redelivery_count >= limit ) {
               dead_letter(uow, entry.entry) { uow =>
-                entry.remove(uow)
+                entry.ack(uow)
               }
             } else {
               entry.nack

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Fri Feb 15 13:19:35 2013
@@ -72,7 +72,7 @@ class QueueEntry(val queue:Queue, val se
       state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration, 0, null, delivery.sender)
     } else {
       queue.producer_swapped_in += delivery
-      state = new Loaded(delivery, delivery.storeKey != -1, queue.producer_swapped_in)
+      state = new Loaded(delivery, false, queue.producer_swapped_in)
     }
     this
   }
@@ -298,7 +298,7 @@ class QueueEntry(val queue:Queue, val se
      * Is the entry acquired by a subscription.
      */
     def is_acquired = acquiring_subscription!=null
-    def acquiring_subscription:Acquirer = null
+    def acquiring_subscription:Subscription = null
 
     /**
      * @returns true if the entry is either swapped or swapping.
@@ -400,7 +400,7 @@ class QueueEntry(val queue:Queue, val se
 
     assert( delivery!=null, "delivery cannot be null")
 
-    var acquirer:Acquirer = _
+    var acquirer:Subscription = _
     override def acquiring_subscription = acquirer
 
     override def memory_space = space
@@ -471,7 +471,6 @@ class QueueEntry(val queue:Queue, val se
 
           // The storeBatch is only set when called from the messages.offer method
           if( delivery.uow!=null ) {
-            assert( delivery.storeKey != -1 )
             if( asap ) {
               delivery.uow.complete_asap
             }
@@ -520,29 +519,22 @@ class QueueEntry(val queue:Queue, val se
       delivery.uow = null
       if( swapping_out ) {
         swapping_out = false
+        space -= delivery
 
         if( store_wrote_to_disk ) {
-
-          space -= delivery
           queue.swap_out_size_counter += size
           queue.swap_out_item_counter += 1
+        }
 
-          state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
-
-          if( remove_pending ) {
-            state.remove
-          } else {
-
-            if( can_combine_with_prev ) {
-              getPrevious.as_swapped_range.combineNext
-            }
-
-            queue.loaded_items -= 1
-            queue.loaded_size -= size
-          }
-
+        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
+        if( can_combine_with_prev ) {
+          getPrevious.as_swapped_range.combineNext
+        }
+        if( remove_pending ) {
+          state.remove
         } else {
-          delivery.storeKey = -1
+          queue.loaded_items -= 1
+          queue.loaded_size -= size
         }
 
         fire_swap_out_watchers
@@ -728,9 +720,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Acquirer, override  val sender:List[DestinationAddress]) extends EntryState {
-
-    assert( message_key!= -1 )
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override  val sender:List[DestinationAddress]) extends EntryState {
 
     queue.individual_swapped_items += 1
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Feb 15 13:19:35 2013
@@ -216,18 +216,8 @@ abstract class DeliveryProducerRoute(rou
   } else {
     null
   }
-  var is_connected = false
 
   def connected() = defer {
-    is_connected = true
-    if( overflow!=null ) {
-      val t = overflow
-      overflow = null
-      _offer(t)
-      if( refiller!=null && !full ) {
-        refiller.run()
-      }
-    }
     on_connected
   }
 
@@ -268,7 +258,6 @@ abstract class DeliveryProducerRoute(rou
       debug("producer route detaching from consumer.")
       x.close
     }
-    is_connected = false
   }
 
   protected def on_connected = {}
@@ -287,89 +276,76 @@ abstract class DeliveryProducerRoute(rou
 
   def full = overflow!=null
 
-  def offer(delivery:Delivery):Boolean = {
+  def offer(delivery:Delivery) = {
     dispatch_queue.assertExecuting()
     if( full ) {
       false
     } else {
-      if (delivery.uow != null) {
-        delivery.uow.retain()
-      }
-      if ( !is_connected ) {
-        overflow = delivery
-      } else {
-        _offer(delivery)
-      }
-      return true
-    }
-  }
+      last_send = Broker.now
 
-  private def _offer(delivery:Delivery):Boolean = {
-    last_send = Broker.now
-
-    // Do we need to store the message if we have a matching consumer?
-    var matching_targets = 0
-    val original_ack = delivery.ack
-    val copy = delivery.copy
-    copy.uow = delivery.uow
-
-    if ( original_ack!=null ) {
-      copy.ack = (result, uow)=> {
-        defer {
-          matching_targets -= 1
-          if ( matching_targets<= 0 && copy.ack!=null ) {
-            copy.ack = null
-            if (delivery.uow != null) {
-              delivery.uow.on_complete {
-                defer {
-                  original_ack(Consumed, null)
+      // Do we need to store the message if we have a matching consumer?
+      var matching_targets = 0
+      val original_ack = delivery.ack
+      val copy = delivery.copy
+
+        if ( original_ack!=null ) {
+        copy.ack = (result, uow)=> {
+          defer {
+            matching_targets -= 1
+            if ( matching_targets<= 0 && copy.ack!=null ) {
+              copy.ack = null
+              if (delivery.uow != null) {
+                delivery.uow.on_complete {
+                  defer {
+                    original_ack(Consumed, null)
+                  }
                 }
+              } else {
+                original_ack(Consumed, null)
               }
-            } else {
-              original_ack(Consumed, null)
             }
           }
         }
       }
-    }
 
-    if(copy.message!=null) {
-      copy.message.retain
-    }
+      if(copy.message!=null) {
+        copy.message.retain
+      }
+      
+      targets.foreach { target=>
 
-    targets.foreach { target=>
+        // only deliver to matching consumers
+        if( target.consumer.matches(copy) ) {
+          matching_targets += 1
+          if ( target.consumer.is_persistent && copy.persistent && store != null) {
 
-      // only deliver to matching consumers
-      if( target.consumer.matches(copy) ) {
-        matching_targets += 1
-        if ( target.consumer.is_persistent && copy.persistent && store != null) {
+            if (copy.uow == null) {
+              copy.uow = store.create_uow
+            }
 
-          if (copy.uow == null) {
-            copy.uow = store.create_uow
+            if( copy.storeKey == -1L ) {
+              copy.storeLocator = new AtomicReference[Object]()
+              copy.storeKey = copy.uow.store(copy.createMessageRecord)
+            }
           }
 
-          if( copy.storeKey == -1L ) {
-            copy.storeLocator = new AtomicReference[Object]()
-            copy.storeKey = copy.uow.store(copy.createMessageRecord)
+          if( !target.offer(copy) ) {
+            overflowSessions ::= target
           }
         }
-
-        if( !target.offer(copy) ) {
-          overflowSessions ::= target
-        }
       }
-    }
 
-    if ( matching_targets == 0 && original_ack!=null ) {
-      original_ack(Consumed, null)
-    }
+      if ( matching_targets == 0 && original_ack!=null ) {
+        original_ack(Consumed, null)
+      }
 
-    if( overflowSessions!=Nil ) {
-      overflow = copy
-    } else {
-      release(copy)
+      if( overflowSessions!=Nil ) {
+        overflow = copy
+      } else {
+        release(copy)
+      }
+      true
     }
-    true
   }
 
 
@@ -383,23 +359,19 @@ abstract class DeliveryProducerRoute(rou
   }
 
   val drainer = ^{
-    if( is_connected ) {
-      if( overflow!=null ) {
-        val original = overflowSessions;
-        overflowSessions = Nil
-        original.foreach { target=>
-          if( !target.offer(overflow) ) {
-            overflowSessions ::= target
-          }
-        }
-        if( overflowSessions==Nil ) {
-          release(overflow)
-          overflow = null
-          if(refiller!=null)
-            refiller.run
+    if( overflow!=null ) {
+      val original = overflowSessions;
+      overflowSessions = Nil
+      original.foreach { target=>
+        if( !target.offer(overflow) ) {
+          overflowSessions ::= target
         }
-      } else if(refiller!=null) {
-        refiller.run
+      }
+      if( overflowSessions==Nil ) {
+        release(overflow)
+        overflow = null
+        if(refiller!=null)
+          refiller.run
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Fri Feb 15 13:19:35 2013
@@ -23,9 +23,6 @@ import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
 import org.apache.activemq.apollo.dto.QueueConsumerLinkDTO
 
-trait Acquirer
-object DeadLetterHandler extends Acquirer
-
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -38,7 +35,7 @@ object Subscription extends Log
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends Acquirer with DeliveryProducer with Dispatched with StallCheckSupport {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with Dispatched with StallCheckSupport {
   import Subscription._
 
   def dispatch_queue = queue.dispatch_queue
@@ -369,18 +366,14 @@ class Subscription(val queue:Queue, val 
     def ack(uow:StoreUOW):Unit = {
       assert_executing
       if(!isLinked) {
-        debug("Unexpected ack: message seq already acked: "+entry.seq)
+        debug("Unexpected ack: message seq allready acked: "+entry.seq)
         return
       }
 
+      val next = entry.getNext
+
       total_ack_count += 1
       total_ack_size += entry.size
-      remove(uow)
-    }
-
-    def remove(uow:StoreUOW):Unit = {
-      assert_executing
-      val next = entry.getNext
       entry.dequeue(uow)
 
       // removes this entry from the acquired list.

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -863,16 +863,9 @@ case class MqttSession(host_state:HostSt
     override def send_buffer_size = handler.codec.getReadBufferSize
     override def connection = Some(handler.connection)
     override def dispatch_queue = queue
-
-    var suspended = false
-
-    refiller = ^ {
-      if( suspended ) {
-        suspended = false
-        handler.resume_read
-      }
+    refiller = ^{
+      handler.resume_read
     }
-
   }
 
   def on_mqtt_publish(publish:PUBLISH):Unit = {
@@ -914,7 +907,7 @@ case class MqttSession(host_state:HostSt
     }
   }
 
-  def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
+  def send_via_route(route:DeliveryProducerRoute, publish:PUBLISH):Unit = {
     queue.assertExecuting()
 
     def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
@@ -959,7 +952,6 @@ case class MqttSession(host_state:HostSt
       route.offer(delivery)
       if( route.full ) {
         // but once it gets full.. suspend to flow control the producer.
-        route.suspended = true
         handler.get.suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -632,13 +632,8 @@ class OpenwireProtocolHandler extends Pr
 
     override def connection = Some(OpenwireProtocolHandler.this.connection)
     override def dispatch_queue = queue
-    var suspended = false
-
     refiller = ^ {
-      if( suspended ) {
-        suspended = false
-        resume_read
-      }
+      resume_read
     }
   }
 
@@ -650,9 +645,6 @@ class OpenwireProtocolHandler extends Pr
         val addresses = to_destination_dto(msg.getDestination, this)
         val route = OpenwireDeliveryProducerRoute(addresses)
 
-        if( uow!=null ) {
-          uow.retain()
-        }
         // don't process frames until producer is connected...
         suspend_read("connecting producer route")
         host.dispatch_queue {
@@ -668,9 +660,6 @@ class OpenwireProtocolHandler extends Pr
                   send_via_route(route, msg, uow)
                 }
             }
-            if( uow!=null ) {
-              uow.release()
-            }
           }
         }
 
@@ -681,7 +670,7 @@ class OpenwireProtocolHandler extends Pr
     }
   }
 
-  def send_via_route(route:OpenwireDeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
+  def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
@@ -712,7 +701,6 @@ class OpenwireProtocolHandler extends Pr
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more messages
         // until it's not full anymore.
-        route.suspended = true
         suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -1158,13 +1158,10 @@ class StompProtocolHandler extends Proto
     override def connection = Some(StompProtocolHandler.this.connection)
 
     override def dispatch_queue = queue
-    var suspended = false
+
 
     refiller = ^ {
-      if( suspended ) {
-        resume_read
-        suspended = false
-      }
+      resume_read
     }
 
 
@@ -1242,10 +1239,7 @@ class StompProtocolHandler extends Proto
         val trimmed_dest = dest.deepCopy().ascii()
         // create the producer route...
         val route = new StompProducerRoute(trimmed_dest)   // don't process frames until producer is connected...
-        suspend_read("Connecting to destination")
-        if( uow !=null ) {
-          uow.retain()
-        }
+        connection.transport.suspendRead
         host.dispatch_queue {
           val rc = host.router.connect(route.addresses, route, security_context)
           dispatchQueue {
@@ -1260,9 +1254,6 @@ class StompProtocolHandler extends Proto
                   send_via_route(route.addresses, route, frame, uow)
                 }
             }
-            if( uow !=null ) {
-              uow.release()
-            }
           }
         }
 
@@ -1366,7 +1357,7 @@ class StompProtocolHandler extends Proto
     rc
   }
 
-  def send_via_route(addresses: Array[SimpleAddress], route:StompProducerRoute, frame:StompFrame, uow:StoreUOW) = {
+  def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
     var storeBatch:StoreUOW=null
 
     // User might be asking for ack that we have processed the message..
@@ -1411,7 +1402,6 @@ class StompProtocolHandler extends Proto
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more stomp messages
         // until it's not full anymore.
-        route.suspended = true
         suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala Fri Feb 15 13:19:35 2013
@@ -26,14 +26,14 @@ class DeadLetterQueueLoadTest extends St
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
 
   for (i <- 1 to 16 )
-  test("naker.load."+i) {
+  test("naker."+i) {
     connect("1.1")
     val dlq_client = connect("1.1", new StompClient)
-    subscribe("0", "/queue/nacker.load."+i, "client", false, "", false)
-    subscribe("dlq", "/queue/dlq.nacker.load."+i, "client", false, "", false, c=dlq_client)
+    subscribe("0", "/queue/nacker."+i, "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker."+i, "auto", false, "", false, c=dlq_client)
 
     for( j <- 1 to 1000 ) {
-      async_send("/queue/nacker.load."+i, j)
+      async_send("/queue/nacker."+i, j)
       assert_received(j, "0")(false)
       assert_received(j, "0")(false)
       // It should be sent to the DLQ after the 2nd nak