You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/06 19:34:38 UTC

svn commit: r1241105 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apollo-website/src/documentation/

Author: chirino
Date: Mon Feb  6 18:34:38 2012
New Revision: 1241105

URL: http://svn.apache.org/viewvc?rev=1241105&view=rev
Log:
Further enhancements to APLO-152: you can now set or remove a topic's retained message.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Feb  6 18:34:38 2012
@@ -163,6 +163,12 @@ object Expired extends DeliveryResult
  */
 object Poisoned extends DeliveryResult
 
+
+sealed trait RetainAction
+object RetainSet extends RetainAction
+object RetainRemove extends RetainAction
+object RetainIgnore extends RetainAction
+
 class Delivery {
 
   /**
@@ -215,7 +221,7 @@ class Delivery {
   /**
    * Should this message get retained as the last image of a topic?
    */
-  var retain = false
+  var retain:RetainAction = RetainIgnore
 
 
   def copy() = (new Delivery).set(this)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Feb  6 18:34:38 2012
@@ -91,9 +91,14 @@ class Topic(val router:LocalRouter, val 
       enqueue_item_counter += 1
       enqueue_size_counter += value.size
       enqueue_ts = now
-      if( value.retain ) {
-        // TODO: perhaps persist this message reference
-        retained_message = value;
+      value.retain match {
+        case RetainSet =>
+          // TODO: perhaps persist so that we can recall what was
+          // retained across broker restarts.
+          retained_message = value;
+        case RetainRemove =>
+          retained_message = null;
+        case _ =>
       }
       true
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Feb  6 18:34:38 2012
@@ -373,6 +373,8 @@ object Stomp {
   val TYPE = ascii("type")
   val PERSISTENT = ascii("persistent")
   val RETAIN = ascii("retain")
+  val SET = ascii("set")
+  val REMOVE = ascii("remove")
 
   val MESSAGE_ID = ascii("message-id")
   val PRORITY = ascii("priority")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Feb  6 18:34:38 2012
@@ -1131,7 +1131,13 @@ class StompProtocolHandler extends Proto
       delivery.message = message
       delivery.size = message.frame.size
       delivery.uow = uow
-      delivery.retain = get(frame.headers, RETAIN).map( _ == TRUE).getOrElse(false)
+      get(frame.headers, RETAIN).foreach { retain =>
+        delivery.retain = retain match {
+          case SET => RetainSet
+          case REMOVE => RetainRemove
+          case _ => RetainIgnore
+        }
+      }
 
       if( receipt!=null ) {
         delivery.ack = { (consumed, uow) =>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Mon Feb  6 18:34:38 2012
@@ -498,14 +498,25 @@ class Stomp11HeartBeatTest extends Stomp
 }
 class StompDestinationTest extends StompTestSupport {
 
-  test("Topic remembers retained messages sent before before subscription is established") {
+  test("retain:set makes a topic remeber the message") {
     connect("1.1")
-    sync_send("/topic/retained-example", 1, "retain:true\n")
-    sync_send("/topic/retained-example", 2, "retain:true\n")
+    async_send("/topic/retained-example", 1)
+    async_send("/topic/retained-example", 2, "retain:set\n")
+    sync_send("/topic/retained-example", 3)
     subscribe("0", "/topic/retained-example")
     assert_received(2)
-    sync_send("/topic/retained-example", 3, "retain:true\n")
-    assert_received(3)
+    async_send("/topic/retained-example", 4)
+    assert_received(4)
+  }
+
+  test("retain:remove makes a topic forget the message") {
+    connect("1.1")
+    async_send("/topic/retained-example2", 1)
+    async_send("/topic/retained-example2", 2, "retain:set\n")
+    async_send("/topic/retained-example2", 3, "retain:remove\n")
+    subscribe("0", "/topic/retained-example2")
+    async_send("/topic/retained-example2", 4)
+    assert_received(4)
   }
 
   // This is the test case for https://issues.apache.org/jira/browse/APLO-88

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1241105&r1=1241104&r2=1241105&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Mon Feb  6 18:34:38 2012
@@ -1137,7 +1137,7 @@ Example STOMP frame sending to a queue:
 
 ### Topic Retained Messages
 
-If a message sent to a Topic has the `retain:true` header, then
+If a message sent to a Topic has the `retain:set` header, then
 the message will be 'remembered' by the topic so that if a new
 subscription arrives, the last retained message is sent 
 to the subscription.  For example if you want a topic 
@@ -1151,6 +1151,9 @@ that looks like:
     112.12
     ^@
 
+You can also send a new message with the `retain:remove` header
+to have the topic forget about the last retained message.
+
 Note: retained messages are not retained between broker restarts.
 
 ### Reliable Messaging