You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/02 01:02:57 UTC

svn commit: r1308213 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Sun Apr  1 23:02:57 2012
New Revision: 1308213

URL: http://svn.apache.org/viewvc?rev=1308213&view=rev
Log:
Move the persistent and expiration properties from the Message trait to the Delivery class.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Sun Apr  1 23:02:57 2012
@@ -96,22 +96,6 @@ trait DeliverySession extends SessionSin
 trait Message extends Filterable with Retained {
 
   /**
-   *  the message priority.
-   */
-  def priority:Byte
-
-  /**
-   * a positive value indicates that the delivery has an expiration
-   * time.
-   */
-  def expiration: Long
-
-  /**
-   * true if the delivery is persistent
-   */
-  def persistent: Boolean
-
-  /**
    * The protocol of the message
    */
   def protocol:Protocol
@@ -184,6 +168,16 @@ class Delivery {
   var size:Int = 0
 
   /**
+   * When the message will expire
+   */
+  var expiration:Long = 0
+
+  /**
+   * Is the delivery persistent?
+   */
+  var persistent:Boolean = false
+
+  /**
    *  the message being delivered
    */
   var message: Message = null
@@ -231,6 +225,9 @@ class Delivery {
   def set(other:Delivery) = {
     sender = other.sender
     size = other.size
+    persistent = other.persistent
+    expiration = other.expiration
+    size = other.size
     seq = other.seq
     message = other.message
     storeKey = other.storeKey

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Apr  1 23:02:57 2012
@@ -517,7 +517,7 @@ class Queue(val router: LocalRouter, val
       } else {
 
         // Don't even enqueue if the message has expired or the queue has stopped.
-        val expiration = delivery.message.expiration
+        val expiration = delivery.expiration
         val expired = expiration != 0 && expiration <= now
 
         if( !service_state.is_started || expired) {
@@ -1429,7 +1429,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def count = 1
     override def size = delivery.size
-    override def expiration = delivery.message.expiration
+    override def expiration = delivery.expiration
     override def message_key = delivery.storeKey
     override def message_locator = delivery.storeLocator
     override def redelivery_count = delivery.redeliveries
@@ -1747,19 +1747,27 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
+
+    def to_delivery = {
+      val delivery = new Delivery()
+      delivery.seq = seq
+      delivery.size = size
+      delivery.persistent = true
+      delivery.expiration = expiration
+      delivery.storeKey = message_key
+      delivery.storeLocator = message_locator
+      delivery.redeliveries = redelivery_count
+      delivery.sender = sender
+      delivery
+    }
+
     def swapped_in(messageRecord:MessageRecord) = {
       if( space!=null ) {
 //        debug("Loaded message seq: ", seq )
         queue.swapping_in_size -= size
 
-        val delivery = new Delivery()
-        delivery.seq = seq
-        delivery.size = size
+        val delivery = to_delivery
         delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
-        delivery.storeKey = messageRecord.key
-        delivery.storeLocator = messageRecord.locator
-        delivery.redeliveries = redelivery_count
-        delivery.sender = sender
 
         space += delivery
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sun Apr  1 23:02:57 2012
@@ -294,15 +294,18 @@ abstract class DeliveryProducerRoute(rou
         // only deliver to matching consumers
         if( target.consumer.matches(copy) ) {
 
-          if ( target.consumer.is_persistent && copy.message.persistent
-                && copy.storeKey == -1L && store != null) {
+          if ( target.consumer.is_persistent && copy.persistent && store != null) {
+
             if (copy.uow == null) {
               copy.uow = store.create_uow
             } else {
               copy.uow.retain
             }
-            copy.storeLocator = new AtomicReference[Object]()
-            copy.storeKey = copy.uow.store(copy.createMessageRecord)
+
+            if( copy.storeKey == -1L ) {
+              copy.storeLocator = new AtomicReference[Object]()
+              copy.storeKey = copy.uow.store(copy.createMessageRecord)
+            }
           }
 
           if( !target.offer(copy) ) {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Sun Apr  1 23:02:57 2012
@@ -41,12 +41,6 @@ class OpenwireMessage(val message:Active
 
   def protocol = OpenwireProtocol
 
-  def priority = message.getPriority
-
-  def persistent = message.isPersistent
-
-  def expiration = message.getExpiration
-
   def getBodyAs[T](toType : Class[T]) = {
     (message match {
       case x:ActiveMQTextMessage =>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sun Apr  1 23:02:57 2012
@@ -669,6 +669,8 @@ class OpenwireProtocolHandler extends Pr
       // We may need to add some headers..
       val delivery = new Delivery
       delivery.message = new OpenwireMessage(message)
+      delivery.expiration = message.getExpiration
+      delivery.persistent = message.isPersistent
       delivery.size = {
         val rc = message.getEncodedSize
         if( rc != 0 )

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sun Apr  1 23:02:57 2012
@@ -493,8 +493,11 @@ class StompProtocolHandler extends Proto
             var frame = StompFrame(MESSAGE, headers, BufferContent(EMPTY_BUFFER))
 
             val delivery = new Delivery()
-            delivery.message = StompFrameMessage(frame)
+            var message = StompFrameMessage(frame)
+            delivery.message = message
             delivery.size = frame.size
+            delivery.expiration = message.expiration
+            delivery.persistent = message.persistent
 
             if( downstream.full ) {
               // session is full so use an overflow sink so to hold the message,
@@ -1160,6 +1163,8 @@ class StompProtocolHandler extends Proto
 
       val delivery = new Delivery
       delivery.message = message
+      delivery.expiration = message.expiration
+      delivery.persistent = message.persistent
       delivery.size = message.frame.size
       delivery.uow = uow
       get(frame.headers, RETAIN).foreach { retain =>
@@ -1349,7 +1354,7 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_nack(frame:StompFrame):Unit = {
-    on_stomp_ack(frame.headers, Delivered)
+    on_stomp_ack(frame.headers, Poisoned)
   }
 
   def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {