You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/06 18:36:05 UTC
svn commit: r1241090 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Author: chirino
Date: Mon Feb 6 17:36:05 2012
New Revision: 1241090
URL: http://svn.apache.org/viewvc?rev=1241090&view=rev
Log:
Fixes APLO-152 - Support an option to have a topic retain the last message sent to it
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Feb 6 17:36:05 2012
@@ -212,6 +212,12 @@ class Delivery {
*/
var ack:(DeliveryResult, StoreUOW)=>Unit = null
+ /**
+ * Should this message get retained as the last image of a topic?
+ */
+ var retain = false
+
+
def copy() = (new Delivery).set(this)
def set(other:Delivery) = {
@@ -222,6 +228,7 @@ class Delivery {
storeKey = other.storeKey
storeLocator = other.storeLocator
redeliveries = other.redeliveries
+ retain = other.retain
this
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Feb 6 17:36:05 2012
@@ -39,6 +39,9 @@ class Topic(val router:LocalRouter, val
val resource_kind =SecuredResource.TopicKind
var proxy_sessions = new HashSet[DeliverySession]()
+ @transient
+ var retained_message: Delivery = _
+
implicit def from_link(from:LinkDTO):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
implicit def from_session(from:DeliverySession):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
@@ -83,10 +86,15 @@ class Topic(val router:LocalRouter, val
var enqueue_item_counter = 0L
var refiller:Runnable = null
+
def offer(value: Delivery) = {
enqueue_item_counter += 1
enqueue_size_counter += value.size
enqueue_ts = now
+ if( value.retain ) {
+ // TODO: perhaps persist this message reference
+ retained_message = value;
+ }
true
}
@@ -374,6 +382,24 @@ class Topic(val router:LocalRouter, val
}
}
+ val r = retained_message
+ if (r != null) {
+ val copy = r.copy()
+ copy.sender = address
+
+ val producer = new DeliveryProducerRoute(router) {
+ val dispatch_queue = createQueue()
+ override protected def on_connected = {
+ copy.ack = (d,x) => consumer.dispatch_queue {
+ unbind(consumer :: Nil)
+ }
+ offer(copy) // producer supports 1 message overflow.
+ }
+ }
+ producer.bind(consumer :: Nil)
+ producer.connected()
+ }
+
val proxy = ProxyDeliveryConsumer(target, link, consumer)
consumers.put(consumer, proxy)
topic_metrics.consumer_counter += 1
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Feb 6 17:36:05 2012
@@ -372,6 +372,7 @@ object Stomp {
val PRIORITY = ascii("priority")
val TYPE = ascii("type")
val PERSISTENT = ascii("persistent")
+ val RETAIN = ascii("retain")
val MESSAGE_ID = ascii("message-id")
val PRORITY = ascii("priority")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Feb 6 17:36:05 2012
@@ -1131,6 +1131,7 @@ class StompProtocolHandler extends Proto
delivery.message = message
delivery.size = message.frame.size
delivery.uow = uow
+ delivery.retain = get(frame.headers, RETAIN).map( _ == TRUE).getOrElse(false)
if( receipt!=null ) {
delivery.ack = { (consumed, uow) =>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Mon Feb 6 17:36:05 2012
@@ -90,21 +90,23 @@ class StompTestSupport extends FunSuiteS
val receipt_counter = new AtomicLong()
- def sync_send(dest:String, body:Any, c: StompClient = client) = {
+ def sync_send(dest:String, body:Any, headers:String="", c:StompClient = client) = {
val rid = receipt_counter.incrementAndGet()
c.write(
"SEND\n" +
"destination:"+dest+"\n" +
"receipt:"+rid+"\n" +
+ headers+
"\n" +
body)
wait_for_receipt(""+rid, c)
}
- def async_send(dest:String, body:Any, c: StompClient = client) = {
+ def async_send(dest:String, body:Any, headers:String="", c: StompClient = client) = {
c.write(
"SEND\n" +
"destination:"+dest+"\n" +
+ headers+
"\n" +
body)
}
@@ -496,6 +498,16 @@ class Stomp11HeartBeatTest extends Stomp
}
class StompDestinationTest extends StompTestSupport {
+ test("Topic remembers retained messages sent before before subscription is established") {
+ connect("1.1")
+ sync_send("/topic/retained-example", 1, "retain:true\n")
+ sync_send("/topic/retained-example", 2, "retain:true\n")
+ subscribe("0", "/topic/retained-example")
+ assert_received(2)
+ sync_send("/topic/retained-example", 3, "retain:true\n")
+ assert_received(3)
+ }
+
// This is the test case for https://issues.apache.org/jira/browse/APLO-88
test("ACK then socket close with/without DISCONNECT, should still ACK") {
for(i <- 1 until 3) {
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Mon Feb 6 17:36:05 2012
@@ -1135,6 +1135,23 @@ Example STOMP frame sending to a queue:
hello queue a
^@
+### Topic Retained Messages
+
+If a message sent to a topic has the `retain:true` header, then
+the topic will remember that message so that when a new
+subscription arrives, the last retained message is sent
+to that subscription. For example, if you want a topic
+to remember the last price published, you can send a message
+that looks like:
+
+ SEND
+ destination:/topic/stock/IBM
+ retain:true
+
+ 112.12
+ ^@
+
+Note: retained messages are not persisted, so they are lost when the broker restarts.
### Reliable Messaging