You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/18 16:39:42 UTC
svn commit: r1050651 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Author: chirino
Date: Sat Dec 18 15:39:41 2010
New Revision: 1050651
URL: http://svn.apache.org/viewvc?rev=1050651&view=rev
Log:
exclusive subscriptions implemented.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Sat Dec 18 15:39:41 2010
@@ -57,6 +57,7 @@ trait DeliveryConsumer extends Retained
def connection:Option[BrokerConnection] = None
def browser = false
+ def exclusive = false
def dispatchQueue:DispatchQueue;
def matches(message:Delivery):Boolean
def connect(producer:DeliveryProducer):DeliverySession
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Dec 18 15:39:41 2010
@@ -51,6 +51,7 @@ class Queue(val host: VirtualHost, var i
var inbound_sessions = Set[DeliverySession]()
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
+ var exclusive_subscriptions = ListBuffer[Subscription]()
val filter = binding.message_filter
@@ -528,22 +529,20 @@ class Queue(val host: VirtualHost, var i
def bind(values: List[DeliveryConsumer]) = retaining(values) {
for (consumer <- values) {
- val subscription = new Subscription(this, consumer)
- all_subscriptions += consumer -> subscription
- addCapacity( tune_consumer_buffer )
+ val sub = new Subscription(this, consumer)
+ sub.open
}
} >>: dispatchQueue
- def unbind(values: List[DeliveryConsumer]) = releasing(values) {
+ def unbind(values: List[DeliveryConsumer]) = dispatchQueue {
for (consumer <- values) {
all_subscriptions.get(consumer) match {
case Some(subscription) =>
subscription.close
case None =>
}
-
}
- } >>: dispatchQueue
+ }
def disconnected() = throw new RuntimeException("unsupported")
@@ -1026,22 +1025,31 @@ class QueueEntry(val queue:Queue, val se
advancing += sub
} else {
- // Is the sub flow controlled?
- if( sub.full ) {
- // hold back: flow controlled
- heldBack += sub
+ // Find the the first exclusive target of the message
+ val exclusive_target = queue.exclusive_subscriptions.find( _.matches(delivery) )
+
+ // Is the current sub not the exclusive target?
+ if( exclusive_target.isDefined && (exclusive_target.get != sub) ) {
+ // advance: not interested.
+ advancing += sub
} else {
- // advance: accepted...
- acquiringSub = sub
- acquired = true
-
- val acquiredQueueEntry = sub.acquire(entry)
- val acquiredDelivery = delivery.copy
- acquiredDelivery.ack = (consumed, tx)=> {
- queue.ack_source.merge((acquiredQueueEntry, consumed, tx))
- }
+ // Is the sub flow controlled?
+ if( sub.full ) {
+ // hold back: flow controlled
+ heldBack += sub
+ } else {
+ // advance: accepted...
+ acquiringSub = sub
+ acquired = true
+
+ val acquiredQueueEntry = sub.acquire(entry)
+ val acquiredDelivery = delivery.copy
+ acquiredDelivery.ack = (consumed, tx)=> {
+ queue.ack_source.merge((acquiredQueueEntry, consumed, tx))
+ }
- assert(sub.offer(acquiredDelivery), "sub should have accepted, it had reported not full earlier.")
+ assert(sub.offer(acquiredDelivery), "sub should have accepted, it had reported not full earlier.")
+ }
}
}
}
@@ -1311,19 +1319,30 @@ class Subscription(val queue:Queue, val
}
def browser = session.consumer.browser
+ def exclusive = session.consumer.exclusive
// This opens up the consumer
- pos = queue.head_entry;
- assert(pos!=null)
+ def open() = {
+ consumer.retain
+ pos = queue.head_entry;
+ assert(pos!=null)
+
+ session = consumer.connect(this)
+ session.refiller = pos
+ queue.head_entry ::= this
- session = consumer.connect(this)
- session.refiller = pos
- queue.head_entry ::= this
-
- if( queue.serviceState.isStarted ) {
- // kick off the initial dispatch.
- refill_prefetch
- queue.dispatchQueue << queue.head_entry
+ queue.all_subscriptions += consumer -> this
+ queue.addCapacity( queue.tune_consumer_buffer )
+
+ if( exclusive ) {
+ queue.exclusive_subscriptions.append(this)
+ }
+
+ if( queue.serviceState.isStarted ) {
+ // kick off the initial dispatch.
+ refill_prefetch
+ queue.dispatchQueue << queue.head_entry
+ }
}
def close() = {
@@ -1331,9 +1350,11 @@ class Subscription(val queue:Queue, val
pos -= this
pos = null
+ queue.exclusive_subscriptions = queue.exclusive_subscriptions.filterNot( _ == this )
queue.all_subscriptions -= consumer
queue.addCapacity( - queue.tune_consumer_buffer )
+
// nack all the acquired entries.
var next = acquired.getHead
while( next !=null ) {
@@ -1342,9 +1363,15 @@ class Subscription(val queue:Queue, val
cur.nack // this unlinks the entry.
}
+ if( exclusive ) {
+ // rewind all the subs to the start of the queue.
+ queue.all_subscriptions.values.foreach(_.rewind(queue.head_entry))
+ }
+
session.refiller = NOOP
session.close
session = null
+ consumer.release
queue.trigger_swap
} else {}
@@ -1483,11 +1510,17 @@ class Subscription(val queue:Queue, val
queue.nack_item_counter += 1
queue.nack_size_counter += entry.size
- // rewind all the matching competing subs past the entry.. back to the entry
- queue.all_subscriptions.valuesIterator.foreach{ sub=>
- if( !sub.browser && entry.seq < sub.pos.seq && sub.matches(entry.as_loaded.delivery)) {
- sub.rewind(entry)
+ // The following does not need to get done for exclusive subs because
+ // they end up rewinding all the sub of the head of the queue.
+ if( !exclusive ) {
+
+ // rewind all the matching competing subs past the entry.. back to the entry
+ queue.all_subscriptions.valuesIterator.foreach{ sub=>
+ if( !sub.browser && entry.seq < sub.pos.seq && sub.matches(entry.as_loaded.delivery)) {
+ sub.rewind(entry)
+ }
}
+
}
unlink()
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sat Dec 18 15:39:41 2010
@@ -204,8 +204,8 @@ class StompProtocolHandler extends Proto
val ack_handler:AckHandler,
val selector:(String, BooleanExpression),
val binding:BindingDTO,
- override val browser:Boolean
-
+ override val browser:Boolean,
+ override val exclusive:Boolean
) extends BaseRetained with DeliveryConsumer {
val dispatchQueue = StompProtocolHandler.this.dispatchQueue
@@ -771,6 +771,7 @@ class StompProtocolHandler extends Proto
val topic = destination.domain == Router.TOPIC_DOMAIN
var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
+ var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
val ack = get(headers, ACK_MODE) match {
case None=> new AutoAckHandler
@@ -823,7 +824,7 @@ class StompProtocolHandler extends Proto
}
}
- val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding, browser);
+ val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding, browser, exclusive);
consumers += (id -> consumer)
if( binding==null ) {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Sat Dec 18 15:39:41 2010
@@ -225,6 +225,128 @@ class Stomp11HeartBeatTest extends Stomp
class StompDestinationTest extends StompTestSupport {
+ test("Queues load balance across subscribers") {
+ connect("1.1")
+
+ // Connect to subscribers
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/load-balanced\n" +
+ "id:1\n" +
+ "\n")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/load-balanced\n" +
+ "receipt:0\n"+
+ "id:2\n" +
+ "\n")
+
+ wait_for_receipt("0")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/load-balanced\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ for( i <- 0 until 4) {
+ put(i)
+ }
+
+ var sub1_counter=0
+ var sub2_counter=0
+
+ def get() = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+
+ if( frame.contains("subscription:1\n") ) {
+ sub1_counter += 1
+ } else if( frame.contains("subscription:2\n") ) {
+ sub2_counter += 1
+ }
+ }
+
+ for( i <- 0 until 4) {
+ get()
+ }
+
+ sub1_counter should be(2)
+ sub2_counter should be(2)
+
+ }
+
+ test("Queues do NOT load balance across exclusive subscribers") {
+ connect("1.1")
+
+ // Connect to subscribers
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/exclusive\n" +
+ "id:1\n" +
+ "\n")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/exclusive\n" +
+ "exclusive:true\n"+
+ "receipt:0\n"+
+ "ack:client\n"+
+ "id:2\n" +
+ "\n")
+
+ wait_for_receipt("0")
+
+ def put(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/exclusive\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
+
+ for( i <- 0 until 4) {
+ put(i)
+ }
+
+ var sub1_counter=0
+ var sub2_counter=0
+
+ def get() = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+
+ if( frame.contains("subscription:1\n") ) {
+ sub1_counter += 1
+ } else if( frame.contains("subscription:2\n") ) {
+ sub2_counter += 1
+ }
+ }
+
+ for( i <- 0 until 4) {
+ get()
+ }
+
+ sub1_counter should be(0)
+ sub2_counter should be(4)
+
+ // disconnect the exclusive subscriber.
+ client.write(
+ "UNSUBSCRIBE\n" +
+ "id:2\n" +
+ "\n")
+
+ // sub 1 should now get all the messages.
+ for( i <- 0 until 4) {
+ get()
+ }
+ sub1_counter should be(4)
+
+ }
+
test("Queue browsers don't consume the messages") {
connect("1.1")
@@ -252,7 +374,6 @@ class StompDestinationTest extends Stomp
def get(sub:Int, id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should include ("subscription:%d\n".format(sub))
frame should endWith regex("\n\nmessage:%d\n".format(id))
@@ -302,7 +423,6 @@ class StompDestinationTest extends Stomp
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should include ("subscription:0\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
@@ -337,7 +457,6 @@ class StompDestinationTest extends Stomp
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should include ("subscription:0\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
@@ -387,7 +506,6 @@ class StompDestinationTest extends Stomp
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should include ("subscription:my-sub-name\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
@@ -422,7 +540,6 @@ class StompDestinationTest extends Stomp
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
}
@@ -457,7 +574,6 @@ class StompDestinationTest extends Stomp
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
}
@@ -560,7 +676,6 @@ class StompTransactionTest extends Stomp
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
}
@@ -606,7 +721,6 @@ class StompTransactionTest extends Stomp
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should endWith regex("\n\nmessage:"+id+"\n")
}
@@ -651,7 +765,6 @@ class StompAckModeTest extends StompTest
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should include ("subscription:0\n")
frame should include regex("message-id:.+?\n")
@@ -716,7 +829,6 @@ class StompAckModeTest extends StompTest
def get(id:Int) = {
val frame = client.receive()
- info(frame)
frame should startWith("MESSAGE\n")
frame should include ("subscription:0\n")
frame should include regex("message-id:.+?\n")
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1050651&r1=1050650&r2=1050651&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sat Dec 18 15:39:41 2010
@@ -760,13 +760,46 @@ ack mode to consume reliable messages. A
client which have not been acked when the client disconnects will get
redelivered to another subscribed client.
+### Topic Durable Subscriptions
+
+A durable subscription is a queue which is subscribed to a topic so that
+even if the client which created the durable subscription is not
+online, he can still get a copy of all the messages sent to the topic
+when he comes back online. Multiple clients can subscribe to the same
+durable subscription and since it's backed by a queue, those subscribers
+will have the topic's messages load balanced across them.
+
+To create or reattach to a a durable subscription with STOMP, you uniquely name
+the durable subscription using the `id` header on the `
+SCRIBE` frame and
+also adding a `persistent:true` header. Example:
+
+ SUBSCRIBE
+ id:mysub
+ persistent:true
+ destination:/topic/foo
+
+ ^@
+
+A standard `UNSUBSCRIBE` frame does not destroy the durable subscription, it
+only disconnects the client from the durable subscription. To destroy a
+durable subscription, you must once again add `persistent:true` header
+to the `UNSUBSCRIBE` frame. Example:
+
+ UNSUBSCRIBE
+ id:mysub
+ persistent:true
+
+ ^@
+
### Browsing Subscriptions
A normal subscription on a queue will consume messages so that no other
subscription will get a copy of the message. If you want to browse all the
messages on a queue in a non-destructive fashion, you can create browsing
-subscription. To make a a browsing subscription, just add the `browser:true`
-header to the `SUBSCRIBE` frame. For example:
+subscription. Browsing subscriptions also works with durable subscriptions
+since they are backed by a queue. To make a a browsing subscription, just add the
+`browser:true` header to the `SUBSCRIBE` frame. For example:
SUBSCRIBE
id:mysub
@@ -787,35 +820,37 @@ Example:
^@
-### Topic Durable Subscriptions
+### Exclusive Subscriptions
-A durable subscription is a queue which is subscribed to a topic so that
-even if the client which created the durable subscription is not
-online, he can still get a copy of all the messages sent to the topic
-when he comes back online. Multiple clients can subscribe to the same
-durable subscription and since it's backed by a queue, those subscribers
-will have the topic's messages load balanced across them.
+We maintain the order of messages in queues and dispatch them to
+subscriptions in order. However if you have multiple subscriptions consuming
+from the same queue, you will loose the guarantee of processing the messages
+in order; since the messages may be processed concurrently on different
+subscribers.
+
+Sometimes it is important to guarantee the order in which messages are
+processed. e.g. you don't want to process the update to an order until an
+insert has been done; or to go backwards in time, overwriting an newer update
+of an order with an older one etc.
+
+So what folks have to do in clusters is often to only run one consumer
+process in a cluster to avoid loosing the ordering. The problem with this is
+that if that process goes down, no one is processing the queue any more,
+which can be problem.
+
+${project_name} supports exclusive subscriptions which avoids the end user
+having to restrict himself to only running one process. The broker will pick
+a single subscription to get all the messages for a queue to ensure ordering.
+If that subscription fails, the broker will auto failover and choose another
+subscription.
-To create or reattach to a a durable subscription with STOMP, you uniquely name
-the durable subscription using the `id` header on the `
-SCRIBE` frame and
-also adding a `persistent:true` header. Example:
+An exclusive subscription is created by adding a `exclusive:true` header
+to the `SUBSCRIBE` frame. Example:
SUBSCRIBE
id:mysub
- persistent:true
- destination:/topic/foo
-
- ^@
-
-A standard `UNSUBSCRIBE` frame does not destroy the durable subscription, it
-only disconnects the client from the durable subscription. To destroy a
-durable subscription, you must once again add `persistent:true` header
-to the `UNSUBSCRIBE` frame. Example:
-
- UNSUBSCRIBE
- id:mysub
- persistent:true
+ exclusive:true
+ destination:/queue/foo
^@