You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/19 20:24:31 UTC
svn commit: r1447879 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-op...
Author: chirino
Date: Tue Feb 19 19:24:30 2013
New Revision: 1447879
URL: http://svn.apache.org/r1447879
Log:
Updated the router bind method to take a callback so that callers know when the subscription has been fully bound. This should avoid us needing some of the sleep hacks in our tests.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -588,19 +588,20 @@ class AmqpProtocolHandler extends Protoc
consumers += (id -> consumer)
host.dispatch_queue {
- val rc = host.router.bind(consumer.addresses, consumer, security_context)
- queue {
- rc match {
- case Some(reason) =>
- consumers -= id
- consumer.release
- sender.setSource(null)
- close_with_error(sender, "amqp:not-found", reason)
- onComplete.run()
- case None =>
- set_attachment(sender, consumer)
- sender.open()
- onComplete.run()
+ host.router.bind(consumer.addresses, consumer, security_context) { rc =>
+ queue {
+ rc match {
+ case Some(reason) =>
+ consumers -= id
+ consumer.release
+ sender.setSource(null)
+ close_with_error(sender, "amqp:not-found", reason)
+ onComplete.run()
+ case None =>
+ set_attachment(sender, consumer)
+ sender.open()
+ onComplete.run()
+ }
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Tue Feb 19 19:24:30 2013
@@ -23,12 +23,12 @@ import path._
import path.PathParser.PathException
import java.util.concurrent.TimeUnit
import scala.Array
-import java.util.ArrayList
import collection.{Iterable, JavaConversions}
import security.SecuredResource.{TopicKind, QueueKind}
import security.{SecuredResource, SecurityContext}
import org.apache.activemq.apollo.dto._
import scala.collection.mutable.{HashSet, HashMap, LinkedHashMap}
+import java.util.concurrent.atomic.AtomicInteger
object DestinationMetricsSupport {
@@ -141,7 +141,7 @@ trait DomainDestination extends SecuredR
def browse(from_seq:Long, to:Option[Long], max:Long)(func: (BrowseResult)=>Unit):Unit
- def bind (bind_address:BindAddress, consumer:DeliveryConsumer):Unit
+ def bind (bind_address:BindAddress, consumer:DeliveryConsumer, on_bind:()=>Unit):Unit
def unbind (consumer:DeliveryConsumer, persistent:Boolean):Unit
def connect (connect_address:ConnectAddress, producer:BindableDeliveryProducer):Unit
@@ -288,7 +288,7 @@ class LocalRouter(val virtual_host:Virtu
consumers_by_path.get( path ).foreach { case (consumer_context, bind_address)=>
if( authorizer.can(consumer_context.security, bind_action(consumer_context.consumer), dest) ) {
consumer_context.matched_destinations += dest
- dest.bind(bind_address, consumer_context.consumer)
+ dest.bind(bind_address, consumer_context.consumer, ()=>{})
}
}
producers_by_path.get( path ).foreach { x=>
@@ -374,7 +374,7 @@ class LocalRouter(val virtual_host:Virtu
None
}
- def bind(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
+ def bind(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext, on_bind:()=>Unit):Unit = {
val context = consumers.getOrElseUpdate(consumer, new ConsumerContext[D](consumer))
context.security = security
@@ -388,15 +388,26 @@ class LocalRouter(val virtual_host:Virtu
matches --= context.matched_destinations
context.matched_destinations ++= matches
+ val remaining = new AtomicInteger(1)
+ var bind_release:()=>Unit = ()=> {
+ if( remaining.decrementAndGet() == 0 ) {
+ on_bind()
+ }
+ }
+
matches.foreach { dest=>
if( authorizer.can(security, bind_action(consumer), dest) ) {
- dest.bind(bind_address, consumer)
+ remaining.incrementAndGet()
+ dest.bind(bind_address, consumer, bind_release)
for( l <- router_listeners) {
l.on_bind(dest, consumer, security)
}
}
}
+ bind_release()
+ } else {
+ on_bind()
}
}
@@ -694,7 +705,7 @@ class LocalRouter(val virtual_host:Virtu
"consume"
}
- override def bind(bind_address: BindAddress, consumer: DeliveryConsumer, security: SecurityContext) {
+ override def bind(bind_address: BindAddress, consumer: DeliveryConsumer, security: SecurityContext, on_bind: ()=>Unit) {
destination_by_id.get(bind_address.id).foreach { queue =>
// We may need to update the bindings...
@@ -716,12 +727,21 @@ class LocalRouter(val virtual_host:Virtu
case _ =>
}
+ val remaining = new AtomicInteger(1)
+ var bind_release:()=>Unit = ()=> {
+ if( remaining.decrementAndGet() == 0 ) {
+ on_bind()
+ }
+ }
+
if( authorizer.can(security, bind_action(consumer), queue) ) {
- queue.bind(bind_address, consumer)
+ remaining.incrementAndGet()
+ queue.bind(bind_address, consumer, bind_release)
for( l <- router_listeners) {
l.on_bind(queue, consumer, security)
}
}
+ bind_release();
}
}
@@ -748,7 +768,7 @@ class LocalRouter(val virtual_host:Virtu
if( queue.mirrored ) {
// hook up the queue to be a subscriber of the topic.
val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
- topic.bind(SimpleAddress("queue", path), queue)
+ topic.bind(SimpleAddress("queue", path), queue, ()=>{})
}
}
@@ -1012,25 +1032,26 @@ class LocalRouter(val virtual_host:Virtu
def topic_domain:Domain[_ <: DomainDestination] = local_topic_domain
def dsub_domain:Domain[_ <: DomainDestination] = local_dsub_domain
- def bind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, security: SecurityContext):Option[String] = {
+ def bind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, security: SecurityContext)(cb: (Option[String])=>Unit):Unit = {
dispatch_queue.assertExecuting()
if(!virtual_host.service_state.is_started) {
- return Some("virtual host stopped.")
+ cb(Some("virtual host stopped."))
+ return
} else {
try {
val actions = addresses.map { address =>
address.domain match {
case "topic" =>
val allowed = topic_domain.can_bind_all(address, consumer, security)
- def perform() = topic_domain.bind(address, consumer, security)
+ def perform(on_bind:()=>Unit) = topic_domain.bind(address, consumer, security, on_bind)
(allowed, perform _)
case "queue" =>
val allowed = queue_domain.can_bind_all(address, consumer, security)
- def perform() = queue_domain.bind(address, consumer, security)
+ def perform(on_bind:()=>Unit) = queue_domain.bind(address, consumer, security, on_bind)
(allowed, perform _)
case "dsub" =>
val allowed = dsub_domain.can_bind_all(address, consumer, security)
- def perform() = dsub_domain.bind(address, consumer, security)
+ def perform(on_bind:()=>Unit) = dsub_domain.bind(address, consumer, security, on_bind)
(allowed, perform _)
case _ => sys.error("Unknown domain: "+address.domain)
}
@@ -1038,14 +1059,23 @@ class LocalRouter(val virtual_host:Virtu
val failures = actions.flatMap(_._1)
if( !failures.isEmpty ) {
- return Some(failures.mkString("; "))
+ cb(Some(failures.mkString("; ")))
+ return
} else {
- actions.foreach(_._2())
- return None
+ val remaining = new AtomicInteger(actions.length+1)
+ var bind_release:()=>Unit = ()=> {
+ if( remaining.decrementAndGet() == 0 ) {
+ cb(None)
+ }
+ }
+ actions.foreach(_._2(bind_release))
+ bind_release()
+ return
}
} catch {
case x:PathException =>
- return Some(x.getMessage)
+ cb(Some(x.getMessage))
+ return
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Feb 19 19:24:30 2013
@@ -1240,23 +1240,24 @@ class Queue(val router: LocalRouter, val
def connected() = {}
- def bind(value: DeliveryConsumer, ctx:SecurityContext): Result[Zilch, String] = {
+ def bind(value: DeliveryConsumer, ctx:SecurityContext, cb: (Result[Zilch, String])=>Unit):Unit = {
if( ctx!=null ) {
if( value.browser ) {
if( !virtual_host.authorizer.can(ctx, "receive", this) ) {
- return new Failure("Not authorized to browse the queue")
+ cb(new Failure("Not authorized to browse the queue"))
+ return
}
} else {
if( !virtual_host.authorizer.can(ctx, "consume", this) ) {
- return new Failure("Not authorized to consume from the queue")
+ cb(new Failure("Not authorized to consume from the queue"))
+ return
}
}
}
- bind(value::Nil)
- Success(Zilch)
+ bind(value::Nil, ()=>{ cb(Success(Zilch)) })
}
- def bind(values: List[DeliveryConsumer]) = {
+ def bind(values: List[DeliveryConsumer], on_bind:()=>Unit) = {
values.foreach(_.retain)
dispatch_queue {
for (consumer <- values) {
@@ -1264,6 +1265,7 @@ class Queue(val router: LocalRouter, val
sub.open
consumer.release()
}
+ on_bind()
}
}
@@ -1279,8 +1281,8 @@ class Queue(val router: LocalRouter, val
def disconnected() = throw new RuntimeException("unsupported")
- def bind(bind_address:BindAddress, consumer: DeliveryConsumer) = {
- bind(consumer::Nil)
+ def bind(bind_address:BindAddress, consumer: DeliveryConsumer, on_bind:()=>Unit) = {
+ bind(consumer::Nil, on_bind)
}
def unbind(consumer: DeliveryConsumer, persistent:Boolean):Unit = {
unbind(consumer::Nil)
@@ -1297,7 +1299,7 @@ class Queue(val router: LocalRouter, val
producers += producer
check_idle
}
- producer.bind(this::Nil)
+ producer.bind(this::Nil, ()=>{})
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Tue Feb 19 19:24:30 2013
@@ -161,7 +161,7 @@ trait Router extends Service {
def get_queue(dto:Long):Option[Queue]
- def bind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, security:SecurityContext): Option[String]
+ def bind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, security:SecurityContext)(cb: (Option[String])=>Unit)
def unbind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, persistent:Boolean, security:SecurityContext)
@@ -188,7 +188,7 @@ trait BindableDeliveryProducer extends D
def dispatch_queue:DispatchQueue
- def bind(targets:List[DeliveryConsumer]):Unit
+ def bind(targets:List[DeliveryConsumer], on_bind:()=>Unit):Unit
def unbind(targets:List[DeliveryConsumer]):Unit
def connected():Unit
@@ -231,7 +231,7 @@ abstract class DeliveryProducerRoute(rou
on_connected
}
- def bind(consumers:List[DeliveryConsumer]) = {
+ def bind(consumers:List[DeliveryConsumer], on_bind:()=>Unit) = {
consumers.foreach(_.retain)
dispatch_queue {
consumers.foreach{ x=>
@@ -240,6 +240,7 @@ abstract class DeliveryProducerRoute(rou
target.refiller = drainer
targets ::= target
}
+ on_bind();
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Feb 19 19:24:30 2013
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import org.fusesource.hawtdispatch._
import collection.mutable.{HashSet, HashMap, ListBuffer}
import security.SecuredResource
+import java.util.concurrent.atomic.AtomicInteger
/**
* <p>
@@ -411,7 +412,14 @@ class Topic(val router:LocalRouter, val
var topic_queue:Queue = null
- def bind(address: BindAddress, consumer:DeliveryConsumer):Unit = {
+ def bind(address: BindAddress, consumer:DeliveryConsumer, on_bind:()=>Unit):Unit = {
+
+ val remaining = new AtomicInteger(1)
+ var bind_release:()=>Unit = ()=> {
+ if( remaining.decrementAndGet() == 0 ) {
+ on_bind()
+ }
+ }
def send_retained = {
val r = retained_message
@@ -428,7 +436,7 @@ class Topic(val router:LocalRouter, val
offer(copy) // producer supports 1 message overflow.
}
}
- producer.bind(consumer :: Nil)
+ producer.bind(consumer :: Nil, ()=>{})
producer.connected()
}
}
@@ -445,7 +453,8 @@ class Topic(val router:LocalRouter, val
if ( topic_queue==null ) {
topic_queue = router._create_queue(new TempQueueBinding(id, Topic.this.address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
producers.keys.foreach({ r=>
- r.bind(List(topic_queue))
+ remaining.incrementAndGet()
+ r.bind(List(topic_queue), bind_release)
})
}
val proxy = new DeliveryConsumerFilter(consumer) {
@@ -456,7 +465,7 @@ class Topic(val router:LocalRouter, val
override def exclusive = false
}
topic_queue_consumers.put(consumer, proxy)
- topic_queue.bind(List(proxy))
+ topic_queue.bind(List(proxy), bind_release)
send_retained
return
@@ -500,8 +509,10 @@ class Topic(val router:LocalRouter, val
topic_metrics.consumer_counter += 1
val list = proxy :: Nil
producers.keys.foreach({ r=>
- r.bind(list)
+ remaining.incrementAndGet()
+ r.bind(list, bind_release)
})
+ bind_release()
check_idle
}
@@ -556,7 +567,7 @@ class Topic(val router:LocalRouter, val
def bind_durable_subscription(address: SubscriptionAddress, queue:Queue) = {
if( !durable_subscriptions.contains(queue) ) {
durable_subscriptions += queue
- bind(address, queue)
+ bind(address, queue, ()=>{})
}
check_idle
}
@@ -586,7 +597,7 @@ class Topic(val router:LocalRouter, val
if( topic_queue !=null ) {
targets ::= topic_queue
}
- producer.bind(targets )
+ producer.bind(targets, ()=>{})
check_idle
}
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -1059,8 +1059,9 @@ case class MqttSession(host_state:HostSt
host.dispatch_queue {
addresses.foreach { address=>
- host.router.bind(Array[BindAddress](address), mqtt_consumer, security_context)
+ host.router.bind(Array[BindAddress](address), mqtt_consumer, security_context) { result =>
// MQTT ignores subscribe failures.
+ }
}
on_subscribed
}
@@ -1093,7 +1094,8 @@ case class MqttSession(host_state:HostSt
host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, true, security_context)
session_state.durable_sub = null
} else {
- host.router.bind(Array(session_state.durable_sub), mqtt_consumer, security_context)
+ host.router.bind(Array(session_state.durable_sub), mqtt_consumer, security_context) { result =>
+ }
}
}
queue {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Tue Feb 19 19:24:30 2013
@@ -121,7 +121,7 @@ class DestinationAdvisoryRouterListener(
super.on_connected
}
}
- producer.bind(consumer::Nil)
+ producer.bind(consumer::Nil, ()=>{})
producer.connected()
for( info <- destination_advisories.values ) {
producer.overflow_sink.offer(info)
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -932,13 +932,14 @@ class OpenwireProtocolHandler extends Pr
}
host.dispatch_queue {
- val rc = host.router.bind(addresses, this, security_context)
- dispatchQueue {
- rc match {
- case None =>
- ack(info)
- case Some(reason) =>
- fail(reason, info)
+ host.router.bind(addresses, this, security_context) { rc =>
+ dispatchQueue {
+ rc match {
+ case None =>
+ ack(info)
+ case Some(reason) =>
+ fail(reason, info)
+ }
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -1520,16 +1520,17 @@ class StompProtocolHandler extends Proto
consumers += (id -> consumer)
host.dispatch_queue {
- val rc = host.router.bind(addresses, consumer, security_context)
- dispatchQueue {
- rc match {
- case Some(reason)=>
- consumers -= id
- consumer.release
- async_die(reason)
- case None =>
- send_receipt(headers)
- consumer.supply_initial_credit
+ host.router.bind(addresses, consumer, security_context) { rc =>
+ dispatchQueue {
+ rc match {
+ case Some(reason)=>
+ consumers -= id
+ consumer.release
+ async_die(reason)
+ case None =>
+ send_receipt(headers)
+ consumer.supply_initial_credit
+ }
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Feb 19 19:24:30 2013
@@ -540,10 +540,6 @@ class StompParallelTest extends StompTes
subscribe("1", "/queue/load-balanced")
subscribe("2", "/queue/load-balanced")
- // Lets sleep a little to make sure the subscriptions are full
- // established before we start sending messages to them.
- Thread.sleep(500)
-
for (i <- 0 until 4) {
async_send("/queue/load-balanced", "message:" + i)
}
@@ -603,9 +599,6 @@ class StompParallelTest extends StompTes
subscribe("1", dest)
subscribe("2", dest)
- // Give the subs time to setup..
- Thread.sleep(500L)
-
var actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
def send_receive = {
@@ -639,8 +632,6 @@ class StompParallelTest extends StompTes
// Add another subscriber, the groups should re-balance
subscribe("3", dest)
- // Give the sub time to setup..
- Thread.sleep(500L)
actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
send_receive
@@ -786,33 +777,13 @@ class StompParallelTest extends StompTes
test("Queue order preserved") {
connect("1.1")
-
- def put(id: Int) = {
- client.write(
- "SEND\n" +
- "destination:/queue/example\n" +
- "\n" +
- "message:" + id + "\n")
- }
- put(1)
- put(2)
- put(3)
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/example\n" +
- "id:0\n" +
- "\n")
-
- def get(id: Int) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should include("subscription:0\n")
- frame should endWith regex ("\n\nmessage:" + id + "\n")
- }
- get(1)
- get(2)
- get(3)
+ async_send("/queue/example", 1)
+ async_send("/queue/example", 2)
+ async_send("/queue/example", 3)
+ subscribe("0", "/queue/example")
+ assert_received(1, "0")
+ assert_received(2, "0")
+ assert_received(3, "0")
}
test("Topic drops messages sent before before subscription is established") {