You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/06 18:36:05 UTC

svn commit: r1241090 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...

Author: chirino
Date: Mon Feb  6 17:36:05 2012
New Revision: 1241090

URL: http://svn.apache.org/viewvc?rev=1241090&view=rev
Log:
Fixes APLO-152 - Support an option to have topics retain the last message sent to them

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Feb  6 17:36:05 2012
@@ -212,6 +212,12 @@ class Delivery {
    */
   var ack:(DeliveryResult, StoreUOW)=>Unit = null
 
+  /**
+   * Should this message get retained as the last image of a topic?
+   */
+  var retain = false
+
+
   def copy() = (new Delivery).set(this)
 
   def set(other:Delivery) = {
@@ -222,6 +228,7 @@ class Delivery {
     storeKey = other.storeKey
     storeLocator = other.storeLocator
     redeliveries = other.redeliveries
+    retain = other.retain
     this
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Feb  6 17:36:05 2012
@@ -39,6 +39,9 @@ class Topic(val router:LocalRouter, val 
   val resource_kind =SecuredResource.TopicKind
   var proxy_sessions = new HashSet[DeliverySession]()
 
+  @transient
+  var retained_message: Delivery = _
+
   implicit def from_link(from:LinkDTO):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
   implicit def from_session(from:DeliverySession):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter, from.enqueue_ts)
 
@@ -83,10 +86,15 @@ class Topic(val router:LocalRouter, val 
     var enqueue_item_counter = 0L
     var refiller:Runnable = null
 
+
     def offer(value: Delivery) = {
       enqueue_item_counter += 1
       enqueue_size_counter += value.size
       enqueue_ts = now
+      if( value.retain ) {
+        // TODO: perhaps persist this message reference
+        retained_message = value;
+      }
       true
     }
 
@@ -374,6 +382,24 @@ class Topic(val router:LocalRouter, val 
         }
     }
 
+    val r = retained_message
+    if (r != null) {
+      val copy = r.copy()
+      copy.sender = address
+
+      val producer = new  DeliveryProducerRoute(router) {
+        val dispatch_queue = createQueue()
+        override protected def on_connected = {
+          copy.ack = (d,x) => consumer.dispatch_queue {
+            unbind(consumer :: Nil)
+          }
+          offer(copy) // producer supports 1 message overflow.
+        }
+      }
+      producer.bind(consumer :: Nil)
+      producer.connected()
+    }
+    
     val proxy = ProxyDeliveryConsumer(target, link, consumer)
     consumers.put(consumer, proxy)
     topic_metrics.consumer_counter += 1

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Feb  6 17:36:05 2012
@@ -372,6 +372,7 @@ object Stomp {
   val PRIORITY = ascii("priority")
   val TYPE = ascii("type")
   val PERSISTENT = ascii("persistent")
+  val RETAIN = ascii("retain")
 
   val MESSAGE_ID = ascii("message-id")
   val PRORITY = ascii("priority")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Feb  6 17:36:05 2012
@@ -1131,6 +1131,7 @@ class StompProtocolHandler extends Proto
       delivery.message = message
       delivery.size = message.frame.size
       delivery.uow = uow
+      delivery.retain = get(frame.headers, RETAIN).map( _ == TRUE).getOrElse(false)
 
       if( receipt!=null ) {
         delivery.ack = { (consumed, uow) =>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Mon Feb  6 17:36:05 2012
@@ -90,21 +90,23 @@ class StompTestSupport extends FunSuiteS
 
   val receipt_counter = new AtomicLong()
 
-  def sync_send(dest:String, body:Any, c: StompClient = client) = {
+  def sync_send(dest:String, body:Any, headers:String="", c:StompClient = client) = {
     val rid = receipt_counter.incrementAndGet()
     c.write(
       "SEND\n" +
       "destination:"+dest+"\n" +
       "receipt:"+rid+"\n" +
+      headers+
       "\n" +
       body)
     wait_for_receipt(""+rid, c)
   }
 
-  def async_send(dest:String, body:Any, c: StompClient = client) = {
+  def async_send(dest:String, body:Any, headers:String="", c: StompClient = client) = {
     c.write(
       "SEND\n" +
       "destination:"+dest+"\n" +
+      headers+
       "\n" +
       body)
   }
@@ -496,6 +498,16 @@ class Stomp11HeartBeatTest extends Stomp
 }
 class StompDestinationTest extends StompTestSupport {
 
+  test("Topic remembers retained messages sent before before subscription is established") {
+    connect("1.1")
+    sync_send("/topic/retained-example", 1, "retain:true\n")
+    sync_send("/topic/retained-example", 2, "retain:true\n")
+    subscribe("0", "/topic/retained-example")
+    assert_received(2)
+    sync_send("/topic/retained-example", 3, "retain:true\n")
+    assert_received(3)
+  }
+
   // This is the test case for https://issues.apache.org/jira/browse/APLO-88
   test("ACK then socket close with/without DISCONNECT, should still ACK") {
     for(i <- 1 until 3) {

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1241090&r1=1241089&r2=1241090&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Mon Feb  6 17:36:05 2012
@@ -1135,6 +1135,23 @@ Example STOMP frame sending to a queue:
     hello queue a
     ^@
 
+### Topic Retained Messages
+
+If a message sent to a Topic has the `retain:true` header, then
+the message will be 'remembered' by the topic so that when a new
+subscription arrives, the last retained message is sent
+to that subscription.  For example, if you want a topic
+to remember the last price published, you can send a message
+that looks like:
+
+    SEND
+    destination:/topic/stock/IBM
+    retain:true
+
+    112.12
+    ^@
+
+Note: retained messages are not persisted, so they are lost when the broker restarts.
 
 ### Reliable Messaging