You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/22 11:43:40 UTC
svn commit: r1138361 - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/proto/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala...
Author: chirino
Date: Wed Jun 22 09:43:39 2011
New Revision: 1138361
URL: http://svn.apache.org/viewvc?rev=1138361&view=rev
Log:
Implemented message expiration.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
activemq/activemq-apollo/trunk/apollo-website/src/index.page
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Wed Jun 22 09:43:39 2011
@@ -399,6 +399,14 @@ class BDBClient(store: BDBStore) {
group.count += 1
group.size += entry.size
+ if(group.expiration == 0){
+ group.expiration = entry.expiration
+ } else {
+ if( entry.expiration != 0 ) {
+ group.expiration = entry.expiration.min(group.expiration)
+ }
+ }
+
if( group.count == limit) {
rc += group
group = null
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Wed Jun 22 09:43:39 2011
@@ -48,4 +48,5 @@ message QueueEntryPB {
optional int32 size=4;
optional bytes attachment=5;
optional int32 redeliveries = 6;
+ optional sint64 expiration=7;
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jun 22 09:43:39 2011
@@ -140,6 +140,21 @@ object Delivery extends Sizer[Delivery]
def size(value:Delivery):Int = value.size
}
+sealed trait DeliveryResult
+/** message was processed, does not need redelivery */
+object Delivered extends DeliveryResult
+/** message expired before it could be processed, does not need redelivery */
+object Expired extends DeliveryResult
+/**
+ * The receiver thinks the message was a poison message; it was not successfully
+ * processed and it should not get redelivered.
+ */
+object Poisoned extends DeliveryResult
+/**
+ * The message was not consumed, it should be redelivered to another consumer ASAP.
+ */
+object Undelivered extends DeliveryResult
+
class Delivery {
/**
@@ -166,7 +181,7 @@ class Delivery {
* Set if the producer requires an ack to be sent back. Consumer
* should execute once the message is processed.
*/
- var ack:(Boolean, StoreUOW)=>Unit = null
+ var ack:(DeliveryResult, StoreUOW)=>Unit = null
def copy() = (new Delivery).set(this)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jun 22 09:43:39 2011
@@ -30,7 +30,6 @@ import org.fusesource.hawtdispatch.{List
import OptionSupport._
import security.SecurityContext
import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
-import java.lang.String
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -71,7 +70,7 @@ class Queue(val router: LocalRouter, val
ack_source.cancel
}
- val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, Boolean, StoreUOW)](), dispatch_queue)
+ val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, DeliveryResult, StoreUOW)](), dispatch_queue)
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
@@ -150,19 +149,23 @@ class Queue(val router: LocalRouter, val
configure(config)
}
- var last_maintenance_ts = System.currentTimeMillis
+ var now = System.currentTimeMillis
var enqueue_item_counter = 0L
var enqueue_size_counter = 0L
- var enqueue_ts = last_maintenance_ts;
+ var enqueue_ts = now;
var dequeue_item_counter = 0L
var dequeue_size_counter = 0L
- var dequeue_ts = last_maintenance_ts;
+ var dequeue_ts = now;
var nack_item_counter = 0L
var nack_size_counter = 0L
- var nack_ts = last_maintenance_ts;
+ var nack_ts = now;
+
+ var expired_item_counter = 0L
+ var expired_size_counter = 0L
+ var expired_ts = now;
def queue_size = enqueue_size_counter - dequeue_size_counter
def queue_items = enqueue_item_counter - dequeue_item_counter
@@ -227,7 +230,7 @@ class Queue(val router: LocalRouter, val
def check_idle {
if (producers.isEmpty && all_subscriptions.isEmpty && queue_items==0 ) {
if (idled_at==0) {
- val now = System.currentTimeMillis()
+ now = System.currentTimeMillis()
idled_at = now
if( auto_delete_after!=0 ) {
dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
@@ -327,6 +330,13 @@ class Queue(val router: LocalRouter, val
false
} else {
+ // Don't even enqueue if the message has expired.
+ val expiration = delivery.message.expiration
+ if( expiration != 0 && expiration <= now ) {
+ expired(delivery)
+ return true
+ }
+
val entry = tail_entry
tail_entry = new QueueEntry(Queue.this, next_message_seq)
val queueDelivery = delivery.copy
@@ -339,7 +349,7 @@ class Queue(val router: LocalRouter, val
entries.addLast(entry)
enqueue_item_counter += 1
enqueue_size_counter += entry.size
- enqueue_ts = last_maintenance_ts;
+ enqueue_ts = now;
// Do we need to do a persistent enqueue???
@@ -372,6 +382,24 @@ class Queue(val router: LocalRouter, val
}
}
+ def expired(delivery:Delivery):Unit = {
+ expired_ts = now
+ expired_item_counter += 1
+ expired_size_counter += delivery.size
+ }
+
+ def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
+ if(dequeue) {
+ dequeue_item_counter += 1
+ dequeue_size_counter += entry.size
+ dequeue_ts = now
+ messages.refiller.run
+ }
+
+ expired_ts = now
+ expired_item_counter += 1
+ expired_size_counter += entry.size
+ }
def display_stats: Unit = {
info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, swapped_in_size, swapped_in_size_max)
@@ -416,11 +444,36 @@ class Queue(val router: LocalRouter, val
def swap_messages = {
- // reset the prefetch flags...
+ now = System.currentTimeMillis()
+
var cur = entries.getHead
while( cur!=null ) {
+
+ // reset the prefetch flags and handle expiration...
cur.prefetch_flags = 0
- cur = cur.getNext
+ val next = cur.getNext
+
+ // handle expiration...
+ if( cur.expiration != 0 && cur.expiration <= now ) {
+ cur.state match {
+ case x:QueueEntry#SwappedRange =>
+ // load the range to expire the messages in it.
+ cur.load
+ case x:QueueEntry#Swapped =>
+ // remove the expired swapped message.
+ expired(cur)
+ x.remove
+ case x:QueueEntry#Loaded =>
+ // remove the expired message if it has not been
+ // acquired.
+ if( !x.acquired ) {
+ expired(cur)
+ x.remove
+ }
+ case _ =>
+ }
+ }
+ cur = next
}
// Set the prefetch flags
@@ -482,7 +535,7 @@ class Queue(val router: LocalRouter, val
def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
if( service_state.is_started ) {
- last_maintenance_ts = System.currentTimeMillis
+ now = System.currentTimeMillis
// target tune_min_subscription_rate / sec
all_subscriptions.foreach{ case (consumer, sub)=>
@@ -514,10 +567,13 @@ class Queue(val router: LocalRouter, val
def drain_acks = {
ack_source.getData.foreach {
case (entry, consumed, tx) =>
- if( consumed ) {
- entry.ack(tx)
- } else {
- entry.nack
+ consumed match {
+ case Delivered => entry.ack(tx)
+ case Expired =>
+ entry.entry.queue.expired(entry.entry, false)
+ entry.ack(tx)
+ case Poisoned => entry.nack
+ case Undelivered => entry.nack
}
}
messages.refiller.run
@@ -624,7 +680,7 @@ class Queue(val router: LocalRouter, val
}
}
- def unbind(values: List[DeliveryConsumer]) = dispatch_queue {
+ def unbind(values: List[DeliveryConsumer]):Unit = dispatch_queue {
for (consumer <- values) {
all_subscriptions.get(consumer) match {
case Some(subscription) =>
@@ -639,7 +695,7 @@ class Queue(val router: LocalRouter, val
def bind(destination:DestinationDTO, consumer: DeliveryConsumer) = {
bind(consumer::Nil)
}
- def unbind(consumer: DeliveryConsumer, persistent:Boolean) = {
+ def unbind(consumer: DeliveryConsumer, persistent:Boolean):Unit = {
unbind(consumer::Nil)
}
@@ -763,12 +819,12 @@ class QueueEntry(val queue:Queue, val se
}
def init(qer:QueueEntryRecord):QueueEntry = {
- state = new Swapped(qer.message_key, qer.size)
+ state = new Swapped(qer.message_key, qer.size, qer.expiration)
this
}
def init(range:QueueEntryRange):QueueEntry = {
- state = new SwappedRange(range.last_entry_seq, range.count, range.size)
+ state = new SwappedRange(range.last_entry_seq, range.count, range.size, range.expiration)
this
}
@@ -780,9 +836,14 @@ class QueueEntry(val queue:Queue, val se
*/
def run() = {
queue.assert_executing
- var next = this;
- while( next!=null && next.dispatch) {
- next = next.getNext
+ var cur = this;
+ while( cur!=null && cur.isLinked ) {
+ val next = cur.getNext
+ cur = if( cur.dispatch ) {
+ next
+ } else {
+ null
+ }
}
}
@@ -818,6 +879,7 @@ class QueueEntry(val queue:Queue, val se
qer.entry_seq = seq
qer.message_key = state.message_key
qer.size = state.size
+ qer.expiration = expiration
qer
}
@@ -851,6 +913,7 @@ class QueueEntry(val queue:Queue, val se
// These should not change the current state.
def count = state.count
def size = state.size
+ def expiration = state.expiration
def messageKey = state.message_key
def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
def dispatch() = state.dispatch
@@ -885,6 +948,11 @@ class QueueEntry(val queue:Queue, val se
def size = 0
/**
+ * When the entry expires or 0 if it does not expire.
+ */
+ def expiration = 0L
+
+ /**
* Gets number of messages that this entry represents
*/
def count = 0
@@ -1023,6 +1091,7 @@ class QueueEntry(val queue:Queue, val se
override def count = 1
override def size = delivery.size
+ override def expiration = delivery.message.expiration
override def message_key = delivery.storeKey
var remove_pending = false
@@ -1097,7 +1166,7 @@ class QueueEntry(val queue:Queue, val se
queue.swap_out_size_counter += size
queue.swap_out_item_counter += 1
- state = new Swapped(delivery.storeKey, size)
+ state = new Swapped(delivery.storeKey, size, expiration)
if( can_combine_with_prev ) {
getPrevious.as_swapped_range.combineNext
}
@@ -1136,6 +1205,12 @@ class QueueEntry(val queue:Queue, val se
queue.assert_executing
+ if( !acquired && expiration != 0 && expiration <= queue.now ) {
+ queue.expired(entry)
+ remove
+ return true
+ }
+
// Nothing to dispatch if we don't have subs..
if( parked.isEmpty ) {
return false
@@ -1236,7 +1311,7 @@ class QueueEntry(val queue:Queue, val se
* entry is persisted, it can move into this state. This state only holds onto the
* the massage key so that it can reload the message from the store quickly when needed.
*/
- class Swapped(override val message_key:Long, override val size:Int) extends EntryState {
+ class Swapped(override val message_key:Long, override val size:Int, override val expiration:Long) extends EntryState {
queue.individual_swapped_items += 1
@@ -1325,7 +1400,7 @@ class QueueEntry(val queue:Queue, val se
queue.swapping_in_size -= size
}
queue.individual_swapped_items -= 1
- state = new SwappedRange(seq, 1, size)
+ state = new SwappedRange(seq, 1, size, expiration)
}
}
@@ -1345,10 +1420,13 @@ class QueueEntry(val queue:Queue, val se
/** the number of items in the range */
var _count:Int,
/** size in bytes of the range */
- var _size:Int) extends EntryState {
+ var _size:Int,
+ var _expiration:Long) extends EntryState {
+
override def count = _count
override def size = _size
+ override def expiration = _expiration
var swapping_in = false
@@ -1424,15 +1502,20 @@ class QueueEntry(val queue:Queue, val se
assert(last < value.seq )
last = value.seq
_count += 1
- _size += value.size
- value.remove
} else if( value.is_swapped_range ) {
assert(last < value.seq )
last = value.as_swapped_range.last
_count += value.as_swapped_range.count
- _size += value.size
- value.remove
}
+ if(_expiration == 0){
+ _expiration = value.expiration
+ } else {
+ if( value.expiration != 0 ) {
+ _expiration = value.expiration.min(_expiration)
+ }
+ }
+ _size += value.size
+ value.remove
}
}
@@ -1652,7 +1735,7 @@ class Subscription(val queue:Queue, val
queue.dequeue_item_counter += 1
queue.dequeue_size_counter += entry.size
- queue.dequeue_ts = queue.last_maintenance_ts
+ queue.dequeue_ts = queue.now
// removes this entry from the acquired list.
unlink()
@@ -1685,7 +1768,7 @@ class Subscription(val queue:Queue, val
// track for stats
queue.nack_item_counter += 1
queue.nack_size_counter += entry.size
- queue.nack_ts = queue.last_maintenance_ts
+ queue.nack_ts = queue.now
// The following does not need to get done for exclusive subs because
// they end up rewinding all the sub of the head of the queue.
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jun 22 09:43:39 2011
@@ -156,7 +156,7 @@ abstract class DeliveryProducerRoute(val
// Dispatch.
//
- var pendingAck: (Boolean, StoreUOW)=>Unit = null
+ var pendingAck: (DeliveryResult, StoreUOW)=>Unit = null
var overflow:Delivery=null
var overflowSessions = List[DeliverySession]()
var refiller:Runnable=null
@@ -207,11 +207,11 @@ abstract class DeliveryProducerRoute(val
if (delivery.uow != null) {
val ack = pendingAck
delivery.uow.on_complete {
- ack(true, null)
+ ack(Delivered, null)
}
} else {
- pendingAck(true, null)
+ pendingAck(Delivered, null)
}
pendingAck==null
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Wed Jun 22 09:43:39 2011
@@ -32,7 +32,8 @@ object PBSupport {
pb.setProtocol(v.protocol)
pb.setSize(v.size)
pb.setValue(v.buffer)
- pb.setExpiration(v.expiration)
+ if(v.expiration!=0)
+ pb.setExpiration(v.expiration)
pb.freeze
}
@@ -84,7 +85,10 @@ object PBSupport {
pb.setMessageKey(v.message_key)
pb.setAttachment(v.attachment)
pb.setSize(v.size)
- pb.setRedeliveries(v.redeliveries)
+ if(v.expiration!=0)
+ pb.setExpiration(v.expiration)
+ if(v.redeliveries!=0)
+ pb.setRedeliveries(v.redeliveries)
pb.freeze
}
@@ -95,6 +99,7 @@ object PBSupport {
rc.message_key = pb.getMessageKey
rc.attachment = pb.getAttachment
rc.size = pb.getSize
+ rc.expiration = pb.getExpiration
rc.redeliveries = pb.getRedeliveries.toShort
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala Wed Jun 22 09:43:39 2011
@@ -25,4 +25,5 @@ class QueueEntryRange {
var last_entry_seq = 0L
var count = 0
var size = 0
+ var expiration = 0L
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Wed Jun 22 09:43:39 2011
@@ -31,6 +31,7 @@ class QueueEntryRecord {
var message_key = 0L
var attachment:Buffer = _
var size = 0
+ var expiration = 0L
var redeliveries:Short = 0
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java Wed Jun 22 09:43:39 2011
@@ -84,6 +84,25 @@ public class QueueMetricsDTO {
public long dequeue_ts;
/**
+ * The number of messages which expired before they could be processed.
+ */
+ @XmlAttribute(name="expired_item_counter")
+ public long expired_item_counter;
+
+ /**
+ * The total size in bytes of messages which expired before
+ * they could be processed.
+ */
+ @XmlAttribute(name="expired_size_counter")
+ public long expired_size_counter;
+
+ /**
+ * The time stamp of when the last message expiration occurred.
+ */
+ @XmlAttribute(name="expired_ts")
+ public long expired_ts;
+
+ /**
* The number of messages that were delivered to
* a consumer but which the consumer did not successfully process.
*/
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Wed Jun 22 09:43:39 2011
@@ -311,6 +311,16 @@ class HawtDBClient(hawtDBStore: HawtDBSt
group.last_entry_seq = entry.getKey.longValue
group.count += 1
group.size += entry.getValue.getSize
+
+// TODO:
+// if(group.expiration == 0){
+// group.expiration = entry.expiration
+// } else {
+// if( entry.expiration != 0 ) {
+// group.expiration = entry.expiration.min(group.expiration)
+// }
+// }
+
if( group.count == limit) {
rc += group
group = null
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Wed Jun 22 09:43:39 2011
@@ -427,6 +427,14 @@ class JDBM2Client(store: JDBM2Store) {
group.count += 1
group.size += entry.size
+ if(group.expiration == 0){
+ group.expiration = entry.expiration
+ } else {
+ if( entry.expiration != 0 ) {
+ group.expiration = entry.expiration.min(group.expiration)
+ }
+ }
+
if( group.count == limit) {
rc += group
group = null
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Jun 22 09:43:39 2011
@@ -853,9 +853,9 @@ class OpenwireProtocolHandler extends Pr
object ack_handler {
// TODO: Need to validate all the range ack cases...
- var consumer_acks = ListBuffer[(MessageId, (Boolean, StoreUOW)=>Unit)]()
+ var consumer_acks = ListBuffer[(MessageId, (DeliveryResult, StoreUOW)=>Unit)]()
- def track(id:MessageId, callback:(Boolean, StoreUOW)=>Unit) = {
+ def track(id:MessageId, callback:(DeliveryResult, StoreUOW)=>Unit) = {
queue {
consumer_acks += (( id, callback ))
}
@@ -881,7 +881,7 @@ class OpenwireProtocolHandler extends Pr
consumer_acks = not_acked
acked.foreach{case (_, callback)=>
if( callback!=null ) {
- callback(true, uow)
+ callback(Delivered, uow)
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jun 22 09:43:39 2011
@@ -58,7 +58,7 @@ case class StompFrameMessage(frame:Stomp
* a positive value indicates that the delivery has an expiration
* time.
*/
- var expiration: Long = -1;
+ var expiration: Long = 0;
/**
* true if the delivery is persistent
@@ -71,7 +71,7 @@ case class StompFrameMessage(frame:Stomp
id = value
case (PRIORITY, value) =>
priority = java.lang.Integer.parseInt(value).toByte
- case (EXPIRATION_TIME, value) =>
+ case (EXPIRES, value) =>
expiration = java.lang.Long.parseLong(value)
case (PERSISTENT, value) =>
persistent = java.lang.Boolean.parseBoolean(value)
@@ -370,7 +370,7 @@ object Stomp {
val DESTINATION = ascii("destination")
val CORRELATION_ID = ascii("correlation-id")
val REPLY_TO = ascii("reply-to")
- val EXPIRATION_TIME = ascii("expires")
+ val EXPIRES = ascii("expires")
val PRIORITY = ascii("priority")
val TYPE = ascii("type")
val PERSISTENT = ascii("persistent")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jun 22 09:43:39 2011
@@ -126,24 +126,24 @@ class StompProtocolHandler extends Proto
trait AckHandler {
def track(delivery:Delivery):Unit
- def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
}
class AutoAckHandler extends AckHandler {
def track(delivery:Delivery) = {
if( delivery.ack!=null ) {
- delivery.ack(true, null)
+ delivery.ack(Delivered, null)
}
}
- def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
async_die("The subscription ack mode does not expect ACK or NACK frames")
}
}
class SessionAckHandler extends AckHandler{
- var consumer_acks = ListBuffer[(AsciiBuffer, (Boolean, StoreUOW)=>Unit)]()
+ var consumer_acks = ListBuffer[(AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit)]()
def track(delivery:Delivery) = {
queue.apply {
@@ -157,7 +157,7 @@ class StompProtocolHandler extends Proto
}
- def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
// session acks ack all previously recieved messages..
var found = false
@@ -190,7 +190,7 @@ class StompProtocolHandler extends Proto
}
class MessageAckHandler extends AckHandler {
- var consumer_acks = HashMap[AsciiBuffer, (Boolean, StoreUOW)=>Unit]()
+ var consumer_acks = HashMap[AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit]()
def track(delivery:Delivery) = {
queue.apply {
@@ -202,7 +202,7 @@ class StompProtocolHandler extends Proto
}
}
- def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
consumer_acks.remove(msgid) match {
case Some(ack) =>
if( ack!=null ) {
@@ -1012,14 +1012,14 @@ class StompProtocolHandler extends Proto
}
def on_stomp_ack(frame:StompFrame):Unit = {
- on_stomp_ack(frame.headers, true)
+ on_stomp_ack(frame.headers, Delivered)
}
def on_stomp_nack(frame:StompFrame):Unit = {
- on_stomp_ack(frame.headers, false)
+ on_stomp_ack(frame.headers, Undelivered)
}
- def on_stomp_ack(headers:HeaderMap, consumed:Boolean):Unit = {
+ def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {
val messageId = get(headers, MESSAGE_ID).getOrElse(die("message id header not set"))
val subscription_id = get(headers, SUBSCRIPTION);
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Wed Jun 22 09:43:39 2011
@@ -1508,3 +1508,46 @@ class CustomStompWildcardTest extends St
override val broker_config_uri: String = "xml:classpath:apollo-stomp-custom-dest-delimiters.xml"
override def path_separator = "/"
}
+
+class StompExpirationTest extends StompTestSupport {
+
+ def path_separator = "."
+
+ test("Messages Expire") {
+ connect("1.1")
+
+ def put(msg:String, ttl:Option[Long]=None) = {
+ val expires_header = ttl.map(t => "expires:"+(System.currentTimeMillis()+t)+"\n").getOrElse("") // add ttl numerically; an unparenthesized + would append its digits to the timestamp string
+ client.write(
+ "SEND\n" +
+ expires_header +
+ "destination:/queue/exp\n" +
+ "\n" +
+ "message:"+msg+"\n")
+ }
+
+ put("1")
+ put("2", Some(1000L))
+ put("3")
+
+ Thread.sleep(2000)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/exp\n" +
+ "id:1\n" +
+ "receipt:0\n"+
+ "\n")
+ wait_for_receipt("0")
+
+
+ def get(dest:String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\nmessage:%s\n".format(dest))
+ }
+
+ get("1")
+ get("3")
+ }
+}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Wed Jun 22 09:43:39 2011
@@ -182,6 +182,10 @@ case class BrokerResource() extends Reso
rc.nack_size_counter += q.nack_size_counter
rc.nack_ts = rc.nack_ts max q.nack_ts
+ rc.expired_item_counter += q.expired_item_counter
+ rc.expired_size_counter += q.expired_size_counter
+ rc.expired_ts = rc.expired_ts max q.expired_ts
+
rc.queue_size += q.queue_size
rc.queue_items += q.queue_items
@@ -699,6 +703,10 @@ case class BrokerResource() extends Reso
rc.nack_size_counter = q.nack_size_counter
rc.nack_ts = q.nack_ts
+ rc.expired_item_counter = q.expired_item_counter
+ rc.expired_size_counter = q.expired_size_counter
+ rc.expired_ts = q.expired_ts
+
rc.queue_size = q.queue_size
rc.queue_items = q.queue_items
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Wed Jun 22 09:43:39 2011
@@ -54,6 +54,8 @@ p dequeued: #{metrics.dequeue_item_count
p nacked: #{metrics.nack_item_counter} messages (#{memory(metrics.nack_size_counter)}), #{uptime(metrics.nack_ts)} ago
+p expired: #{metrics.expired_item_counter} messages (#{memory(metrics.expired_size_counter)}), #{uptime(metrics.expired_ts)} ago
+
h2 Swap Metrics
p swapped in: #{metrics.swapped_in_items} messages #{memory(metrics.swapped_in_size)}
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Jun 22 09:43:39 2011
@@ -1098,6 +1098,22 @@ ack mode to consume reliable messages. A
client which have not been acked when the client disconnects will get
redelivered to another subscribed client.
+### Message Expiration
+
+${project_name} supports expiring old messages. Unconsumed expired messages
+are automatically removed from the queue. You just need to specify when
+the message expires by setting the `expires` message header. The expiration
+time must be specified as the number of milliseconds since the Unix epoch.
+
+Example:
+
+ SEND
+ destination:/queue/a
+ expires:1308690148000
+
+ this message will expire on Tue Jun 21 17:02:28 EDT 2011
+ ^@
+
### Topic Durable Subscriptions
A durable subscription is a queue which is subscribed to a topic so that
Modified: activemq/activemq-apollo/trunk/apollo-website/src/index.page
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/index.page?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/index.page (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/index.page Wed Jun 22 09:43:39 2011
@@ -43,6 +43,7 @@ ActiveMQ 5.x JMS clients.
* [Queue Browsers](documentation/user-manual.html#Browsing_Subscriptions)
* [Durable Subscriptions on Topics](documentation/user-manual.html#Topic_Durable_Subscriptions)
* [Reliable Messaging](documentation/user-manual.html#Reliable_Messaging)
+* [Message Expiration](documentation/user-manual.html#Message_Expiration)
* [Message Swapping](documentation/architecture.html#Message_Swapping)
* [Message Selectors](documentation/user-manual.html#Message_Selectors)
* [JAAS Authentication](documentation/user-manual.html#Authentication)