You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/01/09 20:14:21 UTC

svn commit: r1431014 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apache/...

Author: chirino
Date: Wed Jan  9 19:14:21 2013
New Revision: 1431014

URL: http://svn.apache.org/viewvc?rev=1431014&view=rev
Log:
Implements APLO-234 - Expired messages should be sent to DLQ

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan  9 19:14:21 2013
@@ -233,6 +233,7 @@ class Queue(val router: LocalRouter, val
   var full_policy:FullDropPolicy = Block
 
   def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
+  def dlq_expired = OptionSupport(config.dlq_expired).getOrElse(false)
 
   def message_group_graceful_handoff = OptionSupport(config.message_group_graceful_handoff).getOrElse(true)
 
@@ -541,9 +542,9 @@ class Queue(val router: LocalRouter, val
       sub.close()
     }
 
-    if( dql_route!=null ) {
-      val route = dql_route
-      dql_route = null
+    if( dlq_route!=null ) {
+      val route = dlq_route
+      dlq_route = null
       virtual_host.dispatch_queue {
         router.disconnect(route.addresses, route)
       }
@@ -582,6 +583,8 @@ class Queue(val router: LocalRouter, val
 
 
   def is_topic_queue = resource_kind eq TopicQueueKind
+  def create_uow:StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow
+  def create_uow(uow:StoreUOW):StoreUOW = if(uow==null) create_uow else uow
 
   object messages extends Sink[(Session[Delivery], Delivery)] {
     def stall_check = {}
@@ -744,10 +747,20 @@ class Queue(val router: LocalRouter, val
     }
   }
 
-  def expired(entry:QueueEntry):Unit = {
-    expired_ts = now
-    expired_item_counter += 1
-    expired_size_counter += entry.size
+  def expired(uow:StoreUOW, entry:QueueEntry)(func: =>Unit):Unit = {
+    if( dlq_expired ) {
+      dead_letter(uow, entry) { uow =>
+        expired_ts = now
+        expired_item_counter += 1
+        expired_size_counter += entry.size
+        func
+      }
+    } else {
+      expired_ts = now
+      expired_item_counter += 1
+      expired_size_counter += entry.size
+      func
+    }
   }
 
   def display_stats: Unit = {
@@ -812,21 +825,25 @@ class Queue(val router: LocalRouter, val
           case x:QueueEntry#SwappedRange =>
             // load the range to expire the messages in it.
             cur.load(null)
-          case x:QueueEntry#Swapped =>
+          case state:QueueEntry#Swapped =>
             // remove the expired message if it has not been
             // acquired.
-            if( !x.is_acquired ) {
-              expired(cur)
-              cur.dequeue(null)
-              x.remove
+            if( !state.is_acquired ) {
+              val uow = create_uow
+              cur.dequeue(uow)
+              expired(uow, cur) {
+                state.remove
+              }
             }
-          case x:QueueEntry#Loaded =>
+          case state:QueueEntry#Loaded =>
             // remove the expired message if it has not been
             // acquired.
-            if( !x.is_acquired ) {
-              expired(cur)
-              cur.dequeue(null)
-              x.remove
+            if( !state.is_acquired ) {
+              val uow = create_uow
+              cur.dequeue(uow)
+              expired(uow, cur) {
+                state.remove
+              }
             }
           case _ =>
         }
@@ -1036,10 +1053,10 @@ class Queue(val router: LocalRouter, val
     override def connection = None
     override def dispatch_queue = Queue.this.dispatch_queue
   }
-  var dql_route:DlqProducerRoute = _
+  var dlq_route:DlqProducerRoute = _
   
   def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit) = {
-
+    assert_executing
     if( config.dlq==null ) {
       removeFunc(original_uow)
     } else {
@@ -1047,21 +1064,24 @@ class Queue(val router: LocalRouter, val
       def complete(delivery:Delivery) = {
         delivery.uow = original_uow
         delivery.ack = (result, uow) => {
-          removeFunc(uow)
+          dispatch_queue {
+            removeFunc(uow)
+          }
         }
+        delivery.expiration=0
 
-        if( dql_route==null ) {
+        if( dlq_route==null ) {
           val dlq = config.dlq.replaceAll(Pattern.quote("*"), id)
-          dql_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
+          dlq_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
           router.virtual_host.dispatch_queue {
-            val rc = router.connect(dql_route.addresses, dql_route, null)
+            val rc = router.connect(dlq_route.addresses, dlq_route, null)
             assert( rc == None ) // Not expecting this to ever fail.
-            dql_route.dispatch_queue {
-              dql_route.offer(delivery)
+            dlq_route.dispatch_queue {
+              dlq_route.offer(delivery)
             }
           }
         } else {
-          dql_route.offer(delivery)
+          dlq_route.offer(delivery)
         }
       }
 
@@ -1110,9 +1130,7 @@ class Queue(val router: LocalRouter, val
             var limit = dlq_nak_limit
             if( limit>0 && entry.entry.redelivery_count >= limit ) {
               dead_letter(uow, entry.entry) { uow =>
-                dispatch_queue {
-                  entry.ack(uow)
-                }
+                entry.ack(uow)
               }
             } else {
               entry.nack

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Jan  9 19:14:21 2013
@@ -202,14 +202,10 @@ class QueueEntry(val queue:Queue, val se
   def dequeue(uow: StoreUOW) = {
     if ( queued ) {
       if (messageKey != -1) {
-        val storeBatch = if( uow == null ) {
-          queue.virtual_host.store.create_uow
-        } else {
-          uow
-        }
-        storeBatch.dequeue(toQueueEntryRecord)
+        val actual_uow = queue.create_uow(uow)
+        actual_uow.dequeue(toQueueEntryRecord)
         if( uow == null ) {
-          storeBatch.release
+          actual_uow.release
         }
       }
       queue.dequeue_item_counter += 1
@@ -480,8 +476,8 @@ class QueueEntry(val queue:Queue, val se
             if( !storing ) {
               assert( delivery.storeKey == -1 )
 
-              delivery.uow = queue.virtual_host.store.create_uow
-              val uow = delivery.uow
+              val uow = queue.create_uow
+              delivery.uow = uow
               delivery.storeLocator = new AtomicReference[Object]()
               delivery.storeKey = uow.store(delivery.createMessageRecord )
               store
@@ -572,9 +568,11 @@ class QueueEntry(val queue:Queue, val se
       queue.assert_executing
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
-        queue.expired(entry)
-        entry.dequeue(null)
-        remove
+        val uow = queue.create_uow
+        entry.dequeue(uow)
+        queue.expired(uow, entry) {
+          remove
+        }
         return true
       }
 
@@ -831,9 +829,11 @@ class QueueEntry(val queue:Queue, val se
       queue.assert_executing
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
-        queue.expired(entry)
-        entry.dequeue(null)
-        remove
+        val uow = queue.create_uow
+        entry.dequeue(uow)
+        queue.expired(uow, entry) {
+          remove
+        }
         return true
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java Wed Jan  9 19:14:21 2013
@@ -163,6 +163,12 @@ public class QueueSettingsDTO {
     public Integer nak_limit;
 
     /**
+     * Should expired messages be sent to the dead letter queue?  Defaults to false.
+     */
+    @XmlAttribute(name="dlq_expired")
+    public Boolean dlq_expired;
+
+    /**
      * To hold any other non-matching XML elements
      */
     @XmlAnyElement(lax=true)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Wed Jan  9 19:14:21 2013
@@ -26,7 +26,7 @@
       <subscription quota="10k"/>
     </topic>
 
-    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+    <queue id="nacker.**" dlq="dlq.*" dlq_expired="true" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
     <topic id="queued.**" slow_consumer_policy="queue">
       <subscription tail_buffer="4k"/>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Wed Jan  9 19:14:21 2013
@@ -26,7 +26,7 @@
       <subscription quota="10k"/>
     </topic>
 
-    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+    <queue id="nacker.**" dlq="dlq.*" dlq_expired="true" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
     <topic id="queued.**" slow_consumer_policy="queue">
       <subscription tail_buffer="4k"/>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Wed Jan  9 19:14:21 2013
@@ -26,7 +26,7 @@
       <subscription quota="10k"/>
     </topic>
 
-    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+    <queue id="nacker.**" dlq="dlq.*" dlq_expired="true" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
     <topic id="queued.**" slow_consumer_policy="queue">
       <subscription tail_buffer="4k"/>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Jan  9 19:14:21 2013
@@ -1061,6 +1061,24 @@ class StompParallelTest extends StompTes
     get("3")
   }
 
+  test("Expired message sent to DLQ") {
+    connect("1.1")
+
+    val now = System.currentTimeMillis()
+    var dest = "/queue/nacker.expires"
+    async_send(dest, "1")
+    async_send(dest, "2", "expires:"+(now+500)+"\n")
+    sync_send(dest, "3")
+
+    Thread.sleep(1000)
+    subscribe("a", dest)
+    assert_received("1", "a")
+    assert_received("3", "a")
+
+    subscribe("b", "/queue/dlq.nacker.expires")
+    assert_received("2", "b")
+  }
+
   test("Receipts on SEND to unconsummed topic") {
     connect("1.1")
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Jan  9 19:14:21 2013
@@ -449,6 +449,9 @@ A `queue` element may be configured with
    configured or dropped.  If set to less than one, then the message
    will never be considered to be a poison message. Defaults to zero.
 
+* `dlq_expired`: Should expired messages be sent to the dead letter queue?  
+  Defaults to false.
+
 * `full_policy`: Once the queue is full, the `full_policy` 
   controls how the   queue behaves when additional messages attempt to 
   be enqueued onto the queue.