You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/19 20:24:31 UTC

svn commit: r1447879 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-op...

Author: chirino
Date: Tue Feb 19 19:24:30 2013
New Revision: 1447879

URL: http://svn.apache.org/r1447879
Log:
Updated the router bind method to take a callback so that callers know when the subscription has been fully bound.  This should avoid us needing some of sleep hacks in our tests.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -588,19 +588,20 @@ class AmqpProtocolHandler extends Protoc
       consumers += (id -> consumer)
 
       host.dispatch_queue {
-        val rc = host.router.bind(consumer.addresses, consumer, security_context)
-        queue {
-          rc match {
-            case Some(reason) =>
-              consumers -= id
-              consumer.release
-              sender.setSource(null)
-              close_with_error(sender, "amqp:not-found", reason)
-              onComplete.run()
-            case None =>
-              set_attachment(sender, consumer)
-              sender.open()
-              onComplete.run()
+        host.router.bind(consumer.addresses, consumer, security_context) { rc =>
+          queue {
+            rc match {
+              case Some(reason) =>
+                consumers -= id
+                consumer.release
+                sender.setSource(null)
+                close_with_error(sender, "amqp:not-found", reason)
+                onComplete.run()
+              case None =>
+                set_attachment(sender, consumer)
+                sender.open()
+                onComplete.run()
+            }
           }
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Tue Feb 19 19:24:30 2013
@@ -23,12 +23,12 @@ import path._
 import path.PathParser.PathException
 import java.util.concurrent.TimeUnit
 import scala.Array
-import java.util.ArrayList
 import collection.{Iterable, JavaConversions}
 import security.SecuredResource.{TopicKind, QueueKind}
 import security.{SecuredResource, SecurityContext}
 import org.apache.activemq.apollo.dto._
 import scala.collection.mutable.{HashSet, HashMap, LinkedHashMap}
+import java.util.concurrent.atomic.AtomicInteger
 
 object DestinationMetricsSupport {
 
@@ -141,7 +141,7 @@ trait DomainDestination extends SecuredR
   def browse(from_seq:Long, to:Option[Long], max:Long)(func: (BrowseResult)=>Unit):Unit
 
 
-  def bind (bind_address:BindAddress, consumer:DeliveryConsumer):Unit
+  def bind (bind_address:BindAddress, consumer:DeliveryConsumer, on_bind:()=>Unit):Unit
   def unbind (consumer:DeliveryConsumer, persistent:Boolean):Unit
 
   def connect (connect_address:ConnectAddress, producer:BindableDeliveryProducer):Unit
@@ -288,7 +288,7 @@ class LocalRouter(val virtual_host:Virtu
       consumers_by_path.get( path ).foreach { case (consumer_context, bind_address)=>
         if( authorizer.can(consumer_context.security, bind_action(consumer_context.consumer), dest) ) {
           consumer_context.matched_destinations += dest
-          dest.bind(bind_address, consumer_context.consumer)
+          dest.bind(bind_address, consumer_context.consumer, ()=>{})
         }
       }
       producers_by_path.get( path ).foreach { x=>
@@ -374,7 +374,7 @@ class LocalRouter(val virtual_host:Virtu
       None
     }
 
-    def bind(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
+    def bind(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext, on_bind:()=>Unit):Unit = {
 
       val context = consumers.getOrElseUpdate(consumer, new ConsumerContext[D](consumer))
       context.security = security
@@ -388,15 +388,26 @@ class LocalRouter(val virtual_host:Virtu
         matches --= context.matched_destinations
         context.matched_destinations ++= matches
 
+        val remaining = new AtomicInteger(1)
+        var bind_release:()=>Unit = ()=> {
+          if( remaining.decrementAndGet() == 0 ) {
+            on_bind()
+          }
+        }
+
         matches.foreach { dest=>
           if( authorizer.can(security, bind_action(consumer), dest) ) {
-            dest.bind(bind_address, consumer)
+            remaining.incrementAndGet()
+            dest.bind(bind_address, consumer, bind_release)
             for( l <- router_listeners) {
               l.on_bind(dest, consumer, security)
             }
           }
         }
+        bind_release()
 
+      } else {
+        on_bind()
       }
 
     }
@@ -694,7 +705,7 @@ class LocalRouter(val virtual_host:Virtu
       "consume"
     }
 
-    override def bind(bind_address: BindAddress, consumer: DeliveryConsumer, security: SecurityContext) {
+    override def bind(bind_address: BindAddress, consumer: DeliveryConsumer, security: SecurityContext, on_bind: ()=>Unit) {
       destination_by_id.get(bind_address.id).foreach { queue =>
 
         // We may need to update the bindings...
@@ -716,12 +727,21 @@ class LocalRouter(val virtual_host:Virtu
           case _ =>
         }
 
+        val remaining = new AtomicInteger(1)
+        var bind_release:()=>Unit = ()=> {
+          if( remaining.decrementAndGet() == 0 ) {
+            on_bind()
+          }
+        }
+
         if( authorizer.can(security, bind_action(consumer), queue) ) {
-          queue.bind(bind_address, consumer)
+          remaining.incrementAndGet()
+          queue.bind(bind_address, consumer, bind_release)
           for( l <- router_listeners) {
             l.on_bind(queue, consumer, security)
           }
         }
+        bind_release();
       }
     }
 
@@ -748,7 +768,7 @@ class LocalRouter(val virtual_host:Virtu
       if( queue.mirrored ) {
         // hook up the queue to be a subscriber of the topic.
         val topic = local_topic_domain.get_or_create_destination(SimpleAddress("topic", path), null).success
-        topic.bind(SimpleAddress("queue", path), queue)
+        topic.bind(SimpleAddress("queue", path), queue, ()=>{})
       }
     }
 
@@ -1012,25 +1032,26 @@ class LocalRouter(val virtual_host:Virtu
   def topic_domain:Domain[_ <: DomainDestination] = local_topic_domain
   def dsub_domain:Domain[_ <: DomainDestination] = local_dsub_domain
 
-  def bind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, security: SecurityContext):Option[String] = {
+  def bind(addresses: Array[_ <: BindAddress], consumer: DeliveryConsumer, security: SecurityContext)(cb: (Option[String])=>Unit):Unit = {
     dispatch_queue.assertExecuting()
     if(!virtual_host.service_state.is_started) {
-      return Some("virtual host stopped.")
+      cb(Some("virtual host stopped."))
+      return
     } else {
       try {
         val actions = addresses.map { address =>
           address.domain match {
             case "topic" =>
               val allowed = topic_domain.can_bind_all(address, consumer, security)
-              def perform() = topic_domain.bind(address, consumer, security)
+              def perform(on_bind:()=>Unit) = topic_domain.bind(address, consumer, security, on_bind)
               (allowed, perform _)
             case "queue" =>
               val allowed = queue_domain.can_bind_all(address, consumer, security)
-              def perform() = queue_domain.bind(address, consumer, security)
+              def perform(on_bind:()=>Unit) = queue_domain.bind(address, consumer, security, on_bind)
               (allowed, perform _)
             case "dsub" =>
               val allowed = dsub_domain.can_bind_all(address, consumer, security)
-              def perform() = dsub_domain.bind(address, consumer, security)
+              def perform(on_bind:()=>Unit) = dsub_domain.bind(address, consumer, security, on_bind)
               (allowed, perform _)
             case _ => sys.error("Unknown domain: "+address.domain)
           }
@@ -1038,14 +1059,23 @@ class LocalRouter(val virtual_host:Virtu
 
         val failures = actions.flatMap(_._1)
         if( !failures.isEmpty ) {
-          return Some(failures.mkString("; "))
+          cb(Some(failures.mkString("; ")))
+          return
         } else {
-          actions.foreach(_._2())
-          return None
+          val remaining = new AtomicInteger(actions.length+1)
+          var bind_release:()=>Unit = ()=> {
+            if( remaining.decrementAndGet() == 0 ) {
+              cb(None)
+            }
+          }
+          actions.foreach(_._2(bind_release))
+          bind_release()
+          return
         }
       } catch {
         case x:PathException =>
-          return Some(x.getMessage)
+          cb(Some(x.getMessage))
+          return
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Feb 19 19:24:30 2013
@@ -1240,23 +1240,24 @@ class Queue(val router: LocalRouter, val
 
   def connected() = {}
 
-  def bind(value: DeliveryConsumer, ctx:SecurityContext): Result[Zilch, String] = {
+  def bind(value: DeliveryConsumer, ctx:SecurityContext, cb: (Result[Zilch, String])=>Unit):Unit = {
     if( ctx!=null ) {
       if( value.browser ) {
         if( !virtual_host.authorizer.can(ctx, "receive", this) ) {
-          return new Failure("Not authorized to browse the queue")
+          cb(new Failure("Not authorized to browse the queue"))
+          return
         }
       } else {
         if( !virtual_host.authorizer.can(ctx, "consume", this) ) {
-          return new Failure("Not authorized to consume from the queue")
+          cb(new Failure("Not authorized to consume from the queue"))
+          return
         }
       }
     }
-    bind(value::Nil)
-    Success(Zilch)
+    bind(value::Nil, ()=>{ cb(Success(Zilch)) })
   }
 
-  def bind(values: List[DeliveryConsumer]) = {
+  def bind(values: List[DeliveryConsumer], on_bind:()=>Unit) = {
     values.foreach(_.retain)
     dispatch_queue {
       for (consumer <- values) {
@@ -1264,6 +1265,7 @@ class Queue(val router: LocalRouter, val
         sub.open
         consumer.release()
       }
+      on_bind()
     }
   }
 
@@ -1279,8 +1281,8 @@ class Queue(val router: LocalRouter, val
 
   def disconnected() = throw new RuntimeException("unsupported")
 
-  def bind(bind_address:BindAddress, consumer: DeliveryConsumer) = {
-    bind(consumer::Nil)
+  def bind(bind_address:BindAddress, consumer: DeliveryConsumer, on_bind:()=>Unit) = {
+    bind(consumer::Nil, on_bind)
   }
   def unbind(consumer: DeliveryConsumer, persistent:Boolean):Unit = {
     unbind(consumer::Nil)
@@ -1297,7 +1299,7 @@ class Queue(val router: LocalRouter, val
         producers += producer
         check_idle
       }
-      producer.bind(this::Nil)
+      producer.bind(this::Nil, ()=>{})
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Tue Feb 19 19:24:30 2013
@@ -161,7 +161,7 @@ trait Router extends Service {
 
   def get_queue(dto:Long):Option[Queue]
 
-  def bind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, security:SecurityContext): Option[String]
+  def bind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, security:SecurityContext)(cb: (Option[String])=>Unit)
 
   def unbind(destinations:Array[_ <: BindAddress], consumer:DeliveryConsumer, persistent:Boolean, security:SecurityContext)
 
@@ -188,7 +188,7 @@ trait BindableDeliveryProducer extends D
 
   def dispatch_queue:DispatchQueue
 
-  def bind(targets:List[DeliveryConsumer]):Unit
+  def bind(targets:List[DeliveryConsumer], on_bind:()=>Unit):Unit
   def unbind(targets:List[DeliveryConsumer]):Unit
 
   def connected():Unit
@@ -231,7 +231,7 @@ abstract class DeliveryProducerRoute(rou
     on_connected
   }
 
-  def bind(consumers:List[DeliveryConsumer]) = {
+  def bind(consumers:List[DeliveryConsumer], on_bind:()=>Unit) = {
     consumers.foreach(_.retain)
     dispatch_queue {
       consumers.foreach{ x=>
@@ -240,6 +240,7 @@ abstract class DeliveryProducerRoute(rou
         target.refiller = drainer
         targets ::= target
       }
+      on_bind();
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Feb 19 19:24:30 2013
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import org.fusesource.hawtdispatch._
 import collection.mutable.{HashSet, HashMap, ListBuffer}
 import security.SecuredResource
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  * <p>
@@ -411,7 +412,14 @@ class Topic(val router:LocalRouter, val 
 
   var topic_queue:Queue = null
 
-  def bind(address: BindAddress, consumer:DeliveryConsumer):Unit = {
+  def bind(address: BindAddress, consumer:DeliveryConsumer, on_bind:()=>Unit):Unit = {
+
+    val remaining = new AtomicInteger(1)
+    var bind_release:()=>Unit = ()=> {
+      if( remaining.decrementAndGet() == 0 ) {
+        on_bind()
+      }
+    }
 
     def send_retained = {
       val r = retained_message
@@ -428,7 +436,7 @@ class Topic(val router:LocalRouter, val 
             offer(copy) // producer supports 1 message overflow.
           }
         }
-        producer.bind(consumer :: Nil)
+        producer.bind(consumer :: Nil, ()=>{})
         producer.connected()
       }
     }
@@ -445,7 +453,8 @@ class Topic(val router:LocalRouter, val 
             if ( topic_queue==null ) {
               topic_queue = router._create_queue(new TempQueueBinding(id, Topic.this.address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
               producers.keys.foreach({ r=>
-                r.bind(List(topic_queue))
+                remaining.incrementAndGet()
+                r.bind(List(topic_queue), bind_release)
               })
             }
             val proxy = new DeliveryConsumerFilter(consumer) {
@@ -456,7 +465,7 @@ class Topic(val router:LocalRouter, val 
               override def exclusive = false
             }
             topic_queue_consumers.put(consumer, proxy)
-            topic_queue.bind(List(proxy))
+            topic_queue.bind(List(proxy), bind_release)
             send_retained
             return
 
@@ -500,8 +509,10 @@ class Topic(val router:LocalRouter, val 
     topic_metrics.consumer_counter += 1
     val list = proxy :: Nil
     producers.keys.foreach({ r=>
-      r.bind(list)
+      remaining.incrementAndGet()
+      r.bind(list, bind_release)
     })
+    bind_release()
     check_idle
   }
 
@@ -556,7 +567,7 @@ class Topic(val router:LocalRouter, val 
   def bind_durable_subscription(address: SubscriptionAddress, queue:Queue)  = {
     if( !durable_subscriptions.contains(queue) ) {
       durable_subscriptions += queue
-      bind(address, queue)
+      bind(address, queue, ()=>{})
     }
     check_idle
   }
@@ -586,7 +597,7 @@ class Topic(val router:LocalRouter, val 
     if( topic_queue !=null ) {
       targets ::= topic_queue
     }
-    producer.bind(targets )
+    producer.bind(targets, ()=>{})
     check_idle
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -1059,8 +1059,9 @@ case class MqttSession(host_state:HostSt
 
     host.dispatch_queue {
       addresses.foreach { address=>
-        host.router.bind(Array[BindAddress](address), mqtt_consumer, security_context)
+        host.router.bind(Array[BindAddress](address), mqtt_consumer, security_context) { result =>
         // MQTT ignores subscribe failures.
+        }
       }
       on_subscribed
     }
@@ -1093,7 +1094,8 @@ case class MqttSession(host_state:HostSt
           host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, true, security_context)
           session_state.durable_sub = null
         } else {
-          host.router.bind(Array(session_state.durable_sub), mqtt_consumer, security_context)
+          host.router.bind(Array(session_state.durable_sub), mqtt_consumer, security_context) { result =>
+          }
         }
       }
       queue {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Tue Feb 19 19:24:30 2013
@@ -121,7 +121,7 @@ class DestinationAdvisoryRouterListener(
             super.on_connected
           }
         }
-        producer.bind(consumer::Nil)
+        producer.bind(consumer::Nil, ()=>{})
         producer.connected()
         for( info <- destination_advisories.values ) {
           producer.overflow_sink.offer(info)

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -932,13 +932,14 @@ class OpenwireProtocolHandler extends Pr
       }
 
       host.dispatch_queue {
-        val rc = host.router.bind(addresses, this, security_context)
-        dispatchQueue {
-          rc match {
-            case None =>
-              ack(info)
-            case Some(reason) =>
-              fail(reason, info)
+        host.router.bind(addresses, this, security_context) { rc =>
+          dispatchQueue {
+            rc match {
+              case None =>
+                ack(info)
+              case Some(reason) =>
+                fail(reason, info)
+            }
           }
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Feb 19 19:24:30 2013
@@ -1520,16 +1520,17 @@ class StompProtocolHandler extends Proto
     consumers += (id -> consumer)
 
     host.dispatch_queue {
-      val rc = host.router.bind(addresses, consumer, security_context)
-      dispatchQueue {
-        rc match {
-          case Some(reason)=>
-            consumers -= id
-            consumer.release
-            async_die(reason)
-          case None =>
-            send_receipt(headers)
-            consumer.supply_initial_credit
+      host.router.bind(addresses, consumer, security_context) { rc =>
+        dispatchQueue {
+          rc match {
+            case Some(reason)=>
+              consumers -= id
+              consumer.release
+              async_die(reason)
+            case None =>
+              send_receipt(headers)
+              consumer.supply_initial_credit
+          }
         }
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1447879&r1=1447878&r2=1447879&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Feb 19 19:24:30 2013
@@ -540,10 +540,6 @@ class StompParallelTest extends StompTes
     subscribe("1", "/queue/load-balanced")
     subscribe("2", "/queue/load-balanced")
 
-    // Lets sleep a little to make sure the subscriptions are full
-    // established before we start sending messages to them.
-    Thread.sleep(500)
-
     for (i <- 0 until 4) {
       async_send("/queue/load-balanced", "message:" + i)
     }
@@ -603,9 +599,6 @@ class StompParallelTest extends StompTes
     subscribe("1", dest)
     subscribe("2", dest)
 
-    // Give the subs time to setup..
-    Thread.sleep(500L)
-
     var actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
 
     def send_receive = {
@@ -639,8 +632,6 @@ class StompParallelTest extends StompTes
 
     // Add another subscriber, the groups should re-balance
     subscribe("3", dest)
-    // Give the sub time to setup..
-    Thread.sleep(500L)
 
     actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
     send_receive
@@ -786,33 +777,13 @@ class StompParallelTest extends StompTes
 
   test("Queue order preserved") {
     connect("1.1")
-
-    def put(id: Int) = {
-      client.write(
-        "SEND\n" +
-                "destination:/queue/example\n" +
-                "\n" +
-                "message:" + id + "\n")
-    }
-    put(1)
-    put(2)
-    put(3)
-
-    client.write(
-      "SUBSCRIBE\n" +
-              "destination:/queue/example\n" +
-              "id:0\n" +
-              "\n")
-
-    def get(id: Int) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should include("subscription:0\n")
-      frame should endWith regex ("\n\nmessage:" + id + "\n")
-    }
-    get(1)
-    get(2)
-    get(3)
+    async_send("/queue/example", 1)
+    async_send("/queue/example", 2)
+    async_send("/queue/example", 3)
+    subscribe("0", "/queue/example")
+    assert_received(1, "0")
+    assert_received(2, "0")
+    assert_received(3, "0")
   }
 
   test("Topic drops messages sent before before subscription is established") {