You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/02 01:02:57 UTC
svn commit: r1308213 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Author: chirino
Date: Sun Apr 1 23:02:57 2012
New Revision: 1308213
URL: http://svn.apache.org/viewvc?rev=1308213&view=rev
Log:
Move the persistent and expiration properties from the Message trait to the Delivery class.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Sun Apr 1 23:02:57 2012
@@ -96,22 +96,6 @@ trait DeliverySession extends SessionSin
trait Message extends Filterable with Retained {
/**
- * the message priority.
- */
- def priority:Byte
-
- /**
- * a positive value indicates that the delivery has an expiration
- * time.
- */
- def expiration: Long
-
- /**
- * true if the delivery is persistent
- */
- def persistent: Boolean
-
- /**
* The protocol of the message
*/
def protocol:Protocol
@@ -184,6 +168,16 @@ class Delivery {
var size:Int = 0
/**
+ * When the message will expire
+ */
+ var expiration:Long = 0
+
+ /**
+ * Is the delivery persistent?
+ */
+ var persistent:Boolean = false
+
+ /**
* the message being delivered
*/
var message: Message = null
@@ -231,6 +225,9 @@ class Delivery {
def set(other:Delivery) = {
sender = other.sender
size = other.size
+ persistent = other.persistent
+ expiration = other.expiration
+ size = other.size
seq = other.seq
message = other.message
storeKey = other.storeKey
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Apr 1 23:02:57 2012
@@ -517,7 +517,7 @@ class Queue(val router: LocalRouter, val
} else {
// Don't even enqueue if the message has expired or the queue has stopped.
- val expiration = delivery.message.expiration
+ val expiration = delivery.expiration
val expired = expiration != 0 && expiration <= now
if( !service_state.is_started || expired) {
@@ -1429,7 +1429,7 @@ class QueueEntry(val queue:Queue, val se
override def count = 1
override def size = delivery.size
- override def expiration = delivery.message.expiration
+ override def expiration = delivery.expiration
override def message_key = delivery.storeKey
override def message_locator = delivery.storeLocator
override def redelivery_count = delivery.redeliveries
@@ -1747,19 +1747,27 @@ class QueueEntry(val queue:Queue, val se
}
}
+
+ def to_delivery = {
+ val delivery = new Delivery()
+ delivery.seq = seq
+ delivery.size = size
+ delivery.persistent = true
+ delivery.expiration = expiration
+ delivery.storeKey = message_key
+ delivery.storeLocator = message_locator
+ delivery.redeliveries = redelivery_count
+ delivery.sender = sender
+ delivery
+ }
+
def swapped_in(messageRecord:MessageRecord) = {
if( space!=null ) {
// debug("Loaded message seq: ", seq )
queue.swapping_in_size -= size
- val delivery = new Delivery()
- delivery.seq = seq
- delivery.size = size
+ val delivery = to_delivery
delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
- delivery.storeKey = messageRecord.key
- delivery.storeLocator = messageRecord.locator
- delivery.redeliveries = redelivery_count
- delivery.sender = sender
space += delivery
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sun Apr 1 23:02:57 2012
@@ -294,15 +294,18 @@ abstract class DeliveryProducerRoute(rou
// only deliver to matching consumers
if( target.consumer.matches(copy) ) {
- if ( target.consumer.is_persistent && copy.message.persistent
- && copy.storeKey == -1L && store != null) {
+ if ( target.consumer.is_persistent && copy.persistent && store != null) {
+
if (copy.uow == null) {
copy.uow = store.create_uow
} else {
copy.uow.retain
}
- copy.storeLocator = new AtomicReference[Object]()
- copy.storeKey = copy.uow.store(copy.createMessageRecord)
+
+ if( copy.storeKey == -1L ) {
+ copy.storeLocator = new AtomicReference[Object]()
+ copy.storeKey = copy.uow.store(copy.createMessageRecord)
+ }
}
if( !target.offer(copy) ) {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Sun Apr 1 23:02:57 2012
@@ -41,12 +41,6 @@ class OpenwireMessage(val message:Active
def protocol = OpenwireProtocol
- def priority = message.getPriority
-
- def persistent = message.isPersistent
-
- def expiration = message.getExpiration
-
def getBodyAs[T](toType : Class[T]) = {
(message match {
case x:ActiveMQTextMessage =>
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sun Apr 1 23:02:57 2012
@@ -669,6 +669,8 @@ class OpenwireProtocolHandler extends Pr
// We may need to add some headers..
val delivery = new Delivery
delivery.message = new OpenwireMessage(message)
+ delivery.expiration = message.getExpiration
+ delivery.persistent = message.isPersistent
delivery.size = {
val rc = message.getEncodedSize
if( rc != 0 )
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1308213&r1=1308212&r2=1308213&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sun Apr 1 23:02:57 2012
@@ -493,8 +493,11 @@ class StompProtocolHandler extends Proto
var frame = StompFrame(MESSAGE, headers, BufferContent(EMPTY_BUFFER))
val delivery = new Delivery()
- delivery.message = StompFrameMessage(frame)
+ var message = StompFrameMessage(frame)
+ delivery.message = message
delivery.size = frame.size
+ delivery.expiration = message.expiration
+ delivery.persistent = message.persistent
if( downstream.full ) {
// session is full so use an overflow sink so to hold the message,
@@ -1160,6 +1163,8 @@ class StompProtocolHandler extends Proto
val delivery = new Delivery
delivery.message = message
+ delivery.expiration = message.expiration
+ delivery.persistent = message.persistent
delivery.size = message.frame.size
delivery.uow = uow
get(frame.headers, RETAIN).foreach { retain =>
@@ -1349,7 +1354,7 @@ class StompProtocolHandler extends Proto
}
def on_stomp_nack(frame:StompFrame):Unit = {
- on_stomp_ack(frame.headers, Delivered)
+ on_stomp_ack(frame.headers, Poisoned)
}
def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {