You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/22 11:43:40 UTC

svn commit: r1138361 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala...

Author: chirino
Date: Wed Jun 22 09:43:39 2011
New Revision: 1138361

URL: http://svn.apache.org/viewvc?rev=1138361&view=rev
Log:
Implemented message expiration.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
    activemq/activemq-apollo/trunk/apollo-website/src/index.page

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Wed Jun 22 09:43:39 2011
@@ -399,6 +399,14 @@ class BDBClient(store: BDBStore) {
           group.count += 1
           group.size += entry.size
 
+          if(group.expiration == 0){
+            group.expiration = entry.expiration
+          } else {
+            if( entry.expiration != 0 ) {
+              group.expiration = entry.expiration.min(group.expiration)
+            }
+          }
+
           if( group.count == limit) {
             rc += group
             group = null

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Wed Jun 22 09:43:39 2011
@@ -48,4 +48,5 @@ message QueueEntryPB {
   optional int32 size=4;
   optional bytes attachment=5;
   optional int32 redeliveries = 6;
+  optional sint64 expiration=7;
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jun 22 09:43:39 2011
@@ -140,6 +140,21 @@ object Delivery extends Sizer[Delivery] 
   def size(value:Delivery):Int = value.size
 }
 
+sealed trait DeliveryResult
+/** message was processed, does not need redelivery */
+object Delivered extends DeliveryResult
+/** message expired before it could be processed, does not need redelivery */
+object Expired extends DeliveryResult
+/**
+  * The receiver thinks the message was poison message, it was not successfully
+  * processed and it should not get redelivered..
+  */
+object Poisoned extends DeliveryResult
+/**
+  * The message was not consumed, it should be redelivered to another consumer ASAP.
+  */
+object Undelivered extends DeliveryResult
+
 class Delivery {
 
   /**
@@ -166,7 +181,7 @@ class Delivery {
    * Set if the producer requires an ack to be sent back.  Consumer
    * should execute once the message is processed.
    */
-  var ack:(Boolean, StoreUOW)=>Unit = null
+  var ack:(DeliveryResult, StoreUOW)=>Unit = null
 
   def copy() = (new Delivery).set(this)
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jun 22 09:43:39 2011
@@ -30,7 +30,6 @@ import org.fusesource.hawtdispatch.{List
 import OptionSupport._
 import security.SecurityContext
 import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
-import java.lang.String
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -71,7 +70,7 @@ class Queue(val router: LocalRouter, val
     ack_source.cancel
   }
 
-  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, Boolean, StoreUOW)](), dispatch_queue)
+  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, DeliveryResult, StoreUOW)](), dispatch_queue)
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
@@ -150,19 +149,23 @@ class Queue(val router: LocalRouter, val
     configure(config)
   }
 
-  var last_maintenance_ts = System.currentTimeMillis
+  var now = System.currentTimeMillis
 
   var enqueue_item_counter = 0L
   var enqueue_size_counter = 0L
-  var enqueue_ts = last_maintenance_ts;
+  var enqueue_ts = now;
 
   var dequeue_item_counter = 0L
   var dequeue_size_counter = 0L
-  var dequeue_ts = last_maintenance_ts;
+  var dequeue_ts = now;
 
   var nack_item_counter = 0L
   var nack_size_counter = 0L
-  var nack_ts = last_maintenance_ts;
+  var nack_ts = now;
+
+  var expired_item_counter = 0L
+  var expired_size_counter = 0L
+  var expired_ts = now;
 
   def queue_size = enqueue_size_counter - dequeue_size_counter
   def queue_items = enqueue_item_counter - dequeue_item_counter
@@ -227,7 +230,7 @@ class Queue(val router: LocalRouter, val
   def check_idle {
     if (producers.isEmpty && all_subscriptions.isEmpty && queue_items==0 ) {
       if (idled_at==0) {
-        val now = System.currentTimeMillis()
+        now = System.currentTimeMillis()
         idled_at = now
         if( auto_delete_after!=0 ) {
           dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
@@ -327,6 +330,13 @@ class Queue(val router: LocalRouter, val
         false
       } else {
 
+        // Don't even enqueue if the message has expired.
+        val expiration = delivery.message.expiration
+        if( expiration != 0 && expiration <= now ) {
+          expired(delivery)
+          return true
+        }
+
         val entry = tail_entry
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queueDelivery = delivery.copy
@@ -339,7 +349,7 @@ class Queue(val router: LocalRouter, val
         entries.addLast(entry)
         enqueue_item_counter += 1
         enqueue_size_counter += entry.size
-        enqueue_ts = last_maintenance_ts;
+        enqueue_ts = now;
 
 
         // Do we need to do a persistent enqueue???
@@ -372,6 +382,24 @@ class Queue(val router: LocalRouter, val
     }
   }
 
+  def expired(delivery:Delivery):Unit = {
+    expired_ts = now
+    expired_item_counter += 1
+    expired_size_counter += delivery.size
+  }
+
+  def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
+    if(dequeue) {
+      dequeue_item_counter += 1
+      dequeue_size_counter += entry.size
+      dequeue_ts = now
+      messages.refiller.run
+    }
+
+    expired_ts = now
+    expired_item_counter += 1
+    expired_size_counter += entry.size
+  }
 
   def display_stats: Unit = {
     info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, swapped_in_size, swapped_in_size_max)
@@ -416,11 +444,36 @@ class Queue(val router: LocalRouter, val
 
   def swap_messages = {
 
-    // reset the prefetch flags...
+    now = System.currentTimeMillis()
+
     var cur = entries.getHead
     while( cur!=null ) {
+
+      // reset the prefetch flags and handle expiration...
       cur.prefetch_flags = 0
-      cur = cur.getNext
+      val next = cur.getNext
+
+      // handle expiration...
+      if( cur.expiration != 0 && cur.expiration <= now ) {
+        cur.state match {
+          case x:QueueEntry#SwappedRange =>
+            // load the range to expire the messages in it.
+            cur.load
+          case x:QueueEntry#Swapped =>
+            // remove the expired swapped message.
+            expired(cur)
+            x.remove
+          case x:QueueEntry#Loaded =>
+            // remove the expired message if it has not been
+            // acquired.
+            if( !x.acquired ) {
+              expired(cur)
+              x.remove
+            }
+          case _ =>
+        }
+      }
+      cur = next
     }
 
     // Set the prefetch flags
@@ -482,7 +535,7 @@ class Queue(val router: LocalRouter, val
 
   def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
     if( service_state.is_started ) {
-      last_maintenance_ts = System.currentTimeMillis
+      now = System.currentTimeMillis
 
       // target tune_min_subscription_rate / sec
       all_subscriptions.foreach{ case (consumer, sub)=>
@@ -514,10 +567,13 @@ class Queue(val router: LocalRouter, val
   def drain_acks = {
     ack_source.getData.foreach {
       case (entry, consumed, tx) =>
-        if( consumed ) {
-          entry.ack(tx)
-        } else {
-          entry.nack
+        consumed match {
+          case Delivered   => entry.ack(tx)
+          case Expired     =>
+            entry.entry.queue.expired(entry.entry, false)
+            entry.ack(tx)
+          case Poisoned    => entry.nack
+          case Undelivered => entry.nack
         }
     }
     messages.refiller.run
@@ -624,7 +680,7 @@ class Queue(val router: LocalRouter, val
     }
   }
 
-  def unbind(values: List[DeliveryConsumer]) = dispatch_queue {
+  def unbind(values: List[DeliveryConsumer]):Unit = dispatch_queue {
     for (consumer <- values) {
       all_subscriptions.get(consumer) match {
         case Some(subscription) =>
@@ -639,7 +695,7 @@ class Queue(val router: LocalRouter, val
   def bind(destination:DestinationDTO, consumer: DeliveryConsumer) = {
     bind(consumer::Nil)
   }
-  def unbind(consumer: DeliveryConsumer, persistent:Boolean) = {
+  def unbind(consumer: DeliveryConsumer, persistent:Boolean):Unit = {
     unbind(consumer::Nil)
   }
 
@@ -763,12 +819,12 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    state = new Swapped(qer.message_key, qer.size)
+    state = new Swapped(qer.message_key, qer.size, qer.expiration)
     this
   }
 
   def init(range:QueueEntryRange):QueueEntry = {
-    state = new SwappedRange(range.last_entry_seq, range.count, range.size)
+    state = new SwappedRange(range.last_entry_seq, range.count, range.size, range.expiration)
     this
   }
 
@@ -780,9 +836,14 @@ class QueueEntry(val queue:Queue, val se
    */
   def run() = {
     queue.assert_executing
-    var next = this;
-    while( next!=null && next.dispatch) {
-      next = next.getNext
+    var cur = this;
+    while( cur!=null && cur.isLinked ) {
+      val next = cur.getNext
+      cur = if( cur.dispatch ) {
+        next
+      } else {
+        null
+      }
     }
   }
 
@@ -818,6 +879,7 @@ class QueueEntry(val queue:Queue, val se
     qer.entry_seq = seq
     qer.message_key = state.message_key
     qer.size = state.size
+    qer.expiration = expiration
     qer
   }
 
@@ -851,6 +913,7 @@ class QueueEntry(val queue:Queue, val se
   // These should not change the current state.
   def count = state.count
   def size = state.size
+  def expiration = state.expiration
   def messageKey = state.message_key
   def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
   def dispatch() = state.dispatch
@@ -885,6 +948,11 @@ class QueueEntry(val queue:Queue, val se
     def size = 0
 
     /**
+     * When the entry expires or 0 if it does not expire.
+     */
+    def expiration = 0L
+
+    /**
      * Gets number of messages that this entry represents
      */
     def count = 0
@@ -1023,6 +1091,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def count = 1
     override def size = delivery.size
+    override def expiration = delivery.message.expiration
     override def message_key = delivery.storeKey
     var remove_pending = false
 
@@ -1097,7 +1166,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swap_out_size_counter += size
         queue.swap_out_item_counter += 1
 
-        state = new Swapped(delivery.storeKey, size)
+        state = new Swapped(delivery.storeKey, size, expiration)
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
@@ -1136,6 +1205,12 @@ class QueueEntry(val queue:Queue, val se
 
       queue.assert_executing
 
+      if( !acquired && expiration != 0 && expiration <= queue.now ) {
+        queue.expired(entry)
+        remove
+        return true
+      }
+
       // Nothing to dispatch if we don't have subs..
       if( parked.isEmpty ) {
         return false
@@ -1236,7 +1311,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val size:Int) extends EntryState {
+  class Swapped(override val message_key:Long, override val size:Int, override val expiration:Long) extends EntryState {
 
     queue.individual_swapped_items += 1
 
@@ -1325,7 +1400,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swapping_in_size -= size
       }
       queue.individual_swapped_items -= 1
-      state = new SwappedRange(seq, 1, size)
+      state = new SwappedRange(seq, 1, size, expiration)
     }
   }
 
@@ -1345,10 +1420,13 @@ class QueueEntry(val queue:Queue, val se
     /** the number of items in the range */
     var _count:Int,
     /** size in bytes of the range */
-    var _size:Int) extends EntryState {
+    var _size:Int,
+    var _expiration:Long) extends EntryState {
+
 
     override def count = _count
     override def size = _size
+    override def expiration = _expiration
 
     var swapping_in = false
 
@@ -1424,15 +1502,20 @@ class QueueEntry(val queue:Queue, val se
         assert(last < value.seq )
         last = value.seq
         _count += 1
-        _size += value.size
-        value.remove
       } else if( value.is_swapped_range ) {
         assert(last < value.seq )
         last = value.as_swapped_range.last
         _count += value.as_swapped_range.count
-        _size += value.size
-        value.remove
       }
+      if(_expiration == 0){
+        _expiration = value.expiration
+      } else {
+        if( value.expiration != 0 ) {
+          _expiration = value.expiration.min(_expiration)
+        }
+      }
+      _size += value.size
+      value.remove
     }
 
   }
@@ -1652,7 +1735,7 @@ class Subscription(val queue:Queue, val 
 
       queue.dequeue_item_counter += 1
       queue.dequeue_size_counter += entry.size
-      queue.dequeue_ts = queue.last_maintenance_ts
+      queue.dequeue_ts = queue.now
 
       // removes this entry from the acquired list.
       unlink()
@@ -1685,7 +1768,7 @@ class Subscription(val queue:Queue, val 
       // track for stats
       queue.nack_item_counter += 1
       queue.nack_size_counter += entry.size
-      queue.nack_ts = queue.last_maintenance_ts
+      queue.nack_ts = queue.now
 
       // The following does not need to get done for exclusive subs because
       // they end up rewinding all the sub of the head of the queue.

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jun 22 09:43:39 2011
@@ -156,7 +156,7 @@ abstract class DeliveryProducerRoute(val
   // Dispatch.
   //
 
-  var pendingAck: (Boolean, StoreUOW)=>Unit = null
+  var pendingAck: (DeliveryResult, StoreUOW)=>Unit = null
   var overflow:Delivery=null
   var overflowSessions = List[DeliverySession]()
   var refiller:Runnable=null
@@ -207,11 +207,11 @@ abstract class DeliveryProducerRoute(val
       if (delivery.uow != null) {
         val ack = pendingAck
         delivery.uow.on_complete {
-          ack(true, null)
+          ack(Delivered, null)
         }
 
       } else {
-        pendingAck(true, null)
+        pendingAck(Delivered, null)
       }
       pendingAck==null
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Wed Jun 22 09:43:39 2011
@@ -32,7 +32,8 @@ object PBSupport {
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
     pb.setValue(v.buffer)
-    pb.setExpiration(v.expiration)
+    if(v.expiration!=0)
+      pb.setExpiration(v.expiration)
     pb.freeze
   }
 
@@ -84,7 +85,10 @@ object PBSupport {
     pb.setMessageKey(v.message_key)
     pb.setAttachment(v.attachment)
     pb.setSize(v.size)
-    pb.setRedeliveries(v.redeliveries)
+    if(v.expiration!=0)
+      pb.setExpiration(v.expiration)
+    if(v.redeliveries!=0)
+      pb.setRedeliveries(v.redeliveries)
     pb.freeze
   }
 
@@ -95,6 +99,7 @@ object PBSupport {
     rc.message_key = pb.getMessageKey
     rc.attachment = pb.getAttachment
     rc.size = pb.getSize
+    rc.expiration = pb.getExpiration
     rc.redeliveries = pb.getRedeliveries.toShort
     rc
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala Wed Jun 22 09:43:39 2011
@@ -25,4 +25,5 @@ class QueueEntryRange {
   var last_entry_seq = 0L
   var count = 0
   var size = 0
+  var expiration = 0L
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Wed Jun 22 09:43:39 2011
@@ -31,6 +31,7 @@ class QueueEntryRecord {
   var message_key = 0L
   var attachment:Buffer = _
   var size = 0
+  var expiration = 0L
   var redeliveries:Short = 0
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java Wed Jun 22 09:43:39 2011
@@ -84,6 +84,25 @@ public class QueueMetricsDTO {
     public long dequeue_ts;
 
     /**
+     * The number of messages which expired before they could be processed.
+     */
+    @XmlAttribute(name="expired_item_counter")
+    public long expired_item_counter;
+
+    /**
+     * The total size in bytes of messages which expired before
+     * they could be processed.
+     */
+    @XmlAttribute(name="expired_size_counter")
+    public long expired_size_counter;
+
+    /**
+     * The time stamp of when the last message expiration occurred.
+     */
+    @XmlAttribute(name="expired_ts")
+    public long expired_ts;
+
+    /**
      * The number of messages that were delivered to
      * a consumer but which the consumer did not successfully process.
      */

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Wed Jun 22 09:43:39 2011
@@ -311,6 +311,16 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             group.last_entry_seq = entry.getKey.longValue
             group.count += 1
             group.size += entry.getValue.getSize
+
+// TODO:
+//            if(group.expiration == 0){
+//              group.expiration = entry.expiration
+//            } else {
+//              if( entry.expiration != 0 ) {
+//                group.expiration = entry.expiration.min(group.expiration)
+//              }
+//            }
+
             if( group.count == limit) {
               rc += group
               group = null

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Wed Jun 22 09:43:39 2011
@@ -427,6 +427,14 @@ class JDBM2Client(store: JDBM2Store) {
       group.count += 1
       group.size += entry.size
 
+      if(group.expiration == 0){
+        group.expiration = entry.expiration
+      } else {
+        if( entry.expiration != 0 ) {
+          group.expiration = entry.expiration.min(group.expiration)
+        }
+      }
+
       if( group.count == limit) {
         rc += group
         group = null

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Jun 22 09:43:39 2011
@@ -853,9 +853,9 @@ class OpenwireProtocolHandler extends Pr
     object ack_handler {
 
       // TODO: Need to validate all the range ack cases...
-      var consumer_acks = ListBuffer[(MessageId, (Boolean, StoreUOW)=>Unit)]()
+      var consumer_acks = ListBuffer[(MessageId, (DeliveryResult, StoreUOW)=>Unit)]()
 
-      def track(id:MessageId, callback:(Boolean, StoreUOW)=>Unit) = {
+      def track(id:MessageId, callback:(DeliveryResult, StoreUOW)=>Unit) = {
         queue {
           consumer_acks += (( id, callback ))
         }
@@ -881,7 +881,7 @@ class OpenwireProtocolHandler extends Pr
           consumer_acks = not_acked
           acked.foreach{case (_, callback)=>
             if( callback!=null ) {
-              callback(true, uow)
+              callback(Delivered, uow)
             }
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jun 22 09:43:39 2011
@@ -58,7 +58,7 @@ case class StompFrameMessage(frame:Stomp
    * a positive value indicates that the delivery has an expiration
    * time.
    */
-  var expiration: Long = -1;
+  var expiration: Long = 0;
 
   /**
    * true if the delivery is persistent
@@ -71,7 +71,7 @@ case class StompFrameMessage(frame:Stomp
         id = value
       case (PRIORITY, value) =>
         priority = java.lang.Integer.parseInt(value).toByte
-      case (EXPIRATION_TIME, value) =>
+      case (EXPIRES, value) =>
         expiration = java.lang.Long.parseLong(value)
       case (PERSISTENT, value) =>
         persistent = java.lang.Boolean.parseBoolean(value)
@@ -370,7 +370,7 @@ object Stomp {
   val DESTINATION = ascii("destination")
   val CORRELATION_ID = ascii("correlation-id")
   val REPLY_TO = ascii("reply-to")
-  val EXPIRATION_TIME = ascii("expires")
+  val EXPIRES = ascii("expires")
   val PRIORITY = ascii("priority")
   val TYPE = ascii("type")
   val PERSISTENT = ascii("persistent")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jun 22 09:43:39 2011
@@ -126,24 +126,24 @@ class StompProtocolHandler extends Proto
 
   trait AckHandler {
     def track(delivery:Delivery):Unit
-    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
+    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
   }
 
   class AutoAckHandler extends AckHandler {
     def track(delivery:Delivery) = {
       if( delivery.ack!=null ) {
-        delivery.ack(true, null)
+        delivery.ack(Delivered, null)
       }
     }
 
-    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
       async_die("The subscription ack mode does not expect ACK or NACK frames")
     }
 
   }
 
   class SessionAckHandler extends AckHandler{
-    var consumer_acks = ListBuffer[(AsciiBuffer, (Boolean, StoreUOW)=>Unit)]()
+    var consumer_acks = ListBuffer[(AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit)]()
 
     def track(delivery:Delivery) = {
       queue.apply {
@@ -157,7 +157,7 @@ class StompProtocolHandler extends Proto
     }
 
 
-    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
 
       // session acks ack all previously recieved messages..
       var found = false
@@ -190,7 +190,7 @@ class StompProtocolHandler extends Proto
 
   }
   class MessageAckHandler extends AckHandler {
-    var consumer_acks = HashMap[AsciiBuffer, (Boolean, StoreUOW)=>Unit]()
+    var consumer_acks = HashMap[AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit]()
 
     def track(delivery:Delivery) = {
       queue.apply {
@@ -202,7 +202,7 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
       consumer_acks.remove(msgid) match {
         case Some(ack) =>
           if( ack!=null ) {
@@ -1012,14 +1012,14 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_ack(frame:StompFrame):Unit = {
-    on_stomp_ack(frame.headers, true)
+    on_stomp_ack(frame.headers, Delivered)
   }
 
   def on_stomp_nack(frame:StompFrame):Unit = {
-    on_stomp_ack(frame.headers, false)
+    on_stomp_ack(frame.headers, Undelivered)
   }
 
-  def on_stomp_ack(headers:HeaderMap, consumed:Boolean):Unit = {
+  def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {
     val messageId = get(headers, MESSAGE_ID).getOrElse(die("message id header not set"))
 
     val subscription_id = get(headers, SUBSCRIPTION);

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Wed Jun 22 09:43:39 2011
@@ -1508,3 +1508,46 @@ class CustomStompWildcardTest extends St
   override val broker_config_uri: String = "xml:classpath:apollo-stomp-custom-dest-delimiters.xml"
   override def path_separator = "/"
 }
+
+class StompExpirationTest extends StompTestSupport {
+
+  def path_separator = "."
+
+  test("Messages Expire") {
+    connect("1.1")
+
+    def put(msg:String, ttl:Option[Long]=None) = {
+      val expires_header = ttl.map("expires:"+System.currentTimeMillis()+_+"\n").getOrElse("")
+      client.write(
+        "SEND\n" +
+        expires_header +
+        "destination:/queue/exp\n" +
+        "\n" +
+        "message:"+msg+"\n")
+    }
+
+    put("1")
+    put("2", Some(1000L))
+    put("3")
+
+    Thread.sleep(2000)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/exp\n" +
+      "id:1\n" +
+      "receipt:0\n"+
+      "\n")
+    wait_for_receipt("0")
+
+
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+    }
+
+    get("1")
+    get("3")
+  }
+}

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Wed Jun 22 09:43:39 2011
@@ -182,6 +182,10 @@ case class BrokerResource() extends Reso
       rc.nack_size_counter += q.nack_size_counter
       rc.nack_ts = rc.nack_ts max q.nack_ts
 
+      rc.expired_item_counter += q.expired_item_counter
+      rc.expired_size_counter += q.expired_size_counter
+      rc.expired_ts = rc.expired_ts max q.expired_ts
+
       rc.queue_size += q.queue_size
       rc.queue_items += q.queue_items
 
@@ -699,6 +703,10 @@ case class BrokerResource() extends Reso
     rc.nack_size_counter = q.nack_size_counter
     rc.nack_ts = q.nack_ts
 
+    rc.expired_item_counter = q.expired_item_counter
+    rc.expired_size_counter = q.expired_size_counter
+    rc.expired_ts = q.expired_ts
+
     rc.queue_size = q.queue_size
     rc.queue_items = q.queue_items
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Wed Jun 22 09:43:39 2011
@@ -54,6 +54,8 @@ p dequeued: #{metrics.dequeue_item_count
 
 p nacked: #{metrics.nack_item_counter} messages (#{memory(metrics.nack_size_counter)}), #{uptime(metrics.nack_ts)} ago
 
+p expired: #{metrics.expired_item_counter} messages (#{memory(metrics.expired_size_counter)}), #{uptime(metrics.expired_ts)} ago
+
 h2 Swap Metrics
 
 p swapped in: #{metrics.swapped_in_items} messages #{memory(metrics.swapped_in_size)}

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Wed Jun 22 09:43:39 2011
@@ -1098,6 +1098,22 @@ ack mode to consume reliable messages. A
 client which have not been acked when the client disconnects will get
 redelivered to another subscribed client.
 
+### Message Expiration
+
+${project_name} supports expiring old messages.  Unconsumed expired messages 
+are automatically removed from the queue.  You just need to specify when
+the message expires by setting the `expires` message header.  The expiration
+time must be specified as the number of milliseconds since the Unix epoch.
+
+Example:
+
+    SEND
+    destination:/queue/a
+    expires:1308690148000
+
+    this message will expire on Tue Jun 21 17:02:28 EDT 2011
+    ^@
+
 ### Topic Durable Subscriptions
 
 A durable subscription is a queue which is subscribed to a topic so that

Modified: activemq/activemq-apollo/trunk/apollo-website/src/index.page
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/index.page?rev=1138361&r1=1138360&r2=1138361&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/index.page (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/index.page Wed Jun 22 09:43:39 2011
@@ -43,6 +43,7 @@ ActiveMQ 5.x JMS clients.
 * [Queue Browsers](documentation/user-manual.html#Browsing_Subscriptions)
 * [Durable Subscriptions on Topics](documentation/user-manual.html#Topic_Durable_Subscriptions)
 * [Reliable Messaging](documentation/user-manual.html#Reliable_Messaging)
+* [Message Expiration](documentation/user-manual.html#Message_Expiration)
 * [Message Swapping](documentation/architecture.html#Message_Swapping)
 * [Message Selectors](documentation/user-manual.html#Message_Selectors)
 * [JAAS Authentication](documentation/user-manual.html#Authentication)