You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/18 16:39:42 UTC

svn commit: r1050651 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...

Author: chirino
Date: Sat Dec 18 15:39:41 2010
New Revision: 1050651

URL: http://svn.apache.org/viewvc?rev=1050651&view=rev
Log:
exclusive subscriptions implemented.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Sat Dec 18 15:39:41 2010
@@ -57,6 +57,7 @@ trait DeliveryConsumer extends Retained 
   def connection:Option[BrokerConnection] = None
 
   def browser = false
+  def exclusive = false
   def dispatchQueue:DispatchQueue;
   def matches(message:Delivery):Boolean
   def connect(producer:DeliveryProducer):DeliverySession

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Dec 18 15:39:41 2010
@@ -51,6 +51,7 @@ class Queue(val host: VirtualHost, var i
 
   var inbound_sessions = Set[DeliverySession]()
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
+  var exclusive_subscriptions = ListBuffer[Subscription]()
 
   val filter = binding.message_filter
 
@@ -528,22 +529,20 @@ class Queue(val host: VirtualHost, var i
 
   def bind(values: List[DeliveryConsumer]) = retaining(values) {
     for (consumer <- values) {
-      val subscription = new Subscription(this, consumer)
-      all_subscriptions += consumer -> subscription
-      addCapacity( tune_consumer_buffer )
+      val sub = new Subscription(this, consumer)
+      sub.open
     }
   } >>: dispatchQueue
 
-  def unbind(values: List[DeliveryConsumer]) = releasing(values) {
+  def unbind(values: List[DeliveryConsumer]) = dispatchQueue {
     for (consumer <- values) {
       all_subscriptions.get(consumer) match {
         case Some(subscription) =>
           subscription.close
         case None =>
       }
-
     }
-  } >>: dispatchQueue
+  }
 
   def disconnected() = throw new RuntimeException("unsupported")
 
@@ -1026,22 +1025,31 @@ class QueueEntry(val queue:Queue, val se
               advancing += sub
             } else {
 
-              // Is the sub flow controlled?
-              if( sub.full ) {
-                // hold back: flow controlled
-                heldBack += sub
+              // Find the first exclusive target of the message
+              val exclusive_target = queue.exclusive_subscriptions.find( _.matches(delivery) )
+
+              // Is the current sub not the exclusive target?
+              if( exclusive_target.isDefined && (exclusive_target.get != sub) ) {
+                // advance: not interested.
+                advancing += sub
               } else {
-                // advance: accepted...
-                acquiringSub = sub
-                acquired = true
-
-                val acquiredQueueEntry = sub.acquire(entry)
-                val acquiredDelivery = delivery.copy
-                acquiredDelivery.ack = (consumed, tx)=> {
-                  queue.ack_source.merge((acquiredQueueEntry, consumed, tx))
-                }
+                // Is the sub flow controlled?
+                if( sub.full ) {
+                  // hold back: flow controlled
+                  heldBack += sub
+                } else {
+                  // advance: accepted...
+                  acquiringSub = sub
+                  acquired = true
+
+                  val acquiredQueueEntry = sub.acquire(entry)
+                  val acquiredDelivery = delivery.copy
+                  acquiredDelivery.ack = (consumed, tx)=> {
+                    queue.ack_source.merge((acquiredQueueEntry, consumed, tx))
+                  }
 
-                assert(sub.offer(acquiredDelivery), "sub should have accepted, it had reported not full earlier.")
+                  assert(sub.offer(acquiredDelivery), "sub should have accepted, it had reported not full earlier.")
+                }
               }
             }
           }
@@ -1311,19 +1319,30 @@ class Subscription(val queue:Queue, val 
   }
 
   def browser = session.consumer.browser
+  def exclusive = session.consumer.exclusive
 
   // This opens up the consumer
-  pos = queue.head_entry;
-  assert(pos!=null)
+  def open() = {
+    consumer.retain
+    pos = queue.head_entry;
+    assert(pos!=null)
+
+    session = consumer.connect(this)
+    session.refiller = pos
+    queue.head_entry ::= this
 
-  session = consumer.connect(this)
-  session.refiller = pos
-  queue.head_entry ::= this
-
-  if( queue.serviceState.isStarted ) {
-    // kick off the initial dispatch.
-    refill_prefetch
-    queue.dispatchQueue << queue.head_entry
+    queue.all_subscriptions += consumer -> this
+    queue.addCapacity( queue.tune_consumer_buffer )
+
+    if( exclusive ) {
+      queue.exclusive_subscriptions.append(this)
+    }
+
+    if( queue.serviceState.isStarted ) {
+      // kick off the initial dispatch.
+      refill_prefetch
+      queue.dispatchQueue << queue.head_entry
+    }
   }
 
   def close() = {
@@ -1331,9 +1350,11 @@ class Subscription(val queue:Queue, val 
       pos -= this
       pos = null
 
+      queue.exclusive_subscriptions = queue.exclusive_subscriptions.filterNot( _ == this )
       queue.all_subscriptions -= consumer
       queue.addCapacity( - queue.tune_consumer_buffer )
 
+
       // nack all the acquired entries.
       var next = acquired.getHead
       while( next !=null ) {
@@ -1342,9 +1363,15 @@ class Subscription(val queue:Queue, val 
         cur.nack // this unlinks the entry.
       }
 
+      if( exclusive ) {
+        // rewind all the subs to the start of the queue.
+        queue.all_subscriptions.values.foreach(_.rewind(queue.head_entry))
+      }
+
       session.refiller = NOOP
       session.close
       session = null
+      consumer.release
 
       queue.trigger_swap
     } else {}
@@ -1483,11 +1510,17 @@ class Subscription(val queue:Queue, val 
       queue.nack_item_counter += 1
       queue.nack_size_counter += entry.size
 
-      // rewind all the matching competing subs past the entry.. back to the entry
-      queue.all_subscriptions.valuesIterator.foreach{ sub=>
-        if( !sub.browser && entry.seq < sub.pos.seq && sub.matches(entry.as_loaded.delivery)) {
-          sub.rewind(entry)
+      // The following does not need to be done for exclusive subs because
+      // they end up rewinding all the subs to the head of the queue.
+      if( !exclusive ) {
+
+        // rewind all the matching competing subs past the entry.. back to the entry
+        queue.all_subscriptions.valuesIterator.foreach{ sub=>
+          if( !sub.browser && entry.seq < sub.pos.seq && sub.matches(entry.as_loaded.delivery)) {
+            sub.rewind(entry)
+          }
         }
+
       }
       unlink()
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sat Dec 18 15:39:41 2010
@@ -204,8 +204,8 @@ class StompProtocolHandler extends Proto
     val ack_handler:AckHandler,
     val selector:(String, BooleanExpression),
     val binding:BindingDTO,
-    override val browser:Boolean
-
+    override val browser:Boolean,
+    override val exclusive:Boolean
   ) extends BaseRetained with DeliveryConsumer {
 
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
@@ -771,6 +771,7 @@ class StompProtocolHandler extends Proto
     val topic = destination.domain == Router.TOPIC_DOMAIN
     var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
     var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
+    var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
 
     val ack = get(headers, ACK_MODE) match {
       case None=> new AutoAckHandler
@@ -823,7 +824,7 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding, browser);
+    val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding, browser, exclusive);
     consumers += (id -> consumer)
 
     if( binding==null ) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Sat Dec 18 15:39:41 2010
@@ -225,6 +225,128 @@ class Stomp11HeartBeatTest extends Stomp
 
 class StompDestinationTest extends StompTestSupport {
 
+  test("Queues load balance across subscribers") {
+    connect("1.1")
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/load-balanced\n" +
+      "id:1\n" +
+      "\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/load-balanced\n" +
+      "receipt:0\n"+
+      "id:2\n" +
+      "\n")
+
+    wait_for_receipt("0")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/load-balanced\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    for( i <- 0 until 4) {
+      put(i)
+    }
+
+    var sub1_counter=0
+    var sub2_counter=0
+
+    def get() = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+
+      if( frame.contains("subscription:1\n") ) {
+        sub1_counter += 1
+      } else if( frame.contains("subscription:2\n") ) {
+        sub2_counter += 1
+      }
+    }
+
+    for( i <- 0 until 4) {
+      get()
+    }
+
+    sub1_counter should be(2)
+    sub2_counter should be(2)
+
+  }
+
+  test("Queues do NOT load balance across exclusive subscribers") {
+    connect("1.1")
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/exclusive\n" +
+      "id:1\n" +
+      "\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/exclusive\n" +
+      "exclusive:true\n"+
+      "receipt:0\n"+
+      "ack:client\n"+
+      "id:2\n" +
+      "\n")
+
+    wait_for_receipt("0")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/exclusive\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    for( i <- 0 until 4) {
+      put(i)
+    }
+
+    var sub1_counter=0
+    var sub2_counter=0
+
+    def get() = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+
+      if( frame.contains("subscription:1\n") ) {
+        sub1_counter += 1
+      } else if( frame.contains("subscription:2\n") ) {
+        sub2_counter += 1
+      }
+    }
+
+    for( i <- 0 until 4) {
+      get()
+    }
+
+    sub1_counter should be(0)
+    sub2_counter should be(4)
+
+    // disconnect the exclusive subscriber.
+    client.write(
+      "UNSUBSCRIBE\n" +
+      "id:2\n" +
+      "\n")
+
+    // sub 1 should now get all the messages.
+    for( i <- 0 until 4) {
+      get()
+    }
+    sub1_counter should be(4)
+
+  }
+
   test("Queue browsers don't consume the messages") {
     connect("1.1")
 
@@ -252,7 +374,6 @@ class StompDestinationTest extends Stomp
 
     def get(sub:Int, id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should include ("subscription:%d\n".format(sub))
       frame should endWith regex("\n\nmessage:%d\n".format(id))
@@ -302,7 +423,6 @@ class StompDestinationTest extends Stomp
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should include ("subscription:0\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
@@ -337,7 +457,6 @@ class StompDestinationTest extends Stomp
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should include ("subscription:0\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
@@ -387,7 +506,6 @@ class StompDestinationTest extends Stomp
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should include ("subscription:my-sub-name\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
@@ -422,7 +540,6 @@ class StompDestinationTest extends Stomp
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
     }
@@ -457,7 +574,6 @@ class StompDestinationTest extends Stomp
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
     }
@@ -560,7 +676,6 @@ class StompTransactionTest extends Stomp
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
     }
@@ -606,7 +721,6 @@ class StompTransactionTest extends Stomp
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should endWith regex("\n\nmessage:"+id+"\n")
     }
@@ -651,7 +765,6 @@ class StompAckModeTest extends StompTest
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should include ("subscription:0\n")
       frame should include regex("message-id:.+?\n")
@@ -716,7 +829,6 @@ class StompAckModeTest extends StompTest
 
     def get(id:Int) = {
       val frame = client.receive()
-      info(frame)
       frame should startWith("MESSAGE\n")
       frame should include ("subscription:0\n")
       frame should include regex("message-id:.+?\n")

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sat Dec 18 15:39:41 2010
@@ -760,13 +760,46 @@ ack mode to consume reliable messages. A
 client which have not been acked when the client disconnects will get
 redelivered to another subscribed client.
 
+### Topic Durable Subscriptions
+
+A durable subscription is a queue which is subscribed to a topic so that
+even if the client which created the durable subscription is not
+online, he can still get a copy of all the messages sent to the topic
+when he comes back online.  Multiple clients can subscribe to the same
+durable subscription and since it's backed by a queue, those subscribers
+will have the topic's messages load balanced across them.
+
+To create or reattach to a durable subscription with STOMP, you uniquely name
+the durable subscription using the `id` header on the `SUBSCRIBE` frame and
+also adding a `persistent:true` header. Example:
+
+    SUBSCRIBE
+    id:mysub
+    persistent:true
+    destination:/topic/foo
+    
+    ^@
+
+A standard `UNSUBSCRIBE` frame does not destroy the durable subscription, it 
+only disconnects the client from the durable subscription.  To destroy a 
+durable subscription, you must once again add `persistent:true` header
+to the `UNSUBSCRIBE` frame.  Example:
+
+    UNSUBSCRIBE
+    id:mysub
+    persistent:true
+    
+    ^@
+
 ### Browsing Subscriptions
 
 A normal subscription on a queue will consume messages so that no other
 subscription will get a copy of the message. If you want to browse all the
 messages on a queue in a non-destructive fashion, you can create browsing
-subscription. To make a a browsing subscription, just add the `browser:true`
-header to the `SUBSCRIBE` frame. For example:
+subscription. Browsing subscriptions also work with durable subscriptions
+since they are backed by a queue. To make a browsing subscription, just add the
+`browser:true` header to the `SUBSCRIBE` frame. For example:
 
     SUBSCRIBE
     id:mysub
@@ -787,35 +820,37 @@ Example:
     
     ^@
 
-### Topic Durable Subscriptions
+### Exclusive Subscriptions
 
-A durable subscription is a queue which is subscribed to a topic so that
-even if the client which created the durable subscription is not
-online, he can still get a copy of all the messages sent to the topic
-when he comes back online.  Multiple clients can subscribe to the same
-durable subscription and since it's backed by a queue, those subscribers
-will have the topic's messages load balanced across them.
+We maintain the order of messages in queues and dispatch them to
+subscriptions in order. However if you have multiple subscriptions consuming
+from the same queue, you will lose the guarantee of processing the messages
+in order; since the messages may be processed concurrently on different
+subscribers.
+
+Sometimes it is important to guarantee the order in which messages are
+processed. e.g. you don't want to process the update to an order until an
+insert has been done; or to go backwards in time, overwriting a newer update
+of an order with an older one etc.
+
+So what folks have to do in clusters is often to only run one consumer
+process in a cluster to avoid losing the ordering. The problem with this is
+that if that process goes down, no one is processing the queue any more,
+which can be a problem.
+
+${project_name} supports exclusive subscriptions which avoids the end user
+having to restrict himself to only running one process. The broker will pick
+a single subscription to get all the messages for a queue to ensure ordering.
+If that subscription fails, the broker will auto failover and choose another
+subscription.
 
-To create or reattach to a a durable subscription with STOMP, you uniquely name
-the durable subscription using the `id` header on the `SUBSCRIBE` frame and
-also adding a `persistent:true` header. Example:
+An exclusive subscription is created by adding an `exclusive:true` header
+to the `SUBSCRIBE` frame.  Example:
 
     SUBSCRIBE
     id:mysub
-    persistent:true
-    destination:/topic/foo
-    
-    ^@
-
-A standard `UNSUBSCRIBE` frame does not destroy the durable subscription, it 
-only disconnects the client from the durable subscription.  To destroy a 
-durable subscription, you must once again add `persistent:true` header
-to the `UNSUBSCRIBE` frame.  Example:
-
-    UNSUBSCRIBE
-    id:mysub
-    persistent:true
+    exclusive:true
+    destination:/queue/foo
     
     ^@