You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/04 22:43:17 UTC
svn commit: r1240617 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
LocalRouter.scala Router.scala Sink.scala Topic.scala
Author: chirino
Date: Sat Feb 4 21:43:17 2012
New Revision: 1240617
URL: http://svn.apache.org/viewvc?rev=1240617&view=rev
Log:
The router now supports updating a consumer's subscriptions (adding or removing subscribed destinations).
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sat Feb 4 21:43:17 2012
@@ -24,11 +24,11 @@ import path.PathParser.PathException
import java.util.concurrent.TimeUnit
import scala.Array
import java.util.ArrayList
-import collection.mutable.LinkedHashMap
import collection.{Iterable, JavaConversions}
import security.SecuredResource.{TopicKind, QueueKind}
import security.{SecuredResource, SecurityContext}
import org.apache.activemq.apollo.dto._
+import scala.collection.mutable.{HashSet, HashMap, LinkedHashMap}
object DestinationMetricsSupport {
@@ -149,12 +149,15 @@ object LocalRouter extends Log {
}
}
- class ConsumerContext(val bind_address:BindAddress, val consumer:DeliveryConsumer, val security:SecurityContext) {
- override def hashCode: Int = consumer.hashCode
+ class ConsumerContext[D <: DomainDestination](val consumer:DeliveryConsumer) {
+ val bind_addresses = HashSet[BindAddress]()
+ val matched_destinations = HashSet[D]()
+ var security:SecurityContext = _
+ override def hashCode: Int = consumer.hashCode
override def equals(obj: Any): Boolean = {
obj match {
- case x:ConsumerContext=> x.consumer == consumer
+ case x:ConsumerContext[_] => x.consumer == consumer
case _ => false
}
}
@@ -228,7 +231,8 @@ class LocalRouter(val virtual_host:Virtu
var destination_by_path = new PathMap[D]()
// Can store consumers on wild cards paths
- val consumers_by_path = new PathMap[ConsumerContext]()
+ val consumers = HashMap[DeliveryConsumer, ConsumerContext[D]]()
+ val consumers_by_path = new PathMap[(ConsumerContext[D], BindAddress)]()
val producers_by_path = new PathMap[ProducerContext]()
def destinations:Iterable[D] = JavaConversions.collectionAsScalaIterable(destination_by_path.get(ALL))
@@ -263,9 +267,10 @@ class LocalRouter(val virtual_host:Virtu
// binds any matching wild card subs and producers...
import JavaConversions._
- consumers_by_path.get( path ).foreach { x=>
- if( authorizer.can(x.security, bind_action(x.consumer), dest) ) {
- dest.bind(x.bind_address, x.consumer)
+ consumers_by_path.get( path ).foreach { case (consumer_context, bind_address)=>
+ if( authorizer.can(consumer_context.security, bind_action(consumer_context.consumer), dest) ) {
+ consumer_context.matched_destinations += dest
+ dest.bind(bind_address, consumer_context.consumer)
}
}
producers_by_path.get( path ).foreach { x=>
@@ -352,28 +357,63 @@ class LocalRouter(val virtual_host:Virtu
}
def bind(bind_address:BindAddress, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
- var matches = get_destination_matches(bind_address.path)
- matches.foreach { dest=>
- if( authorizer.can(security, bind_action(consumer), dest) ) {
- dest.bind(bind_address, consumer)
- for( l <- router_listeners) {
- l.on_bind(dest, consumer, security)
+
+ val context = consumers.getOrElseUpdate(consumer, new ConsumerContext[D](consumer))
+ context.security = security
+ if( context.bind_addresses.add(bind_address) ) {
+
+ consumers_by_path.put(bind_address.path, (context, bind_address))
+ consumer.retain
+
+ // Get the list of new destination matches..
+ var matches = get_destination_matches(bind_address.path).toSet
+ matches --= context.matched_destinations
+ context.matched_destinations ++= matches
+
+ matches.foreach { dest=>
+ if( authorizer.can(security, bind_action(consumer), dest) ) {
+ dest.bind(bind_address, consumer)
+ for( l <- router_listeners) {
+ l.on_bind(dest, consumer, security)
+ }
}
}
+
}
- consumer.retain
- consumers_by_path.put(bind_address.path, new ConsumerContext(bind_address, consumer, security))
+
}
def unbind(bind_address:BindAddress, consumer:DeliveryConsumer, persistent:Boolean, security: SecurityContext) = {
- if( consumers_by_path.remove(bind_address.path, new ConsumerContext(bind_address, consumer, null) ) ) {
- get_destination_matches(bind_address.path).foreach{ dest=>
- dest.unbind(consumer, persistent)
- for( l <- router_listeners) {
- l.on_unbind(dest, consumer, persistent)
+ consumers.get(consumer) match {
+ case None => // odd..
+ case Some(context) =>
+ if( context.bind_addresses.remove(bind_address) ) {
+
+ // What did we match?
+ var matches = context.matched_destinations.toSet
+
+ // rebuild the set of what we still match..
+ context.matched_destinations.clear
+ context.bind_addresses.foreach { address =>
+ context.matched_destinations ++= get_destination_matches(address.path)
+ }
+
+ // Take the diff to find out what we don't match anymore..
+ matches --= context.matched_destinations
+
+ matches.foreach{ dest=>
+ dest.unbind(consumer, persistent)
+ for( l <- router_listeners) {
+ l.on_unbind(dest, consumer, persistent)
+ }
+ }
+
+ consumer.release
+ consumers_by_path.remove(bind_address.path, (context, bind_address))
+ if(context.bind_addresses.isEmpty) {
+ consumers.remove(consumer);
+ }
}
- }
- consumer.release
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sat Feb 4 21:43:17 2012
@@ -135,12 +135,15 @@ object DestinationAddress {
sealed trait DestinationAddress {
def domain:String
def path:Path
+ def simple:SimpleAddress = SimpleAddress(domain, path)
val id = DestinationAddress.encode_path(path)
override def toString: String = domain+":"+id
}
sealed trait ConnectAddress extends DestinationAddress
sealed trait BindAddress extends DestinationAddress
-case class SimpleAddress(val domain:String, val path:Path) extends ConnectAddress with BindAddress
+case class SimpleAddress(val domain:String, val path:Path) extends ConnectAddress with BindAddress {
+ override def simple = this
+}
case class SubscriptionAddress(val path:Path, val selector:String, topics:Array[_ <: BindAddress]) extends BindAddress {
def domain = "dsub"
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Sat Feb 4 21:43:17 2012
@@ -58,6 +58,21 @@ trait Sink[T] {
def downstream = Sink.this
}
+ def flatMap[Y](func: Y=>Option[T]):Sink[Y] = new Sink[Y] with SinkFilter[T] {
+ def downstream = Sink.this
+ def offer(value:Y) = {
+ if( full ) {
+ false
+ } else {
+ val opt = func(value)
+ if( opt.isDefined ) {
+ downstream.offer(opt.get)
+ }
+ true
+ }
+ }
+ }
+
}
trait SinkFilter[T] {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1240617&r1=1240616&r2=1240617&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Feb 4 21:43:17 2012
@@ -107,7 +107,7 @@ class Topic(val router:LocalRouter, val
case class ProxyConsumerSession(proxy:ProxyDeliveryConsumer, session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
- override def toString = proxy.consumer.toString
+ override def toString = proxy.consumer.toString + " (via "+address+")"
def downstream = session