You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/15 14:19:36 UTC
svn commit: r1446566 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-op...
Author: chirino
Date: Fri Feb 15 13:19:35 2013
New Revision: 1446566
URL: http://svn.apache.org/r1446566
Log:
Revert "When messages were being moved to the DLQ they would occasionally get into a bad state. We now handle the case were a swap out gets canceled much better."
Introduces some regressions.
This reverts commit 3da03ce728687d683a9e12249aa730fafdb5075c.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -787,11 +787,14 @@ class AmqpProtocolHandler extends Protoc
class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) with ProducerSupport {
val key = addresses.toList
+ var is_connected = false
override def send_buffer_size = buffer_size
override def connection = Some(AmqpProtocolHandler.this.connection)
+ override def connected() = is_connected = true
+
override def dispatch_queue = queue
val producer_overflow = new OverflowSink[Delivery](this) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Feb 15 13:19:35 2013
@@ -439,9 +439,8 @@ class Queue(val router: LocalRouter, val
rc.consumer_count = cur.parked.size
rc.is_prefetched = cur.prefetched
rc.state = cur.label
- rc.acquirer = cur.acquiring_subscription match {
- case sub:Subscription => sub.create_link_dto(false)
- case _ => null
+ if( cur.acquiring_subscription != null ) {
+ rc.acquirer = cur.acquiring_subscription.create_link_dto(false)
}
rc
}
@@ -701,12 +700,9 @@ class Queue(val router: LocalRouter, val
queue_delivery.seq = entry.seq
entry.init(queue_delivery)
- val uow = if( tune_persistent ) {
- delivery.uow
- } else {
- null
+ if( tune_persistent ) {
+ queue_delivery.uow = delivery.uow
}
- queue_delivery.uow = uow
entries.addLast(entry)
enqueue_item_counter += 1
@@ -717,7 +713,8 @@ class Queue(val router: LocalRouter, val
enqueue_remaining_take(entry.size)
// Do we need to do a persistent enqueue???
- if (uow != null) {
+ val persisted = queue_delivery.uow != null
+ if (persisted) {
entry.state match {
case state:entry.Loaded => state.store
case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
@@ -733,7 +730,7 @@ class Queue(val router: LocalRouter, val
if( entry.isLinked ) {
if( !consumers_keeping_up_historically ) {
entry.swap(true)
- } else if( entry.is_acquired && uow != null) {
+ } else if( entry.as_loaded.is_acquired && persisted) {
// If the message as dispatched and it's marked to get persisted anyways,
// then it's ok if it falls out of memory since we won't need to load it again.
entry.swap(false)
@@ -745,8 +742,8 @@ class Queue(val router: LocalRouter, val
}
// release the store batch...
- if (uow != null) {
- uow.release
+ if (persisted) {
+ queue_delivery.uow.release
queue_delivery.uow = null
}
@@ -1073,27 +1070,19 @@ class Queue(val router: LocalRouter, val
override def connection = None
override def dispatch_queue = Queue.this.dispatch_queue
}
-
var dlq_route:DlqProducerRoute = _
- var dlq_overflow:OverflowSink[(Delivery, (StoreUOW)=>Unit)] = _
-
+
def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit) = {
assert_executing
if( config.dlq==null ) {
- removeFunc(null)
+ removeFunc(original_uow)
} else {
- def complete(original_delivery:Delivery) = {
- assert_executing
- val delivery = original_delivery.copy()
- delivery.uow = if(delivery.storeKey == -1) {
- null
- } else {
- if( original_uow == null ) {
- create_uow
- } else {
- original_uow.retain()
- original_uow
+ def complete(delivery:Delivery) = {
+ delivery.uow = original_uow
+ delivery.ack = (result, uow) => {
+ dispatch_queue {
+ removeFunc(uow)
}
}
delivery.expiration=0
@@ -1104,47 +1093,34 @@ class Queue(val router: LocalRouter, val
router.virtual_host.dispatch_queue {
val rc = router.connect(dlq_route.addresses, dlq_route, null)
assert( rc == None ) // Not expecting this to ever fail.
- }
-
- dlq_overflow = new OverflowSink[(Delivery, (StoreUOW)=>Unit)](dlq_route.flatMap{ x =>
- Some(x._1)
- }) {
- override protected def onDelivered(value: (Delivery, (StoreUOW) => Unit)) {
- val (delivery, callback) = value;
- callback(delivery.uow)
- if( delivery.uow!=null ) {
- delivery.uow.release()
- }
+ dlq_route.dispatch_queue {
+ dlq_route.offer(delivery)
}
}
+ } else {
+ dlq_route.offer(delivery)
}
- dlq_overflow.offer((delivery, removeFunc))
}
entry.state match {
case x:entry.Loaded=>
if( x.swapping_out ) {
- x.acquirer = DeadLetterHandler
x.on_swap_out ::=( ()=> {
- complete(entry.state match {
- case state:entry.Swapped=>
- state.to_delivery
- case state:entry.Loaded =>
- state.delivery
- case state => sys.error("Unexpected type: "+state)
- })
+ complete(entry.state.asInstanceOf[entry.Swapped].to_delivery)
})
} else {
- complete(x.delivery)
+ complete(x.delivery.copy())
}
case x:entry.Swapped=>
complete(x.to_delivery)
case _ =>
throw new Exception("Invalid queue entry state, it cannot be DQLed.")
}
+
}
}
-
+
+
def drain_acks = might_unfill {
val end = System.nanoTime()
ack_source.getData.foreach {
@@ -1169,7 +1145,7 @@ class Queue(val router: LocalRouter, val
var limit = dlq_nak_limit
if( limit>0 && entry.entry.redelivery_count >= limit ) {
dead_letter(uow, entry.entry) { uow =>
- entry.remove(uow)
+ entry.ack(uow)
}
} else {
entry.nack
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Fri Feb 15 13:19:35 2013
@@ -72,7 +72,7 @@ class QueueEntry(val queue:Queue, val se
state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration, 0, null, delivery.sender)
} else {
queue.producer_swapped_in += delivery
- state = new Loaded(delivery, delivery.storeKey != -1, queue.producer_swapped_in)
+ state = new Loaded(delivery, false, queue.producer_swapped_in)
}
this
}
@@ -298,7 +298,7 @@ class QueueEntry(val queue:Queue, val se
* Is the entry acquired by a subscription.
*/
def is_acquired = acquiring_subscription!=null
- def acquiring_subscription:Acquirer = null
+ def acquiring_subscription:Subscription = null
/**
* @returns true if the entry is either swapped or swapping.
@@ -400,7 +400,7 @@ class QueueEntry(val queue:Queue, val se
assert( delivery!=null, "delivery cannot be null")
- var acquirer:Acquirer = _
+ var acquirer:Subscription = _
override def acquiring_subscription = acquirer
override def memory_space = space
@@ -471,7 +471,6 @@ class QueueEntry(val queue:Queue, val se
// The storeBatch is only set when called from the messages.offer method
if( delivery.uow!=null ) {
- assert( delivery.storeKey != -1 )
if( asap ) {
delivery.uow.complete_asap
}
@@ -520,29 +519,22 @@ class QueueEntry(val queue:Queue, val se
delivery.uow = null
if( swapping_out ) {
swapping_out = false
+ space -= delivery
if( store_wrote_to_disk ) {
-
- space -= delivery
queue.swap_out_size_counter += size
queue.swap_out_item_counter += 1
+ }
- state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
-
- if( remove_pending ) {
- state.remove
- } else {
-
- if( can_combine_with_prev ) {
- getPrevious.as_swapped_range.combineNext
- }
-
- queue.loaded_items -= 1
- queue.loaded_size -= size
- }
-
+ state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
+ if( can_combine_with_prev ) {
+ getPrevious.as_swapped_range.combineNext
+ }
+ if( remove_pending ) {
+ state.remove
} else {
- delivery.storeKey = -1
+ queue.loaded_items -= 1
+ queue.loaded_size -= size
}
fire_swap_out_watchers
@@ -728,9 +720,7 @@ class QueueEntry(val queue:Queue, val se
* entry is persisted, it can move into this state. This state only holds onto the
* the massage key so that it can reload the message from the store quickly when needed.
*/
- class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Acquirer, override val sender:List[DestinationAddress]) extends EntryState {
-
- assert( message_key!= -1 )
+ class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override val sender:List[DestinationAddress]) extends EntryState {
queue.individual_swapped_items += 1
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Feb 15 13:19:35 2013
@@ -216,18 +216,8 @@ abstract class DeliveryProducerRoute(rou
} else {
null
}
- var is_connected = false
def connected() = defer {
- is_connected = true
- if( overflow!=null ) {
- val t = overflow
- overflow = null
- _offer(t)
- if( refiller!=null && !full ) {
- refiller.run()
- }
- }
on_connected
}
@@ -268,7 +258,6 @@ abstract class DeliveryProducerRoute(rou
debug("producer route detaching from consumer.")
x.close
}
- is_connected = false
}
protected def on_connected = {}
@@ -287,89 +276,76 @@ abstract class DeliveryProducerRoute(rou
def full = overflow!=null
- def offer(delivery:Delivery):Boolean = {
+ def offer(delivery:Delivery) = {
dispatch_queue.assertExecuting()
if( full ) {
false
} else {
- if (delivery.uow != null) {
- delivery.uow.retain()
- }
- if ( !is_connected ) {
- overflow = delivery
- } else {
- _offer(delivery)
- }
- return true
- }
- }
+ last_send = Broker.now
- private def _offer(delivery:Delivery):Boolean = {
- last_send = Broker.now
-
- // Do we need to store the message if we have a matching consumer?
- var matching_targets = 0
- val original_ack = delivery.ack
- val copy = delivery.copy
- copy.uow = delivery.uow
-
- if ( original_ack!=null ) {
- copy.ack = (result, uow)=> {
- defer {
- matching_targets -= 1
- if ( matching_targets<= 0 && copy.ack!=null ) {
- copy.ack = null
- if (delivery.uow != null) {
- delivery.uow.on_complete {
- defer {
- original_ack(Consumed, null)
+ // Do we need to store the message if we have a matching consumer?
+ var matching_targets = 0
+ val original_ack = delivery.ack
+ val copy = delivery.copy
+
+ if ( original_ack!=null ) {
+ copy.ack = (result, uow)=> {
+ defer {
+ matching_targets -= 1
+ if ( matching_targets<= 0 && copy.ack!=null ) {
+ copy.ack = null
+ if (delivery.uow != null) {
+ delivery.uow.on_complete {
+ defer {
+ original_ack(Consumed, null)
+ }
}
+ } else {
+ original_ack(Consumed, null)
}
- } else {
- original_ack(Consumed, null)
}
}
}
}
- }
- if(copy.message!=null) {
- copy.message.retain
- }
+ if(copy.message!=null) {
+ copy.message.retain
+ }
+
+ targets.foreach { target=>
- targets.foreach { target=>
+ // only deliver to matching consumers
+ if( target.consumer.matches(copy) ) {
+ matching_targets += 1
+ if ( target.consumer.is_persistent && copy.persistent && store != null) {
- // only deliver to matching consumers
- if( target.consumer.matches(copy) ) {
- matching_targets += 1
- if ( target.consumer.is_persistent && copy.persistent && store != null) {
+ if (copy.uow == null) {
+ copy.uow = store.create_uow
+ }
- if (copy.uow == null) {
- copy.uow = store.create_uow
+ if( copy.storeKey == -1L ) {
+ copy.storeLocator = new AtomicReference[Object]()
+ copy.storeKey = copy.uow.store(copy.createMessageRecord)
+ }
}
- if( copy.storeKey == -1L ) {
- copy.storeLocator = new AtomicReference[Object]()
- copy.storeKey = copy.uow.store(copy.createMessageRecord)
+ if( !target.offer(copy) ) {
+ overflowSessions ::= target
}
}
-
- if( !target.offer(copy) ) {
- overflowSessions ::= target
- }
}
- }
- if ( matching_targets == 0 && original_ack!=null ) {
- original_ack(Consumed, null)
- }
+ if ( matching_targets == 0 && original_ack!=null ) {
+ original_ack(Consumed, null)
+ }
- if( overflowSessions!=Nil ) {
- overflow = copy
- } else {
- release(copy)
+ if( overflowSessions!=Nil ) {
+ overflow = copy
+ } else {
+ release(copy)
+ }
+ true
}
- true
}
@@ -383,23 +359,19 @@ abstract class DeliveryProducerRoute(rou
}
val drainer = ^{
- if( is_connected ) {
- if( overflow!=null ) {
- val original = overflowSessions;
- overflowSessions = Nil
- original.foreach { target=>
- if( !target.offer(overflow) ) {
- overflowSessions ::= target
- }
- }
- if( overflowSessions==Nil ) {
- release(overflow)
- overflow = null
- if(refiller!=null)
- refiller.run
+ if( overflow!=null ) {
+ val original = overflowSessions;
+ overflowSessions = Nil
+ original.foreach { target=>
+ if( !target.offer(overflow) ) {
+ overflowSessions ::= target
}
- } else if(refiller!=null) {
- refiller.run
+ }
+ if( overflowSessions==Nil ) {
+ release(overflow)
+ overflow = null
+ if(refiller!=null)
+ refiller.run
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Fri Feb 15 13:19:35 2013
@@ -23,9 +23,6 @@ import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
import org.apache.activemq.apollo.dto.QueueConsumerLinkDTO
-trait Acquirer
-object DeadLetterHandler extends Acquirer
-
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@@ -38,7 +35,7 @@ object Subscription extends Log
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends Acquirer with DeliveryProducer with Dispatched with StallCheckSupport {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with Dispatched with StallCheckSupport {
import Subscription._
def dispatch_queue = queue.dispatch_queue
@@ -369,18 +366,14 @@ class Subscription(val queue:Queue, val
def ack(uow:StoreUOW):Unit = {
assert_executing
if(!isLinked) {
- debug("Unexpected ack: message seq already acked: "+entry.seq)
+ debug("Unexpected ack: message seq allready acked: "+entry.seq)
return
}
+ val next = entry.getNext
+
total_ack_count += 1
total_ack_size += entry.size
- remove(uow)
- }
-
- def remove(uow:StoreUOW):Unit = {
- assert_executing
- val next = entry.getNext
entry.dequeue(uow)
// removes this entry from the acquired list.
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -863,16 +863,9 @@ case class MqttSession(host_state:HostSt
override def send_buffer_size = handler.codec.getReadBufferSize
override def connection = Some(handler.connection)
override def dispatch_queue = queue
-
- var suspended = false
-
- refiller = ^ {
- if( suspended ) {
- suspended = false
- handler.resume_read
- }
+ refiller = ^{
+ handler.resume_read
}
-
}
def on_mqtt_publish(publish:PUBLISH):Unit = {
@@ -914,7 +907,7 @@ case class MqttSession(host_state:HostSt
}
}
- def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = {
+ def send_via_route(route:DeliveryProducerRoute, publish:PUBLISH):Unit = {
queue.assertExecuting()
def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
@@ -959,7 +952,6 @@ case class MqttSession(host_state:HostSt
route.offer(delivery)
if( route.full ) {
// but once it gets full.. suspend to flow control the producer.
- route.suspended = true
handler.get.suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -632,13 +632,8 @@ class OpenwireProtocolHandler extends Pr
override def connection = Some(OpenwireProtocolHandler.this.connection)
override def dispatch_queue = queue
- var suspended = false
-
refiller = ^ {
- if( suspended ) {
- suspended = false
- resume_read
- }
+ resume_read
}
}
@@ -650,9 +645,6 @@ class OpenwireProtocolHandler extends Pr
val addresses = to_destination_dto(msg.getDestination, this)
val route = OpenwireDeliveryProducerRoute(addresses)
- if( uow!=null ) {
- uow.retain()
- }
// don't process frames until producer is connected...
suspend_read("connecting producer route")
host.dispatch_queue {
@@ -668,9 +660,6 @@ class OpenwireProtocolHandler extends Pr
send_via_route(route, msg, uow)
}
}
- if( uow!=null ) {
- uow.release()
- }
}
}
@@ -681,7 +670,7 @@ class OpenwireProtocolHandler extends Pr
}
}
- def send_via_route(route:OpenwireDeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
+ def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
if( !route.targets.isEmpty ) {
// We may need to add some headers..
@@ -712,7 +701,6 @@ class OpenwireProtocolHandler extends Pr
if( route.full ) {
// but once it gets full.. suspend, so that we get more messages
// until it's not full anymore.
- route.suspended = true
suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Feb 15 13:19:35 2013
@@ -1158,13 +1158,10 @@ class StompProtocolHandler extends Proto
override def connection = Some(StompProtocolHandler.this.connection)
override def dispatch_queue = queue
- var suspended = false
+
refiller = ^ {
- if( suspended ) {
- resume_read
- suspended = false
- }
+ resume_read
}
@@ -1242,10 +1239,7 @@ class StompProtocolHandler extends Proto
val trimmed_dest = dest.deepCopy().ascii()
// create the producer route...
val route = new StompProducerRoute(trimmed_dest) // don't process frames until producer is connected...
- suspend_read("Connecting to destination")
- if( uow !=null ) {
- uow.retain()
- }
+ connection.transport.suspendRead
host.dispatch_queue {
val rc = host.router.connect(route.addresses, route, security_context)
dispatchQueue {
@@ -1260,9 +1254,6 @@ class StompProtocolHandler extends Proto
send_via_route(route.addresses, route, frame, uow)
}
}
- if( uow !=null ) {
- uow.release()
- }
}
}
@@ -1366,7 +1357,7 @@ class StompProtocolHandler extends Proto
rc
}
- def send_via_route(addresses: Array[SimpleAddress], route:StompProducerRoute, frame:StompFrame, uow:StoreUOW) = {
+ def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
var storeBatch:StoreUOW=null
// User might be asking for ack that we have processed the message..
@@ -1411,7 +1402,6 @@ class StompProtocolHandler extends Proto
if( route.full ) {
// but once it gets full.. suspend, so that we get more stomp messages
// until it's not full anymore.
- route.suspended = true
suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala?rev=1446566&r1=1446565&r2=1446566&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala Fri Feb 15 13:19:35 2013
@@ -26,14 +26,14 @@ class DeadLetterQueueLoadTest extends St
override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
for (i <- 1 to 16 )
- test("naker.load."+i) {
+ test("naker."+i) {
connect("1.1")
val dlq_client = connect("1.1", new StompClient)
- subscribe("0", "/queue/nacker.load."+i, "client", false, "", false)
- subscribe("dlq", "/queue/dlq.nacker.load."+i, "client", false, "", false, c=dlq_client)
+ subscribe("0", "/queue/nacker."+i, "client", false, "", false)
+ subscribe("dlq", "/queue/dlq.nacker."+i, "auto", false, "", false, c=dlq_client)
for( j <- 1 to 1000 ) {
- async_send("/queue/nacker.load."+i, j)
+ async_send("/queue/nacker."+i, j)
assert_received(j, "0")(false)
assert_received(j, "0")(false)
// It should be sent to the DLQ after the 2nd nak