You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/04 22:43:17 UTC

svn commit: r1240617 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: LocalRouter.scala Router.scala Sink.scala Topic.scala

Author: chirino
Date: Sat Feb  4 21:43:17 2012
New Revision: 1240617

URL: http://svn.apache.org/viewvc?rev=1240617&view=rev
Log:
The router now supports updating a consumer's subscriptions (adding or removing subscribed destinations).

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sat Feb  4 21:43:17 2012
@@ -24,11 +24,11 @@ import path.PathParser.PathException
 import java.util.concurrent.TimeUnit
 import scala.Array
 import java.util.ArrayList
-import collection.mutable.LinkedHashMap
 import collection.{Iterable, JavaConversions}
 import security.SecuredResource.{TopicKind, QueueKind}
 import security.{SecuredResource, SecurityContext}
 import org.apache.activemq.apollo.dto._
+import scala.collection.mutable.{HashSet, HashMap, LinkedHashMap}
 
 object DestinationMetricsSupport {
 
@@ -149,12 +149,15 @@ object LocalRouter extends Log {
     }
   }
 
-  class ConsumerContext(val bind_address:BindAddress, val consumer:DeliveryConsumer, val security:SecurityContext) {
-    override def hashCode: Int = consumer.hashCode
+  class ConsumerContext[D <: DomainDestination](val consumer:DeliveryConsumer) {
+    val bind_addresses = HashSet[BindAddress]()
+    val matched_destinations = HashSet[D]()
+    var security:SecurityContext = _
 
+    override def hashCode: Int = consumer.hashCode
     override def equals(obj: Any): Boolean = {
       obj match {
-        case x:ConsumerContext=> x.consumer == consumer
+        case x:ConsumerContext[_] => x.consumer == consumer
         case _ => false
       }
     }
@@ -228,7 +231,8 @@ class LocalRouter(val virtual_host:Virtu
     var destination_by_path = new PathMap[D]()
     // Can store consumers on wild cards paths
 
-    val consumers_by_path = new PathMap[ConsumerContext]()
+    val consumers = HashMap[DeliveryConsumer, ConsumerContext[D]]()
+    val consumers_by_path = new PathMap[(ConsumerContext[D], BindAddress)]()
     val producers_by_path = new PathMap[ProducerContext]()
 
     def destinations:Iterable[D] = JavaConversions.collectionAsScalaIterable(destination_by_path.get(ALL))
@@ -263,9 +267,10 @@ class LocalRouter(val virtual_host:Virtu
 
       // binds any matching wild card subs and producers...
       import JavaConversions._
-      consumers_by_path.get( path ).foreach { x=>
-        if( authorizer.can(x.security, bind_action(x.consumer), dest) ) {
-          dest.bind(x.bind_address, x.consumer)
+      consumers_by_path.get( path ).foreach { case (consumer_context, bind_address)=>
+        if( authorizer.can(consumer_context.security, bind_action(consumer_context.consumer), dest) ) {
+          consumer_context.matched_destinations += dest
+          dest.bind(bind_address, consumer_context.consumer)
         }
       }
       producers_by_path.get( path ).foreach { x=>
@@ -352,28 +357,63 @@ class LocalRouter(val virtual_host:Virtu
     }
 
     def bind(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
-      var matches = get_destination_matches(bind_address.path)
-      matches.foreach { dest=>
-        if( authorizer.can(security, bind_action(consumer), dest) ) {
-          dest.bind(bind_address, consumer)
-          for( l <- router_listeners) {
-            l.on_bind(dest, consumer, security)
+
+      val context = consumers.getOrElseUpdate(consumer, new ConsumerContext[D](consumer))
+      context.security = security
+      if( context.bind_addresses.add(bind_address) ) {
+
+        consumers_by_path.put(bind_address.path, (context, bind_address))
+        consumer.retain
+
+        // Get the list of new destination matches..
+        var matches = get_destination_matches(bind_address.path).toSet
+        matches --= context.matched_destinations
+        context.matched_destinations ++= matches
+
+        matches.foreach { dest=>
+          if( authorizer.can(security, bind_action(consumer), dest) ) {
+            dest.bind(bind_address, consumer)
+            for( l <- router_listeners) {
+              l.on_bind(dest, consumer, security)
+            }
           }
         }
+
       }
-      consumer.retain
-      consumers_by_path.put(bind_address.path, new ConsumerContext(bind_address, consumer, security))
+
     }
 
     def unbind(bind_address:BindAddress, consumer:DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
-      if( consumers_by_path.remove(bind_address.path, new ConsumerContext(bind_address, consumer, null) ) ) {
-        get_destination_matches(bind_address.path).foreach{ dest=>
-          dest.unbind(consumer, persistent)
-          for( l <- router_listeners) {
-            l.on_unbind(dest, consumer, persistent)
+      consumers.get(consumer) match {
+        case None => // odd..
+        case Some(context) =>
+          if( context.bind_addresses.remove(bind_address) ) {
+
+            // What did we match?
+            var matches = context.matched_destinations.toSet
+
+            // rebuild the set of what we still match..
+            context.matched_destinations.clear
+            context.bind_addresses.foreach { address =>
+              context.matched_destinations ++= get_destination_matches(address.path)
+            }
+
+            // Take the diff to find out what we don't match anymore..
+            matches --= context.matched_destinations
+
+            matches.foreach{ dest=>
+              dest.unbind(consumer, persistent)
+              for( l <- router_listeners) {
+                l.on_unbind(dest, consumer, persistent)
+              }
+            }
+
+            consumer.release
+            consumers_by_path.remove(bind_address.path, (context, bind_address))
+            if(context.bind_addresses.isEmpty) {
+              consumers.remove(consumer);
+            }
           }
-        }
-        consumer.release
       }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sat Feb  4 21:43:17 2012
@@ -135,12 +135,15 @@ object DestinationAddress {
 sealed trait DestinationAddress {
   def domain:String
   def path:Path
+  def simple:SimpleAddress = SimpleAddress(domain, path)
   val id = DestinationAddress.encode_path(path)
   override def toString: String =  domain+":"+id
 }
 sealed trait ConnectAddress extends DestinationAddress
 sealed trait BindAddress extends DestinationAddress
-case class SimpleAddress(val domain:String, val path:Path) extends ConnectAddress with BindAddress
+case class SimpleAddress(val domain:String, val path:Path) extends ConnectAddress with BindAddress {
+  override def simple = this
+}
 case class SubscriptionAddress(val path:Path, val selector:String, topics:Array[_ <: BindAddress]) extends BindAddress {
   def domain = "dsub"
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Sat Feb  4 21:43:17 2012
@@ -58,6 +58,21 @@ trait Sink[T] {
     def downstream = Sink.this
   }
 
+  def flatMap[Y](func: Y=>Option[T]):Sink[Y] = new Sink[Y] with SinkFilter[T] {
+    def downstream = Sink.this
+    def offer(value:Y) = {
+      if( full ) {
+        false
+      } else {
+        val opt = func(value)
+        if( opt.isDefined ) {
+          downstream.offer(opt.get)
+        }
+        true
+      }
+    }
+  }
+
 }
 
 trait SinkFilter[T] {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Feb  4 21:43:17 2012
@@ -107,7 +107,7 @@ class Topic(val router:LocalRouter, val 
 
   case class ProxyConsumerSession(proxy:ProxyDeliveryConsumer, session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
 
-    override def toString = proxy.consumer.toString
+    override def toString = proxy.consumer.toString + " (via "+address+")"
 
     def downstream = session