You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/01/09 20:14:21 UTC
svn commit: r1431014 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apache/...
Author: chirino
Date: Wed Jan 9 19:14:21 2013
New Revision: 1431014
URL: http://svn.apache.org/viewvc?rev=1431014&view=rev
Log:
Implements APLO-234 - Expired messages should be sent to DLQ
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 9 19:14:21 2013
@@ -233,6 +233,7 @@ class Queue(val router: LocalRouter, val
var full_policy:FullDropPolicy = Block
def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
+ def dlq_expired = OptionSupport(config.dlq_expired).getOrElse(false)
def message_group_graceful_handoff = OptionSupport(config.message_group_graceful_handoff).getOrElse(true)
@@ -541,9 +542,9 @@ class Queue(val router: LocalRouter, val
sub.close()
}
- if( dql_route!=null ) {
- val route = dql_route
- dql_route = null
+ if( dlq_route!=null ) {
+ val route = dlq_route
+ dlq_route = null
virtual_host.dispatch_queue {
router.disconnect(route.addresses, route)
}
@@ -582,6 +583,8 @@ class Queue(val router: LocalRouter, val
def is_topic_queue = resource_kind eq TopicQueueKind
+ def create_uow:StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow
+ def create_uow(uow:StoreUOW):StoreUOW = if(uow==null) create_uow else uow
object messages extends Sink[(Session[Delivery], Delivery)] {
def stall_check = {}
@@ -744,10 +747,20 @@ class Queue(val router: LocalRouter, val
}
}
- def expired(entry:QueueEntry):Unit = {
- expired_ts = now
- expired_item_counter += 1
- expired_size_counter += entry.size
+ def expired(uow:StoreUOW, entry:QueueEntry)(func: =>Unit):Unit = {
+ if( dlq_expired ) {
+ dead_letter(uow, entry) { uow =>
+ expired_ts = now
+ expired_item_counter += 1
+ expired_size_counter += entry.size
+ func
+ }
+ } else {
+ expired_ts = now
+ expired_item_counter += 1
+ expired_size_counter += entry.size
+ func
+ }
}
def display_stats: Unit = {
@@ -812,21 +825,25 @@ class Queue(val router: LocalRouter, val
case x:QueueEntry#SwappedRange =>
// load the range to expire the messages in it.
cur.load(null)
- case x:QueueEntry#Swapped =>
+ case state:QueueEntry#Swapped =>
// remove the expired message if it has not been
// acquired.
- if( !x.is_acquired ) {
- expired(cur)
- cur.dequeue(null)
- x.remove
+ if( !state.is_acquired ) {
+ val uow = create_uow
+ cur.dequeue(uow)
+ expired(uow, cur) {
+ state.remove
+ }
}
- case x:QueueEntry#Loaded =>
+ case state:QueueEntry#Loaded =>
// remove the expired message if it has not been
// acquired.
- if( !x.is_acquired ) {
- expired(cur)
- cur.dequeue(null)
- x.remove
+ if( !state.is_acquired ) {
+ val uow = create_uow
+ cur.dequeue(uow)
+ expired(uow, cur) {
+ state.remove
+ }
}
case _ =>
}
@@ -1036,10 +1053,10 @@ class Queue(val router: LocalRouter, val
override def connection = None
override def dispatch_queue = Queue.this.dispatch_queue
}
- var dql_route:DlqProducerRoute = _
+ var dlq_route:DlqProducerRoute = _
def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit) = {
-
+ assert_executing
if( config.dlq==null ) {
removeFunc(original_uow)
} else {
@@ -1047,21 +1064,24 @@ class Queue(val router: LocalRouter, val
def complete(delivery:Delivery) = {
delivery.uow = original_uow
delivery.ack = (result, uow) => {
- removeFunc(uow)
+ dispatch_queue {
+ removeFunc(uow)
+ }
}
+ delivery.expiration=0
- if( dql_route==null ) {
+ if( dlq_route==null ) {
val dlq = config.dlq.replaceAll(Pattern.quote("*"), id)
- dql_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
+ dlq_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
router.virtual_host.dispatch_queue {
- val rc = router.connect(dql_route.addresses, dql_route, null)
+ val rc = router.connect(dlq_route.addresses, dlq_route, null)
assert( rc == None ) // Not expecting this to ever fail.
- dql_route.dispatch_queue {
- dql_route.offer(delivery)
+ dlq_route.dispatch_queue {
+ dlq_route.offer(delivery)
}
}
} else {
- dql_route.offer(delivery)
+ dlq_route.offer(delivery)
}
}
@@ -1110,9 +1130,7 @@ class Queue(val router: LocalRouter, val
var limit = dlq_nak_limit
if( limit>0 && entry.entry.redelivery_count >= limit ) {
dead_letter(uow, entry.entry) { uow =>
- dispatch_queue {
- entry.ack(uow)
- }
+ entry.ack(uow)
}
} else {
entry.nack
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Jan 9 19:14:21 2013
@@ -202,14 +202,10 @@ class QueueEntry(val queue:Queue, val se
def dequeue(uow: StoreUOW) = {
if ( queued ) {
if (messageKey != -1) {
- val storeBatch = if( uow == null ) {
- queue.virtual_host.store.create_uow
- } else {
- uow
- }
- storeBatch.dequeue(toQueueEntryRecord)
+ val actual_uow = queue.create_uow(uow)
+ actual_uow.dequeue(toQueueEntryRecord)
if( uow == null ) {
- storeBatch.release
+ actual_uow.release
}
}
queue.dequeue_item_counter += 1
@@ -480,8 +476,8 @@ class QueueEntry(val queue:Queue, val se
if( !storing ) {
assert( delivery.storeKey == -1 )
- delivery.uow = queue.virtual_host.store.create_uow
- val uow = delivery.uow
+ val uow = queue.create_uow
+ delivery.uow = uow
delivery.storeLocator = new AtomicReference[Object]()
delivery.storeKey = uow.store(delivery.createMessageRecord )
store
@@ -572,9 +568,11 @@ class QueueEntry(val queue:Queue, val se
queue.assert_executing
if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
- queue.expired(entry)
- entry.dequeue(null)
- remove
+ val uow = queue.create_uow
+ entry.dequeue(uow)
+ queue.expired(uow, entry) {
+ remove
+ }
return true
}
@@ -831,9 +829,11 @@ class QueueEntry(val queue:Queue, val se
queue.assert_executing
if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
- queue.expired(entry)
- entry.dequeue(null)
- remove
+ val uow = queue.create_uow
+ entry.dequeue(uow)
+ queue.expired(uow, entry) {
+ remove
+ }
return true
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueSettingsDTO.java Wed Jan 9 19:14:21 2013
@@ -163,6 +163,12 @@ public class QueueSettingsDTO {
public Integer nak_limit;
/**
+ * Should expired messages be sent to the dead letter queue? Defaults to false.
+ */
+ @XmlAttribute(name="dlq_expired")
+ public Boolean dlq_expired;
+
+ /**
* To hold any other non-matching XML elements
*/
@XmlAnyElement(lax=true)
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Wed Jan 9 19:14:21 2013
@@ -26,7 +26,7 @@
<subscription quota="10k"/>
</topic>
- <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+ <queue id="nacker.**" dlq="dlq.*" dlq_expired="true" nak_limit="2"/>
<queue id="mirrored.**" mirrored="true"/>
<topic id="queued.**" slow_consumer_policy="queue">
<subscription tail_buffer="4k"/>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Wed Jan 9 19:14:21 2013
@@ -26,7 +26,7 @@
<subscription quota="10k"/>
</topic>
- <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+ <queue id="nacker.**" dlq="dlq.*" dlq_expired="true" nak_limit="2"/>
<queue id="mirrored.**" mirrored="true"/>
<topic id="queued.**" slow_consumer_policy="queue">
<subscription tail_buffer="4k"/>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Wed Jan 9 19:14:21 2013
@@ -26,7 +26,7 @@
<subscription quota="10k"/>
</topic>
- <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+ <queue id="nacker.**" dlq="dlq.*" dlq_expired="true" nak_limit="2"/>
<queue id="mirrored.**" mirrored="true"/>
<topic id="queued.**" slow_consumer_policy="queue">
<subscription tail_buffer="4k"/>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Jan 9 19:14:21 2013
@@ -1061,6 +1061,24 @@ class StompParallelTest extends StompTes
get("3")
}
+ test("Expired message sent to DLQ") {
+ connect("1.1")
+
+ val now = System.currentTimeMillis()
+ var dest = "/queue/nacker.expires"
+ async_send(dest, "1")
+ async_send(dest, "2", "expires:"+(now+500)+"\n")
+ sync_send(dest, "3")
+
+ Thread.sleep(1000)
+ subscribe("a", dest)
+ assert_received("1", "a")
+ assert_received("3", "a")
+
+ subscribe("b", "/queue/dlq.nacker.expires")
+ assert_received("2", "b")
+ }
+
test("Receipts on SEND to unconsummed topic") {
connect("1.1")
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1431014&r1=1431013&r2=1431014&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Jan 9 19:14:21 2013
@@ -449,6 +449,9 @@ A `queue` element may be configured with
configured or dropped. If set to less than one, then the message
will never be considered to be a poison message. Defaults to zero.
+* `dlq_expired`: Should expired messages be sent to the dead letter queue?
+ Defaults to false.
+
* `full_policy`: Once the queue is full, the `full_policy`
controls how the queue behaves when additional messages attempt to
be enqueued onto the queue.