You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/27 22:21:20 UTC

svn commit: r1377823 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala QueueEntry.scala

Author: chirino
Date: Mon Aug 27 20:21:20 2012
New Revision: 1377823

URL: http://svn.apache.org/viewvc?rev=1377823&view=rev
Log:
If a queue entry is still being swapped out when it is sent to the dead letter queue (DLQ), wait for the swap-out to finish before moving it to the DLQ. Fixes some intermittent test failures.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1377823&r1=1377822&r2=1377823&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Aug 27 20:21:20 2012
@@ -1005,43 +1005,41 @@ class Queue(val router: LocalRouter, val
     if( config.dlq==null ) {
       removeFunc(original_uow)
     } else {
-      val delivery:Delivery = entry.state match {
-        case x:entry.Loaded=>
-          x.delivery.copy()
-        case x:entry.Swapped=>
-          x.to_delivery
-        case _ =>
-          throw new Exception("Invalid queue entry state, it cannot be DQLed.")
-      }
-
-      delivery.uow = original_uow
 
-//      delivery.uow = if( tune_persistent ) {
-//        if(original_uow!=null ) {
-//          original_uow
-//        } else {
-//          virtual_host.store.create_uow()
-//        }
-//      } else {
-//        null
-//      }
+      def complete(delivery:Delivery) = {
+        delivery.uow = original_uow
+        delivery.ack = (result, uow) => {
+          removeFunc(uow)
+        }
 
-      delivery.ack = (result, uow) => {
-        removeFunc(uow)
+        if( dql_route==null ) {
+          val dlq = config.dlq.replaceAll(Pattern.quote("*"), id)
+          dql_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
+          router.virtual_host.dispatch_queue {
+            val rc = router.connect(dql_route.addresses, dql_route, null)
+            assert( rc == None ) // Not expecting this to ever fail.
+            dql_route.dispatch_queue {
+              dql_route.offer(delivery)
+            }
+          }
+        } else {
+          dql_route.offer(delivery)
+        }
       }
 
-      if( dql_route==null ) {
-        val dlq = config.dlq.replaceAll(Pattern.quote("*"), id)
-        dql_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
-        router.virtual_host.dispatch_queue {
-          val rc = router.connect(dql_route.addresses, dql_route, null)
-          assert( rc == None ) // Not expecting this to ever fail.
-          dql_route.dispatch_queue {
-            dql_route.offer(delivery)
+      entry.state match {
+        case x:entry.Loaded=>
+          if( x.swapping_out ) {
+            x.on_swap_out ::=( ()=> {
+              complete(entry.state.asInstanceOf[entry.Swapped].to_delivery)
+            })
+          } else {
+            complete(x.delivery.copy())
           }
-        }
-      } else {
-        dql_route.offer(delivery)
+        case x:entry.Swapped=>
+          complete(x.to_delivery)
+        case _ =>
+          throw new Exception("Invalid queue entry state, it cannot be DQLed.")
       }
 
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1377823&r1=1377822&r2=1377823&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Aug 27 20:21:20 2012
@@ -469,6 +469,8 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
+    var on_swap_out = List[()=>Unit]()
+
     def swapped_out(store_wrote_to_disk:Boolean) = {
       assert( state == this )
       storing = false
@@ -493,6 +495,13 @@ class QueueEntry(val queue:Queue, val se
           queue.loaded_items -= 1
           queue.loaded_size -= size
         }
+
+        val on_swap_out_copy = on_swap_out
+        on_swap_out = Nil
+        for ( task <- on_swap_out_copy ) {
+          task()
+        }
+
       } else {
         if( remove_pending ) {
           delivery.message.release