You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/01/09 21:31:19 UTC
svn commit: r1431043 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala QueueEntry.scala
Author: chirino
Date: Wed Jan 9 20:31:18 2013
New Revision: 1431043
URL: http://svn.apache.org/viewvc?rev=1431043&view=rev
Log:
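Further work for APLO-234 - Expired messages should be sent to the dead letter queue (DLQ). Adds a per-entry `expiring` flag so that the expiration counters and dead-lettering are applied at most once per entry, guards entry removal with `isLinked`, and routes consumer-reported expirations through the same `expired` path.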
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1431043&r1=1431042&r2=1431043&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 9 20:31:18 2013
@@ -748,18 +748,20 @@ class Queue(val router: LocalRouter, val
}
def expired(uow:StoreUOW, entry:QueueEntry)(func: =>Unit):Unit = {
- if( dlq_expired ) {
- dead_letter(uow, entry) { uow =>
- expired_ts = now
- expired_item_counter += 1
- expired_size_counter += entry.size
- func
- }
+ if( entry.expiring ) {
+ func
} else {
+ entry.expiring = true
expired_ts = now
expired_item_counter += 1
expired_size_counter += entry.size
- func
+ if( dlq_expired ) {
+ dead_letter(uow, entry) { uow =>
+ func
+ }
+ } else {
+ func
+ }
}
}
@@ -820,7 +822,8 @@ class Queue(val router: LocalRouter, val
val next = cur.getNext
// handle expiration...
- if( cur.expiration != 0 && cur.expiration <= now ) {
+ if( !cur.expiring && cur.expiration != 0 && cur.expiration <= now ) {
+ val entry = cur
cur.state match {
case x:QueueEntry#SwappedRange =>
// load the range to expire the messages in it.
@@ -830,9 +833,11 @@ class Queue(val router: LocalRouter, val
// acquired.
if( !state.is_acquired ) {
val uow = create_uow
- cur.dequeue(uow)
- expired(uow, cur) {
- state.remove
+ entry.dequeue(uow)
+ expired(uow, entry) {
+ if( entry.isLinked ) {
+ entry.remove
+ }
}
}
case state:QueueEntry#Loaded =>
@@ -840,9 +845,11 @@ class Queue(val router: LocalRouter, val
// acquired.
if( !state.is_acquired ) {
val uow = create_uow
- cur.dequeue(uow)
- expired(uow, cur) {
- state.remove
+ entry.dequeue(uow)
+ expired(uow, entry) {
+ if( entry.isLinked ) {
+ entry.remove
+ }
}
}
case _ =>
@@ -1110,14 +1117,12 @@ class Queue(val router: LocalRouter, val
case (entry, consumed, uow) =>
consumed match {
case Consumed =>
-// debug("ack consumed: ("+store_id+","+entry.entry.seq+")")
entry.ack(uow)
case Expired=>
-// debug("ack expired: ("+store_id+","+entry.entry.seq+")")
- expired_ts = now
- expired_item_counter += 1
- expired_size_counter += entry.entry.size
- entry.ack(uow)
+ val actual = create_uow(uow)
+ expired(actual, entry.entry) {
+ entry.ack(actual)
+ }
case Delivered =>
entry.increment_nack
entry.entry.redelivered
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1431043&r1=1431042&r2=1431043&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Jan 9 20:31:18 2013
@@ -192,6 +192,8 @@ class QueueEntry(val queue:Queue, val se
def dispatch() = state.dispatch
def memory_space = state.memory_space
+ var expiring = false
+
// These methods may cause a change in the current state.
def swap(asap:Boolean) = state.swap_out(asap)
def load(space:MemorySpace) = state.swap_in(space)
@@ -571,7 +573,9 @@ class QueueEntry(val queue:Queue, val se
val uow = queue.create_uow
entry.dequeue(uow)
queue.expired(uow, entry) {
- remove
+ if( isLinked ) {
+ remove
+ }
}
return true
}
@@ -832,7 +836,9 @@ class QueueEntry(val queue:Queue, val se
val uow = queue.create_uow
entry.dequeue(uow)
queue.expired(uow, entry) {
- remove
+ if( isLinked ) {
+ remove
+ }
}
return true
}