You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/15 00:31:35 UTC
svn commit: r1446395 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-op...
Author: chirino
Date: Thu Feb 14 23:31:34 2013
New Revision: 1446395
URL: http://svn.apache.org/r1446395
Log:
When messages were being moved to the DLQ they would occasionally get into a bad state. We now handle the case where a swap out gets canceled much better.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -787,14 +787,11 @@ class AmqpProtocolHandler extends Protoc
class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) with ProducerSupport {
val key = addresses.toList
- var is_connected = false
override def send_buffer_size = buffer_size
override def connection = Some(AmqpProtocolHandler.this.connection)
- override def connected() = is_connected = true
-
override def dispatch_queue = queue
val producer_overflow = new OverflowSink[Delivery](this) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Feb 14 23:31:34 2013
@@ -439,8 +439,9 @@ class Queue(val router: LocalRouter, val
rc.consumer_count = cur.parked.size
rc.is_prefetched = cur.prefetched
rc.state = cur.label
- if( cur.acquiring_subscription != null ) {
- rc.acquirer = cur.acquiring_subscription.create_link_dto(false)
+ rc.acquirer = cur.acquiring_subscription match {
+ case sub:Subscription => sub.create_link_dto(false)
+ case _ => null
}
rc
}
@@ -700,9 +701,12 @@ class Queue(val router: LocalRouter, val
queue_delivery.seq = entry.seq
entry.init(queue_delivery)
- if( tune_persistent ) {
- queue_delivery.uow = delivery.uow
+ val uow = if( tune_persistent ) {
+ delivery.uow
+ } else {
+ null
}
+ queue_delivery.uow = uow
entries.addLast(entry)
enqueue_item_counter += 1
@@ -713,8 +717,7 @@ class Queue(val router: LocalRouter, val
enqueue_remaining_take(entry.size)
// Do we need to do a persistent enqueue???
- val persisted = queue_delivery.uow != null
- if (persisted) {
+ if (uow != null) {
entry.state match {
case state:entry.Loaded => state.store
case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
@@ -730,7 +733,7 @@ class Queue(val router: LocalRouter, val
if( entry.isLinked ) {
if( !consumers_keeping_up_historically ) {
entry.swap(true)
- } else if( entry.as_loaded.is_acquired && persisted) {
+ } else if( entry.is_acquired && uow != null) {
// If the message as dispatched and it's marked to get persisted anyways,
// then it's ok if it falls out of memory since we won't need to load it again.
entry.swap(false)
@@ -742,8 +745,8 @@ class Queue(val router: LocalRouter, val
}
// release the store batch...
- if (persisted) {
- queue_delivery.uow.release
+ if (uow != null) {
+ uow.release
queue_delivery.uow = null
}
@@ -1070,19 +1073,27 @@ class Queue(val router: LocalRouter, val
override def connection = None
override def dispatch_queue = Queue.this.dispatch_queue
}
+
var dlq_route:DlqProducerRoute = _
-
+ var dlq_overflow:OverflowSink[(Delivery, (StoreUOW)=>Unit)] = _
+
def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit) = {
assert_executing
if( config.dlq==null ) {
- removeFunc(original_uow)
+ removeFunc(null)
} else {
- def complete(delivery:Delivery) = {
- delivery.uow = original_uow
- delivery.ack = (result, uow) => {
- dispatch_queue {
- removeFunc(uow)
+ def complete(original_delivery:Delivery) = {
+ assert_executing
+ val delivery = original_delivery.copy()
+ delivery.uow = if(delivery.storeKey == -1) {
+ null
+ } else {
+ if( original_uow == null ) {
+ create_uow
+ } else {
+ original_uow.retain()
+ original_uow
}
}
delivery.expiration=0
@@ -1093,34 +1104,47 @@ class Queue(val router: LocalRouter, val
router.virtual_host.dispatch_queue {
val rc = router.connect(dlq_route.addresses, dlq_route, null)
assert( rc == None ) // Not expecting this to ever fail.
- dlq_route.dispatch_queue {
- dlq_route.offer(delivery)
+ }
+
+ dlq_overflow = new OverflowSink[(Delivery, (StoreUOW)=>Unit)](dlq_route.flatMap{ x =>
+ Some(x._1)
+ }) {
+ override protected def onDelivered(value: (Delivery, (StoreUOW) => Unit)) {
+ val (delivery, callback) = value;
+ callback(delivery.uow)
+ if( delivery.uow!=null ) {
+ delivery.uow.release()
+ }
}
}
- } else {
- dlq_route.offer(delivery)
}
+ dlq_overflow.offer((delivery, removeFunc))
}
entry.state match {
case x:entry.Loaded=>
if( x.swapping_out ) {
+ x.acquirer = DeadLetterHandler
x.on_swap_out ::=( ()=> {
- complete(entry.state.asInstanceOf[entry.Swapped].to_delivery)
+ complete(entry.state match {
+ case state:entry.Swapped=>
+ state.to_delivery
+ case state:entry.Loaded =>
+ state.delivery
+ case state => sys.error("Unexpected type: "+state)
+ })
})
} else {
- complete(x.delivery.copy())
+ complete(x.delivery)
}
case x:entry.Swapped=>
complete(x.to_delivery)
case _ =>
throw new Exception("Invalid queue entry state, it cannot be DQLed.")
}
-
}
}
-
-
+
def drain_acks = might_unfill {
val end = System.nanoTime()
ack_source.getData.foreach {
@@ -1145,7 +1169,7 @@ class Queue(val router: LocalRouter, val
var limit = dlq_nak_limit
if( limit>0 && entry.entry.redelivery_count >= limit ) {
dead_letter(uow, entry.entry) { uow =>
- entry.ack(uow)
+ entry.remove(uow)
}
} else {
entry.nack
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Thu Feb 14 23:31:34 2013
@@ -72,7 +72,7 @@ class QueueEntry(val queue:Queue, val se
state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration, 0, null, delivery.sender)
} else {
queue.producer_swapped_in += delivery
- state = new Loaded(delivery, false, queue.producer_swapped_in)
+ state = new Loaded(delivery, delivery.storeKey != -1, queue.producer_swapped_in)
}
this
}
@@ -298,7 +298,7 @@ class QueueEntry(val queue:Queue, val se
* Is the entry acquired by a subscription.
*/
def is_acquired = acquiring_subscription!=null
- def acquiring_subscription:Subscription = null
+ def acquiring_subscription:Acquirer = null
/**
* @returns true if the entry is either swapped or swapping.
@@ -400,7 +400,7 @@ class QueueEntry(val queue:Queue, val se
assert( delivery!=null, "delivery cannot be null")
- var acquirer:Subscription = _
+ var acquirer:Acquirer = _
override def acquiring_subscription = acquirer
override def memory_space = space
@@ -471,6 +471,7 @@ class QueueEntry(val queue:Queue, val se
// The storeBatch is only set when called from the messages.offer method
if( delivery.uow!=null ) {
+ assert( delivery.storeKey != -1 )
if( asap ) {
delivery.uow.complete_asap
}
@@ -519,22 +520,29 @@ class QueueEntry(val queue:Queue, val se
delivery.uow = null
if( swapping_out ) {
swapping_out = false
- space -= delivery
if( store_wrote_to_disk ) {
+
+ space -= delivery
queue.swap_out_size_counter += size
queue.swap_out_item_counter += 1
- }
- state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
- if( can_combine_with_prev ) {
- getPrevious.as_swapped_range.combineNext
- }
- if( remove_pending ) {
- state.remove
+ state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
+
+ if( remove_pending ) {
+ state.remove
+ } else {
+
+ if( can_combine_with_prev ) {
+ getPrevious.as_swapped_range.combineNext
+ }
+
+ queue.loaded_items -= 1
+ queue.loaded_size -= size
+ }
+
} else {
- queue.loaded_items -= 1
- queue.loaded_size -= size
+ delivery.storeKey = -1
}
fire_swap_out_watchers
@@ -720,7 +728,9 @@ class QueueEntry(val queue:Queue, val se
* entry is persisted, it can move into this state. This state only holds onto the
* the massage key so that it can reload the message from the store quickly when needed.
*/
- class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override val sender:List[DestinationAddress]) extends EntryState {
+ class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Acquirer, override val sender:List[DestinationAddress]) extends EntryState {
+
+ assert( message_key!= -1 )
queue.individual_swapped_items += 1
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Thu Feb 14 23:31:34 2013
@@ -216,8 +216,18 @@ abstract class DeliveryProducerRoute(rou
} else {
null
}
+ var is_connected = false
def connected() = defer {
+ is_connected = true
+ if( overflow!=null ) {
+ val t = overflow
+ overflow = null
+ _offer(t)
+ if( refiller!=null && !full ) {
+ refiller.run()
+ }
+ }
on_connected
}
@@ -258,6 +268,7 @@ abstract class DeliveryProducerRoute(rou
debug("producer route detaching from consumer.")
x.close
}
+ is_connected = false
}
protected def on_connected = {}
@@ -276,76 +287,89 @@ abstract class DeliveryProducerRoute(rou
def full = overflow!=null
- def offer(delivery:Delivery) = {
+ def offer(delivery:Delivery):Boolean = {
dispatch_queue.assertExecuting()
if( full ) {
false
} else {
- last_send = Broker.now
+ if (delivery.uow != null) {
+ delivery.uow.retain()
+ }
+ if ( !is_connected ) {
+ overflow = delivery
+ } else {
+ _offer(delivery)
+ }
+ return true
+ }
+ }
- // Do we need to store the message if we have a matching consumer?
- var matching_targets = 0
- val original_ack = delivery.ack
- val copy = delivery.copy
-
- if ( original_ack!=null ) {
- copy.ack = (result, uow)=> {
- defer {
- matching_targets -= 1
- if ( matching_targets<= 0 && copy.ack!=null ) {
- copy.ack = null
- if (delivery.uow != null) {
- delivery.uow.on_complete {
- defer {
- original_ack(Consumed, null)
- }
+ private def _offer(delivery:Delivery):Boolean = {
+ last_send = Broker.now
+
+ // Do we need to store the message if we have a matching consumer?
+ var matching_targets = 0
+ val original_ack = delivery.ack
+ val copy = delivery.copy
+ copy.uow = delivery.uow
+
+ if ( original_ack!=null ) {
+ copy.ack = (result, uow)=> {
+ defer {
+ matching_targets -= 1
+ if ( matching_targets<= 0 && copy.ack!=null ) {
+ copy.ack = null
+ if (delivery.uow != null) {
+ delivery.uow.on_complete {
+ defer {
+ original_ack(Consumed, null)
}
- } else {
- original_ack(Consumed, null)
}
+ } else {
+ original_ack(Consumed, null)
}
}
}
}
+ }
- if(copy.message!=null) {
- copy.message.retain
- }
-
- targets.foreach { target=>
+ if(copy.message!=null) {
+ copy.message.retain
+ }
- // only deliver to matching consumers
- if( target.consumer.matches(copy) ) {
- matching_targets += 1
- if ( target.consumer.is_persistent && copy.persistent && store != null) {
+ targets.foreach { target=>
- if (copy.uow == null) {
- copy.uow = store.create_uow
- }
+ // only deliver to matching consumers
+ if( target.consumer.matches(copy) ) {
+ matching_targets += 1
+ if ( target.consumer.is_persistent && copy.persistent && store != null) {
- if( copy.storeKey == -1L ) {
- copy.storeLocator = new AtomicReference[Object]()
- copy.storeKey = copy.uow.store(copy.createMessageRecord)
- }
+ if (copy.uow == null) {
+ copy.uow = store.create_uow
}
- if( !target.offer(copy) ) {
- overflowSessions ::= target
+ if( copy.storeKey == -1L ) {
+ copy.storeLocator = new AtomicReference[Object]()
+ copy.storeKey = copy.uow.store(copy.createMessageRecord)
}
}
- }
- if ( matching_targets == 0 && original_ack!=null ) {
- original_ack(Consumed, null)
+ if( !target.offer(copy) ) {
+ overflowSessions ::= target
+ }
}
+ }
- if( overflowSessions!=Nil ) {
- overflow = copy
- } else {
- release(copy)
- }
- true
+ if ( matching_targets == 0 && original_ack!=null ) {
+ original_ack(Consumed, null)
}
+
+ if( overflowSessions!=Nil ) {
+ overflow = copy
+ } else {
+ release(copy)
+ }
+ true
}
@@ -359,19 +383,23 @@ abstract class DeliveryProducerRoute(rou
}
val drainer = ^{
- if( overflow!=null ) {
- val original = overflowSessions;
- overflowSessions = Nil
- original.foreach { target=>
- if( !target.offer(overflow) ) {
- overflowSessions ::= target
+ if( is_connected ) {
+ if( overflow!=null ) {
+ val original = overflowSessions;
+ overflowSessions = Nil
+ original.foreach { target=>
+ if( !target.offer(overflow) ) {
+ overflowSessions ::= target
+ }
}
- }
- if( overflowSessions==Nil ) {
- release(overflow)
- overflow = null
- if(refiller!=null)
- refiller.run
+ if( overflowSessions==Nil ) {
+ release(overflow)
+ overflow = null
+ if(refiller!=null)
+ refiller.run
+ }
+ } else if(refiller!=null) {
+ refiller.run
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Thu Feb 14 23:31:34 2013
@@ -23,6 +23,9 @@ import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
import org.apache.activemq.apollo.dto.QueueConsumerLinkDTO
+trait Acquirer
+object DeadLetterHandler extends Acquirer
+
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@@ -35,7 +38,7 @@ object Subscription extends Log
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with Dispatched with StallCheckSupport {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends Acquirer with DeliveryProducer with Dispatched with StallCheckSupport {
import Subscription._
def dispatch_queue = queue.dispatch_queue
@@ -366,14 +369,18 @@ class Subscription(val queue:Queue, val
def ack(uow:StoreUOW):Unit = {
assert_executing
if(!isLinked) {
- debug("Unexpected ack: message seq allready acked: "+entry.seq)
+ debug("Unexpected ack: message seq already acked: "+entry.seq)
return
}
- val next = entry.getNext
-
total_ack_count += 1
total_ack_size += entry.size
+ remove(uow)
+ }
+
+ def remove(uow:StoreUOW):Unit = {
+ assert_executing
+ val next = entry.getNext
entry.dequeue(uow)
// removes this entry from the acquired list.
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -863,9 +863,16 @@ case class MqttSession(host_state:HostSt
override def send_buffer_size = handler.codec.getReadBufferSize
override def connection = Some(handler.connection)
override def dispatch_queue = queue
- refiller = ^{
- handler.resume_read
+
+ var suspended = false
+
+ refiller = ^ {
+ if( suspended ) {
+ suspended = false
+ handler.resume_read
+ }
}
+
}
def on_mqtt_publish(publish:PUBLISH):Unit = {
@@ -907,7 +914,7 @@ case class MqttSession(host_state:HostSt
}
}
- def send_via_route(route:DeliveryProducerRoute, publish:PUBLISH):Unit = {
+ def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
queue.assertExecuting()
def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
@@ -952,6 +959,7 @@ case class MqttSession(host_state:HostSt
route.offer(delivery)
if( route.full ) {
// but once it gets full.. suspend to flow control the producer.
+ route.suspended = true
handler.get.suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -632,8 +632,13 @@ class OpenwireProtocolHandler extends Pr
override def connection = Some(OpenwireProtocolHandler.this.connection)
override def dispatch_queue = queue
+ var suspended = false
+
refiller = ^ {
- resume_read
+ if( suspended ) {
+ suspended = false
+ resume_read
+ }
}
}
@@ -645,6 +650,9 @@ class OpenwireProtocolHandler extends Pr
val addresses = to_destination_dto(msg.getDestination, this)
val route = OpenwireDeliveryProducerRoute(addresses)
+ if( uow!=null ) {
+ uow.retain()
+ }
// don't process frames until producer is connected...
suspend_read("connecting producer route")
host.dispatch_queue {
@@ -660,6 +668,9 @@ class OpenwireProtocolHandler extends Pr
send_via_route(route, msg, uow)
}
}
+ if( uow!=null ) {
+ uow.release()
+ }
}
}
@@ -670,7 +681,7 @@ class OpenwireProtocolHandler extends Pr
}
}
- def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
+ def send_via_route(route:OpenwireDeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
if( !route.targets.isEmpty ) {
// We may need to add some headers..
@@ -701,6 +712,7 @@ class OpenwireProtocolHandler extends Pr
if( route.full ) {
// but once it gets full.. suspend, so that we get more messages
// until it's not full anymore.
+ route.suspended = true
suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Feb 14 23:31:34 2013
@@ -1158,10 +1158,13 @@ class StompProtocolHandler extends Proto
override def connection = Some(StompProtocolHandler.this.connection)
override def dispatch_queue = queue
-
+ var suspended = false
refiller = ^ {
- resume_read
+ if( suspended ) {
+ resume_read
+ suspended = false
+ }
}
@@ -1239,7 +1242,10 @@ class StompProtocolHandler extends Proto
val trimmed_dest = dest.deepCopy().ascii()
// create the producer route...
val route = new StompProducerRoute(trimmed_dest) // don't process frames until producer is connected...
- connection.transport.suspendRead
+ suspend_read("Connecting to destination")
+ if( uow !=null ) {
+ uow.retain()
+ }
host.dispatch_queue {
val rc = host.router.connect(route.addresses, route, security_context)
dispatchQueue {
@@ -1254,6 +1260,9 @@ class StompProtocolHandler extends Proto
send_via_route(route.addresses, route, frame, uow)
}
}
+ if( uow !=null ) {
+ uow.release()
+ }
}
}
@@ -1357,7 +1366,7 @@ class StompProtocolHandler extends Proto
rc
}
- def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
+ def send_via_route(addresses: Array[SimpleAddress], route:StompProducerRoute, frame:StompFrame, uow:StoreUOW) = {
var storeBatch:StoreUOW=null
// User might be asking for ack that we have processed the message..
@@ -1402,6 +1411,7 @@ class StompProtocolHandler extends Proto
if( route.full ) {
// but once it gets full.. suspend, so that we get more stomp messages
// until it's not full anymore.
+ route.suspended = true
suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala?rev=1446395&r1=1446394&r2=1446395&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala Thu Feb 14 23:31:34 2013
@@ -26,14 +26,14 @@ class DeadLetterQueueLoadTest extends St
override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
for (i <- 1 to 16 )
- test("naker."+i) {
+ test("naker.load."+i) {
connect("1.1")
val dlq_client = connect("1.1", new StompClient)
- subscribe("0", "/queue/nacker."+i, "client", false, "", false)
- subscribe("dlq", "/queue/dlq.nacker."+i, "auto", false, "", false, c=dlq_client)
+ subscribe("0", "/queue/nacker.load."+i, "client", false, "", false)
+ subscribe("dlq", "/queue/dlq.nacker.load."+i, "client", false, "", false, c=dlq_client)
for( j <- 1 to 1000 ) {
- async_send("/queue/nacker."+i, j)
+ async_send("/queue/nacker.load."+i, j)
assert_received(j, "0")(false)
assert_received(j, "0")(false)
// It should be sent to the DLQ after the 2nd nak