You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/06 19:34:38 UTC
svn commit: r1241105 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apollo-website/src/documentation/
Author: chirino
Date: Mon Feb 6 18:34:38 2012
New Revision: 1241105
URL: http://svn.apache.org/viewvc?rev=1241105&view=rev
Log:
Further enhancements to APLO-152: you can now set or remove a topic's retained message.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Feb 6 18:34:38 2012
@@ -163,6 +163,12 @@ object Expired extends DeliveryResult
*/
object Poisoned extends DeliveryResult
+
+sealed trait RetainAction
+object RetainSet extends RetainAction
+object RetainRemove extends RetainAction
+object RetainIgnore extends RetainAction
+
class Delivery {
/**
@@ -215,7 +221,7 @@ class Delivery {
/**
* Should this message get retained as the last image of a topic?
*/
- var retain = false
+ var retain:RetainAction = RetainIgnore
def copy() = (new Delivery).set(this)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Feb 6 18:34:38 2012
@@ -91,9 +91,14 @@ class Topic(val router:LocalRouter, val
enqueue_item_counter += 1
enqueue_size_counter += value.size
enqueue_ts = now
- if( value.retain ) {
- // TODO: perhaps persist this message reference
- retained_message = value;
+ value.retain match {
+ case RetainSet =>
+ // TODO: perhaps persist so that we can recall what was
+ // retained across broker restarts.
+ retained_message = value;
+ case RetainRemove =>
+ retained_message = null;
+ case _ =>
}
true
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Feb 6 18:34:38 2012
@@ -373,6 +373,8 @@ object Stomp {
val TYPE = ascii("type")
val PERSISTENT = ascii("persistent")
val RETAIN = ascii("retain")
+ val SET = ascii("set")
+ val REMOVE = ascii("remove")
val MESSAGE_ID = ascii("message-id")
val PRORITY = ascii("priority")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Feb 6 18:34:38 2012
@@ -1131,7 +1131,13 @@ class StompProtocolHandler extends Proto
delivery.message = message
delivery.size = message.frame.size
delivery.uow = uow
- delivery.retain = get(frame.headers, RETAIN).map( _ == TRUE).getOrElse(false)
+ get(frame.headers, RETAIN).foreach { retain =>
+ delivery.retain = retain match {
+ case SET => RetainSet
+ case REMOVE => RetainRemove
+ case _ => RetainIgnore
+ }
+ }
if( receipt!=null ) {
delivery.ack = { (consumed, uow) =>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Mon Feb 6 18:34:38 2012
@@ -498,14 +498,25 @@ class Stomp11HeartBeatTest extends Stomp
}
class StompDestinationTest extends StompTestSupport {
- test("Topic remembers retained messages sent before before subscription is established") {
+ test("retain:set makes a topic remeber the message") {
connect("1.1")
- sync_send("/topic/retained-example", 1, "retain:true\n")
- sync_send("/topic/retained-example", 2, "retain:true\n")
+ async_send("/topic/retained-example", 1)
+ async_send("/topic/retained-example", 2, "retain:set\n")
+ sync_send("/topic/retained-example", 3)
subscribe("0", "/topic/retained-example")
assert_received(2)
- sync_send("/topic/retained-example", 3, "retain:true\n")
- assert_received(3)
+ async_send("/topic/retained-example", 4)
+ assert_received(4)
+ }
+
+ test("retain:remove makes a topic forget the message") {
+ connect("1.1")
+ async_send("/topic/retained-example2", 1)
+ async_send("/topic/retained-example2", 2, "retain:set\n")
+ async_send("/topic/retained-example2", 3, "retain:remove\n")
+ subscribe("0", "/topic/retained-example2")
+ async_send("/topic/retained-example2", 4)
+ assert_received(4)
}
// This is the test case for https://issues.apache.org/jira/browse/APLO-88
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Mon Feb 6 18:34:38 2012
@@ -1137,7 +1137,7 @@ Example STOMP frame sending to a queue:
### Topic Retained Messages
-If a message sent to a Topic has the `retain:true` header, then
+If a message sent to a Topic has the `retain:set` header, then
the message will be 'remembered' by the topic so that if a new
subscription arrives, the last retained message is sent
to the subscription. For example if you want a topic
@@ -1151,6 +1151,9 @@ that looks like:
112.12
^@
+You can also send a new message with the `retain:remove` header
+to have the topic forget about the last retained message.
+
Note: retained messages are not retained between broker restarts.
### Reliable Messaging