You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/27 22:21:20 UTC
svn commit: r1377823 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
Queue.scala QueueEntry.scala
Author: chirino
Date: Mon Aug 27 20:21:20 2012
New Revision: 1377823
URL: http://svn.apache.org/viewvc?rev=1377823&view=rev
Log:
If a queue entry is swapping out when it gets DLQed, wait for it to finish swapping out before moving it to the DLQ. Fixes some intermittent test failures.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1377823&r1=1377822&r2=1377823&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Aug 27 20:21:20 2012
@@ -1005,43 +1005,41 @@ class Queue(val router: LocalRouter, val
if( config.dlq==null ) {
removeFunc(original_uow)
} else {
- val delivery:Delivery = entry.state match {
- case x:entry.Loaded=>
- x.delivery.copy()
- case x:entry.Swapped=>
- x.to_delivery
- case _ =>
- throw new Exception("Invalid queue entry state, it cannot be DQLed.")
- }
-
- delivery.uow = original_uow
-// delivery.uow = if( tune_persistent ) {
-// if(original_uow!=null ) {
-// original_uow
-// } else {
-// virtual_host.store.create_uow()
-// }
-// } else {
-// null
-// }
+ def complete(delivery:Delivery) = {
+ delivery.uow = original_uow
+ delivery.ack = (result, uow) => {
+ removeFunc(uow)
+ }
- delivery.ack = (result, uow) => {
- removeFunc(uow)
+ if( dql_route==null ) {
+ val dlq = config.dlq.replaceAll(Pattern.quote("*"), id)
+ dql_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
+ router.virtual_host.dispatch_queue {
+ val rc = router.connect(dql_route.addresses, dql_route, null)
+ assert( rc == None ) // Not expecting this to ever fail.
+ dql_route.dispatch_queue {
+ dql_route.offer(delivery)
+ }
+ }
+ } else {
+ dql_route.offer(delivery)
+ }
}
- if( dql_route==null ) {
- val dlq = config.dlq.replaceAll(Pattern.quote("*"), id)
- dql_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
- router.virtual_host.dispatch_queue {
- val rc = router.connect(dql_route.addresses, dql_route, null)
- assert( rc == None ) // Not expecting this to ever fail.
- dql_route.dispatch_queue {
- dql_route.offer(delivery)
+ entry.state match {
+ case x:entry.Loaded=>
+ if( x.swapping_out ) {
+ x.on_swap_out ::=( ()=> {
+ complete(entry.state.asInstanceOf[entry.Swapped].to_delivery)
+ })
+ } else {
+ complete(x.delivery.copy())
}
- }
- } else {
- dql_route.offer(delivery)
+ case x:entry.Swapped=>
+ complete(x.to_delivery)
+ case _ =>
+ throw new Exception("Invalid queue entry state, it cannot be DQLed.")
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1377823&r1=1377822&r2=1377823&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Aug 27 20:21:20 2012
@@ -469,6 +469,8 @@ class QueueEntry(val queue:Queue, val se
}
}
+ var on_swap_out = List[()=>Unit]()
+
def swapped_out(store_wrote_to_disk:Boolean) = {
assert( state == this )
storing = false
@@ -493,6 +495,13 @@ class QueueEntry(val queue:Queue, val se
queue.loaded_items -= 1
queue.loaded_size -= size
}
+
+ val on_swap_out_copy = on_swap_out
+ on_swap_out = Nil
+ for ( task <- on_swap_out_copy ) {
+ task()
+ }
+
} else {
if( remove_pending ) {
delivery.message.release