You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/10 21:28:44 UTC
svn commit: r1044500 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/
apollo-broker/src/test/scala/org/apache/activemq/apollo/...
Author: chirino
Date: Fri Dec 10 20:28:43 2010
New Revision: 1044500
URL: http://svn.apache.org/viewvc?rev=1044500&view=rev
Log:
Broker security model is now working fine with STOMP.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml
- copied, changed from r1044356, activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala
Removed:
activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml
activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Dec 10 20:28:43 2010
@@ -216,7 +216,8 @@ class Broker() extends BaseService with
key_storage.config = config.key_storage
}
- if( config.authentication != null ) {
+ import OptionSupport._
+ if( config.authentication != null && config.authentication.enabled.getOrElse(true) ) {
authenticator = new JaasAuthenticator(config.authentication.domain)
authorizer = new AclAuthorizer(config.authentication.kinds().toList)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala Fri Dec 10 20:28:43 2010
@@ -122,24 +122,10 @@ class FileConfigStore extends ConfigStor
}
revs = revs.sortWith((x,y)=> x < y)
- val last = revs.lastOption.map{ rev=>
- val r = read(rev, fileRev(rev))
- if( !file.exists ) {
- write(r)
- } else {
- val x = read(rev, file)
- if ( can_write && !Arrays.equals(r.data, x.data) ) {
- write(x.copy(rev=x.rev+1))
- } else {
- x
- }
- }
- } getOrElse {
- if( file.exists ) {
- read(1, file)
- } else {
- write(StoredBrokerModel(defaultConfig(1)))
- }
+ val last = if( file.exists ) {
+ read(1, file)
+ } else {
+ write(StoredBrokerModel(defaultConfig(1)))
}
latest = last
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Dec 10 20:28:43 2010
@@ -32,6 +32,7 @@ import org.apache.activemq.apollo.util.l
import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
import org.apache.activemq.apollo.dto.QueueDTO
import OptionSupport._
+import security.SecurityContext
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -505,6 +506,22 @@ class Queue(val host: VirtualHost, var i
def connected() = {}
+ def bind(value: DeliveryConsumer, security:SecurityContext): Result[Zilch, String] = {
+ if( host.authorizer!=null && security!=null ) {
+ if( value.browser ) {
+ if( !host.authorizer.can_receive_from(security, host, config) ) {
+ return new Failure("Not authorized to browse the queue")
+ }
+ } else {
+ if( !host.authorizer.can_consume_from(security, host, config) ) {
+ return new Failure("Not authorized to consume from the queue")
+ }
+ }
+ }
+ bind(value::Nil)
+ Success(Zilch)
+ }
+
def bind(values: List[DeliveryConsumer]) = retaining(values) {
for (consumer <- values) {
val subscription = new Subscription(this, consumer)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Dec 10 20:28:43 2010
@@ -31,6 +31,7 @@ import Buffer._
import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
import java.util.ArrayList
import org.apache.activemq.apollo.dto._
+import security.SecurityContext
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -94,41 +95,58 @@ class Router(val host:VirtualHost) exten
def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(ALL))
- def create_destination_or(destination:Path)(func:(RoutingNode)=>Unit):RoutingNode = {
+ def _get_or_create_destination(path:Path, security:SecurityContext) = {
+ // We can't create a wild card destination.. only wild card subscriptions.
+ assert( !PathParser.containsWildCards(path) )
+ var rc = destinations.chooseValue( path )
+ if( rc == null ) {
+ _create_destination(path, security)
+ } else {
+ Success(rc)
+ }
+ }
+
+ def _get_destination(path:Path) = {
+ Option(destinations.chooseValue( path ))
+ }
+
+ def _create_destination(path:Path, security:SecurityContext):Result[RoutingNode,String] = {
// We can't create a wild card destination.. only wild card subscriptions.
- assert( !PathParser.containsWildCards(destination) )
+ assert( !PathParser.containsWildCards(path) )
- var rc = destinations.chooseValue( destination )
- if( rc == null ) {
+ // A new destination is being created...
+ val config = host.destination_config(path).getOrElse(new DestinationDTO)
- // A new destination is being created...
- rc = new RoutingNode(this, destination )
- destinations.put(destination, rc)
-
- // bind any matching wild card subs
- import JavaConversions._
- broadcast_consumers.get( destination ).foreach { c=>
- rc.add_broadcast_consumer(c)
- }
- bindings.get( destination ).foreach { queue=>
- rc.add_queue(queue)
- }
+ if( host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config)) {
+ return new Failure("Not authorized to create the destination")
+ }
- } else {
- func(rc)
+ val rc = new RoutingNode(this, path, config)
+ destinations.put(path, rc)
+
+ // bind any matching wild card subs
+ import JavaConversions._
+ broadcast_consumers.get( path ).foreach { c=>
+ rc.add_broadcast_consumer(c)
}
- rc
+ bindings.get( path ).foreach { queue=>
+ rc.add_queue(queue)
+ }
+ Success(rc)
}
- def get_destination_matches(destination:Path) = {
+ def get_destination_matches(path:Path) = {
import JavaConversions._
- asIterable(destinations.get( destination ))
+ asIterable(destinations.get( path ))
}
- def _create_queue(id:Long, binding:Binding):Queue = {
+ def _create_queue(id:Long, binding:Binding, security:SecurityContext):Result[Queue,String] = {
val config = host.queue_config(binding).getOrElse(new QueueDTO)
+ if( host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config) ) {
+ return Failure("Not authorized to create the queue")
+ }
var qid = id
if( qid == -1 ) {
@@ -143,7 +161,7 @@ class Router(val host:VirtualHost) exten
record.binding_data = binding.binding_data
record.binding_kind = binding.binding_kind
- host.store.addQueue(record) { rc => }
+ host.store.addQueue(record) { rc => Unit }
}
queue.start
@@ -156,8 +174,11 @@ class Router(val host:VirtualHost) exten
bindings.put(name, queue)
// make sure the destination is created if this is not a wild card sub
if( !PathParser.containsWildCards(name) ) {
- create_destination_or(name) { node=>
- node.add_queue(queue)
+ _get_destination(name) match {
+ case Some(node)=>
+ node.add_queue(queue)
+ case None=>
+ _create_destination(name, null)
}
} else {
get_destination_matches(name).foreach( node=>
@@ -166,60 +187,66 @@ class Router(val host:VirtualHost) exten
}
}
- queue
+ Success(queue)
+
}
- def create_queue(record:QueueRecord) = {
- _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data))
+ def create_queue(record:QueueRecord, security:SecurityContext) = {
+ _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data), security)
}
/**
* Returns the previously created queue if it already existed.
*/
- def _create_queue(dto: BindingDTO): Option[Queue] = {
+ def _get_or_create_queue(dto: BindingDTO, security:SecurityContext): Result[Queue, String] = {
val binding = BindingFactory.create(dto)
val queue = queue_bindings.get(binding) match {
- case Some(queue) => Some(queue)
- case None => Some(_create_queue(-1, binding))
+ case Some(queue) => Success(queue)
+ case None => _create_queue(-1, binding, security)
}
queue
}
- def create_queue(id:BindingDTO) = dispatchQueue ! {
- _create_queue(id)
+ def get_or_create_queue(id:BindingDTO, security:SecurityContext) = dispatchQueue ! {
+ _get_or_create_queue(id, security)
}
/**
* Returns true if the queue no longer exists.
*/
- def destroy_queue(dto:BindingDTO) = dispatchQueue ! { _destroy_queue(dto) }
+ def destroy_queue(dto:BindingDTO, security:SecurityContext) = dispatchQueue ! { _destroy_queue(dto, security) }
- def _destroy_queue(dto:BindingDTO):Boolean = {
+ def _destroy_queue(dto:BindingDTO, security:SecurityContext):Result[Zilch, String] = {
queue_bindings.get(BindingFactory.create(dto)) match {
case Some(queue) =>
- _destroy_queue(queue)
- true
+ _destroy_queue(queue, security)
case None =>
- true
+ Failure("Does not exist")
}
}
/**
* Returns true if the queue no longer exists.
*/
- def destroy_queue(id:Long) = dispatchQueue ! { _destroy_queue(id) }
+ def destroy_queue(id:Long, security:SecurityContext) = dispatchQueue ! { _destroy_queue(id,security) }
- def _destroy_queue(id:Long):Boolean = {
+ def _destroy_queue(id:Long, security:SecurityContext):Result[Zilch, String] = {
queues.get(id) match {
case Some(queue) =>
- _destroy_queue(queue)
- true
+ _destroy_queue(queue,security)
case None =>
- true
+ Failure("Does not exist")
}
}
- def _destroy_queue(queue:Queue):Unit = {
+ def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
+
+ if( security!=null && queue.config.acl!=null ) {
+ if( !host.authorizer.can_destroy(security, host, queue.config) ) {
+ return Failure("Not authorized to destroy")
+ }
+ }
+
queue_bindings.remove(queue.binding)
queues.remove(queue.id)
@@ -232,9 +259,10 @@ class Router(val host:VirtualHost) exten
queue.stop
if( queue.tune_persistent ) {
queue.dispatchQueue ^ {
- host.store.removeQueue(queue.id){x=>}
+ host.store.removeQueue(queue.id){x=> Unit}
}
}
+ Success(Zilch)
}
/**
@@ -251,23 +279,38 @@ class Router(val host:VirtualHost) exten
queues.get(id)
}
- def bind(destination:Destination, consumer:DeliveryConsumer) = {
+ def bind(destination:Destination, consumer:DeliveryConsumer, security:SecurityContext) = {
consumer.retain
dispatchQueue ! {
- assert( is_topic(destination) )
+ def do_bind:Result[Zilch, String] = {
+ assert( is_topic(destination) )
+ val name = destination.name
- val name = destination.name
+ // A new destination is being created...
+ def config = host.destination_config(name).getOrElse(new DestinationDTO)
- // make sure the destination is created if this is not a wild card sub
- if( !PathParser.containsWildCards(name) ) {
- val node = create_destination_or(name) { node=> Unit }
- }
+ if( host.authorizer!=null && security!=null && !host.authorizer.can_receive_from(security, host, config) ) {
+ return new Failure("Not authorized to receive from the destination")
+ }
+
+ // make sure the destination is created if this is not a wild card sub
+ if( !PathParser.containsWildCards(name) ) {
+ val rc = _get_or_create_destination(name, security)
+ if( rc.failed ) {
+ return rc.map_success(_=> Zilch);
+ }
+ }
- get_destination_matches(name).foreach{ node=>
- node.add_broadcast_consumer(consumer)
+ get_destination_matches(name).foreach{ node=>
+ node.add_broadcast_consumer(consumer)
+ }
+ broadcast_consumers.put(name, consumer)
+ Success(Zilch)
}
- broadcast_consumers.put(name, consumer)
+
+ do_bind
+
}
}
@@ -281,44 +324,74 @@ class Router(val host:VirtualHost) exten
} >>: dispatchQueue
- def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+ def connect(destination:Destination, producer:DeliveryProducer, security:SecurityContext)(completed: (Result[DeliveryProducerRoute,String])=>Unit) = {
val route = new DeliveryProducerRoute(this, destination, producer) {
override def on_connected = {
- completed(this);
+ completed(Success(this));
}
}
- dispatchQueue {
-
+ def do_connect:Result[Zilch, String] = {
val topic = is_topic(destination)
+
+ var destination_security = security
// Looking up the queue will cause it to get created if it does not exist.
- val queue = if( !topic ) {
+ val queue = if( topic ) {
+
+ def config = host.destination_config(destination.name).getOrElse(new DestinationDTO)
+ if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config)) {
+ return new Failure("Not authorized to send to the destination")
+ }
+ None
+
+ } else {
+
val dto = new QueueBindingDTO
dto.destination = DestinationParser.encode_path(destination.name)
- _create_queue(dto)
- } else {
- None
+
+ // Can we send to the queue?
+ def config = host.queue_config(dto).getOrElse(new QueueDTO)
+ if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config) ) {
+ return Failure("Not authorized to send to the queue")
+ }
+
+ destination_security = null
+ val rc = _get_or_create_queue(dto, security)
+ if( rc.failed ) {
+ return rc.map_success(_=>Zilch)
+ }
+ Some(rc.success)
}
- val node = create_destination_or(destination.name) { node=> Unit }
- if( node.unified || topic ) {
- node.add_broadcast_producer( route )
- } else {
- route.bind( queue.toList )
+ _get_or_create_destination(destination.name, security) match {
+ case Success(node)=>
+ if( node.unified || topic ) {
+ node.add_broadcast_producer( route )
+ } else {
+ route.bind( queue.toList )
+ }
+ route.connected()
+ Success(Zilch)
+
+ case Failure(reason)=>
+ Failure(reason)
}
+ }
- route.connected()
+ dispatchQueue {
+ do_connect.failure_option.foreach(x=> producer.dispatchQueue { completed(Failure(x)) } )
}
+
}
def disconnect(route:DeliveryProducerRoute) = releasing(route) {
-
- val topic = is_topic(route.destination)
- val node = create_destination_or(route.destination.name) { node=> Unit }
- if( node.unified || topic ) {
- node.remove_broadcast_producer(route)
+ _get_destination(route.destination.name).foreach { node=>
+ val topic = is_topic(route.destination)
+ if( node.unified || topic ) {
+ node.remove_broadcast_producer(route)
+ }
}
route.disconnected()
@@ -326,14 +399,10 @@ class Router(val host:VirtualHost) exten
}
-object RoutingNode {
- val DEFAULT_CONFIG = new DestinationDTO
-}
/**
* Tracks state associated with a destination name.
*/
-class RoutingNode(val router:Router, val name:Path) {
- import RoutingNode._
+class RoutingNode(val router:Router, val name:Path, val config:DestinationDTO) {
val id = router.destination_id_counter.incrementAndGet
@@ -343,8 +412,6 @@ class RoutingNode(val router:Router, val
import OptionSupport._
- val config = router.host.destination_config(name).getOrElse(DEFAULT_CONFIG)
-
def unified = config.unified.getOrElse(false)
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
@@ -357,7 +424,7 @@ class RoutingNode(val router:Router, val
case "queue" =>
// create a temp queue so that it can spool
- val queue = router._create_queue(-1, new TempBinding(consumer))
+ val queue = router._create_queue(-1, new TempBinding(consumer), null).success
queue.dispatchQueue.setTargetQueue(consumer.dispatchQueue)
queue.bind(List(consumer))
@@ -391,7 +458,7 @@ class RoutingNode(val router:Router, val
val binding = new TempBinding(consumer)
if( queue.binding == binding ) {
queue.unbind(List(consumer))
- router._destroy_queue(queue.id)
+ router._destroy_queue(queue.id, null)
}
case _ =>
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Dec 10 20:28:43 2010
@@ -117,9 +117,17 @@ class VirtualHost(val broker: Broker, va
val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
if( config.authentication != null ) {
- authenticator = new JaasAuthenticator(config.authentication.domain)
- authorizer = new AclAuthorizer(config.authentication.kinds().toList)
+ if( config.authentication.enabled.getOrElse(true) ) {
+ // Virtual host has its own settings.
+ authenticator = new JaasAuthenticator(config.authentication.domain)
+ authorizer = new AclAuthorizer(config.authentication.kinds().toList)
+ } else {
+ // Don't use security on this host.
+ authenticator = null
+ authorizer = null
+ }
} else {
+ // use the broker's settings..
authenticator = broker.authenticator
authorizer = broker.authorizer
}
@@ -172,7 +180,7 @@ class VirtualHost(val broker: Broker, va
x match {
case Some(record)=>
dispatchQueue ^{
- router.create_queue(record)
+ router.create_queue(record, null)
task.run
}
case _ =>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala Fri Dec 10 20:28:43 2010
@@ -19,7 +19,7 @@ package org.apache.activemq.apollo.broke
import org.apache.activemq.apollo.broker.{Destination, VirtualHost, Broker}
import scala.util.continuations._
import org.apache.activemq.apollo.util.path.Path
-import org.apache.activemq.apollo.dto.{PrincipalDTO, QueueAclDTO, DestinationAclDTO, BindingDTO}
+import org.apache.activemq.apollo.dto._
/**
* <p>
@@ -34,15 +34,11 @@ class AclAuthorizer(val default_kinds:Li
var allow_deafult = true
- private def sync[T](func: =>T): T @suspendable = shift { k: (T=>Unit) =>
- k(func)
- }
-
def is_in(ctx: SecurityContext, allowed:java.util.Set[PrincipalDTO]):Boolean = {
ctx.intersects(allowed.toSet, default_kinds)
}
- def can_admin(ctx: SecurityContext, broker: Broker) = sync {
+ def can_admin(ctx: SecurityContext, broker: Broker) = {
if( broker.config.acl!=null ) {
is_in(ctx, broker.config.acl.admins)
} else {
@@ -50,7 +46,7 @@ class AclAuthorizer(val default_kinds:Li
}
}
- def can_connect_to(ctx: SecurityContext, host: VirtualHost) = sync {
+ def can_connect_to(ctx: SecurityContext, host: VirtualHost) = {
if( host.config.acl!=null ) {
is_in(ctx, host.config.acl.connects)
} else {
@@ -58,58 +54,53 @@ class AclAuthorizer(val default_kinds:Li
}
}
-
- private def for_dest(ctx: SecurityContext, host: VirtualHost, dest: Path)(func: DestinationAclDTO=>java.util.Set[PrincipalDTO]) = {
- host.destination_config(dest).map { config=>
- if( config.acl!=null ) {
- is_in(ctx, func(config.acl))
- } else {
- allow_deafult
- }
- }.getOrElse(allow_deafult)
+ private def can_dest(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO)(func: DestinationAclDTO=>java.util.Set[PrincipalDTO]) = {
+ if( dest.acl!=null ) {
+ is_in(ctx, func(dest.acl))
+ } else {
+ allow_deafult
+ }
}
- def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync {
- for_dest(ctx, host, dest)(_.sends)
+ def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+ can_dest(ctx, host, dest)(_.sends)
}
- def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync {
- for_dest(ctx, host, dest)(_.receives)
+ def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+ can_dest(ctx, host, dest)(_.receives)
}
- def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync {
- for_dest(ctx, host, dest)(_.destroys)
+ def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+ can_dest(ctx, host, dest)(_.destroys)
}
- def can_create(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync {
- for_dest(ctx, host, dest)(_.creates)
+ def can_create(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+ can_dest(ctx, host, dest)(_.creates)
}
- private def for_queue(ctx: SecurityContext, host: VirtualHost, dto: BindingDTO)(func: QueueAclDTO=>java.util.Set[PrincipalDTO]) = {
- host.queue_config(dto).map { config=>
- if( config.acl!=null ) {
- is_in(ctx, func(config.acl))
- } else {
- allow_deafult
- }
- }.getOrElse(allow_deafult)
+ private def can_queue(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO)(func: QueueAclDTO=>java.util.Set[PrincipalDTO]) = {
+ if( queue.acl!=null ) {
+ is_in(ctx, func(queue.acl))
+ } else {
+ allow_deafult
+ }
}
- def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync {
- for_queue(ctx, host, dest)(_.sends)
+ def can_send_to(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+ can_queue(ctx, host, queue)(_.sends)
}
- def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync {
- for_queue(ctx, host, dest)(_.receives)
+ def can_receive_from(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+ can_queue(ctx, host, queue)(_.receives)
}
- def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync {
- for_queue(ctx, host, dest)(_.destroys)
+ def can_destroy(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+ can_queue(ctx, host, queue)(_.destroys)
}
- def can_create(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync {
- for_queue(ctx, host, dest)(_.creates)
+ def can_create(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+ can_queue(ctx, host, queue)(_.creates)
}
- def can_consume_from(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync {
- for_queue(ctx, host, dest)(_.consumes)
+ def can_consume_from(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+ can_queue(ctx, host, queue)(_.consumes)
}
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala Fri Dec 10 20:28:43 2010
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.apollo.broker.security
import scala.util.continuations._
-import org.apache.activemq.apollo.dto.BindingDTO
import org.apache.activemq.apollo.broker.{VirtualHost, Broker, Destination}
import org.apache.activemq.apollo.util.path.Path
+import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO}
/**
* <p>
@@ -31,56 +31,57 @@ trait Authorizer {
/**
* @returns true if the user is an admin.
*/
- def can_admin(ctx:SecurityContext, broker:Broker):Boolean @suspendable
+ def can_admin(ctx:SecurityContext, broker:Broker):Boolean
/**
* @returns true if the user is allowed to connect to the virtual host
*/
- def can_connect_to(ctx:SecurityContext, host:VirtualHost):Boolean @suspendable
+ def can_connect_to(ctx:SecurityContext, host:VirtualHost):Boolean
/**
* @returns true if the user is allowed to send to the destination
*/
- def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+ def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
/**
* @returns true if the user is allowed to receive from the destination
*/
- def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+ def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
/**
* @returns true if the user is allowed to create the destination
*/
- def can_create(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+ def can_create(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
/**
* @returns true if the user is allowed to destroy the destination
*/
- def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+ def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
+
/**
* @returns true if the user is allowed to send to the queue
*/
- def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+ def can_send_to(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
/**
* @returns true if the user is allowed to receive from the queue
*/
- def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+ def can_receive_from(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
/**
* @returns true if the user is allowed to consume from the queue
*/
- def can_consume_from(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+ def can_consume_from(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
/**
* @returns true if the user is allowed to create the queue
*/
- def can_create(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+ def can_create(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
/**
* @returns true if the user is allowed to destroy the queue
*/
- def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+ def can_destroy(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala Fri Dec 10 20:28:43 2010
@@ -44,34 +44,37 @@ class JaasAuthenticator(val jass_realm:
* potentially perform a blocking wait (e.g. LDAP request).
*/
def authenticate(security_ctx: SecurityContext) = BLOCKABLE_THREAD_POOL ! {
+ _authenticate(security_ctx)
+ }
+ def _authenticate(security_ctx: SecurityContext): Boolean = {
val original = Thread.currentThread().getContextClassLoader()
Thread.currentThread().setContextClassLoader(getClass.getClassLoader())
try {
- val login_ctx = new LoginContext(jass_realm, new CallbackHandler {
+ security_ctx.login_context = new LoginContext(jass_realm, new CallbackHandler {
def handle(callbacks: Array[Callback]) = {
- callbacks.foreach{ callback =>
- callback match {
- case x: NameCallback => x.setName(security_ctx.user)
- case x: PasswordCallback => x.setPassword(security_ctx.password.getOrElse("").toCharArray)
- case x: CertificateCallback => x.setCertificates(security_ctx.certificates)
- case _ => throw new UnsupportedCallbackException(callback)
- }
+ callbacks.foreach{
+ callback =>
+ callback match {
+ case x: NameCallback => x.setName(security_ctx.user)
+ case x: PasswordCallback => x.setPassword(security_ctx.password.getOrElse("").toCharArray)
+ case x: CertificateCallback => x.setCertificates(security_ctx.certificates)
+ case _ => throw new UnsupportedCallbackException(callback)
+ }
}
}
})
- login_ctx.login()
- security_ctx.subject = login_ctx.getSubject()
+ security_ctx.login_context.login()
+ security_ctx.subject = security_ctx.login_context.getSubject()
true
} catch {
- case x:Exception =>
- false
+ case x: Exception =>
+ false
} finally {
Thread.currentThread().setContextClassLoader(original)
}
}
-
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala Fri Dec 10 20:28:43 2010
@@ -23,6 +23,7 @@ import java.security.cert.X509Certificat
import org.apache.activemq.apollo.util.OptionSupport._
import org.apache.activemq.jaas.{GroupPrincipal, UserPrincipal}
import org.apache.activemq.apollo.dto.PrincipalDTO
+import javax.security.auth.login.LoginContext
/**
* <p>
@@ -36,6 +37,8 @@ class SecurityContext {
var password:String = _
var certificates = Array[X509Certificate]()
+ var login_context:LoginContext = _
+
private val principles = new HashSet[PrincipalDTO]()
private var _subject:Subject = _
@@ -55,11 +58,12 @@ class SecurityContext {
def intersects(values:Set[PrincipalDTO], default_kinds:List[String]):Boolean = {
val (v1, v2) = values.partition(_.kind == null)
- if( principles.intersect(v2).isEmpty ) {
+ if( !principles.intersect(v2).isEmpty ) {
return true
}
default_kinds.foreach { x=>
- if( ! (v1.map(y=> new PrincipalDTO(y.name, x) ).intersect(v1).isEmpty) ) {
+ val kinda_added = v1.map(y=> new PrincipalDTO(y.name, x))
+ if( ! principles.intersect(kinda_added).isEmpty ) {
return true
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Fri Dec 10 20:28:43 2010
@@ -54,7 +54,7 @@ class DestinationConfigurationTest exten
def check_tune_queue_buffer(expected:Int)(dto:BindingDTO) = {
var actual=0
reset {
- var q = router.create_queue(dto).get
+ var q = router.get_or_create_queue(dto, null).success
actual = q.tune_queue_buffer
}
expect(expected) {actual}
Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml Fri Dec 10 20:28:43 2010
@@ -19,6 +19,7 @@
The default configuration with tls/ssl enabled.
</notes>
+ <!-- used to secure the web admin interface -->
<authentication domain="apollo"/>
<acl>
<admin name="admins"/>
@@ -33,6 +34,37 @@
-->
<host-name>localhost</host-name>
+ <!-- Uncomment to disable security for the virtual host
+ <authentication enabled="false"/>
+ -->
+
+ <!--
+ You can add an 'acl' element to the virtual host,
+ destination, or queue elements to restrict the operations
+ that a user can take. If the acl element is not set on
+ an object, then access is not restricted at all.
+ -->
+ <acl>
+ <connect name="admins"/>
+ </acl>
+ <destination path="secure.**">
+ <acl>
+ <create name="admins"/>
+ <destroy name="admins"/>
+ <send name="admins"/>
+ <receive name="admins"/>
+ </acl>
+ </destination>
+ <queue path="secure.**">
+ <acl>
+ <create name="admins"/>
+ <destroy name="admins"/>
+ <send name="admins"/>
+ <receive name="admins"/> <!-- queue browsers -->
+ <consume name="admins"/> <!-- regular consumers -->
+ </acl>
+ </queue>
+
<!--
Examples of how to configure destinations and queues. Note
that you can use wildcard patterns when specifying destinations,
Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml Fri Dec 10 20:28:43 2010
@@ -20,6 +20,7 @@
The default configuration.
</notes>
+ <!-- used to secure the web admin interface -->
<authentication domain="apollo"/>
<acl>
<admin name="admins"/>
@@ -35,6 +36,38 @@
-->
<host-name>localhost</host-name>
+ <!-- Uncomment to disable security for the virtual host
+ <authentication enabled="false"/>
+ -->
+
+ <!--
+ You can add an 'acl' element to the virtual host,
+ destination, or queue elements to restrict the operations
+ that a user can take. If the acl element is not set on
+ an object, then access is not restricted at all.
+ -->
+ <acl>
+ <connect name="admins"/>
+ </acl>
+ <destination path="secure.**">
+ <acl>
+ <create name="admins"/>
+ <destroy name="admins"/>
+ <send name="admins"/>
+ <receive name="admins"/>
+ </acl>
+ </destination>
+ <queue path="secure.**">
+ <acl>
+ <create name="admins"/>
+ <destroy name="admins"/>
+ <send name="admins"/>
+ <receive name="admins"/> <!-- queue browsers -->
+ <consume name="admins"/> <!-- regular consumers -->
+ </acl>
+ </queue>
+
+
<!--
Examples of how to configure destinations and queues. Note
that you can use wildcard patterns when specifying destinations,
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java Fri Dec 10 20:28:43 2010
@@ -35,6 +35,9 @@ import java.util.Set;
public class AuthenticationDTO {
@XmlAttribute
+ public Boolean enabled;
+
+ @XmlAttribute
public String domain;
@XmlElement(name="kind")
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java Fri Dec 10 20:28:43 2010
@@ -30,7 +30,7 @@ import java.util.*;
@XmlAccessorType(XmlAccessType.FIELD)
public class DestinationAclDTO {
- @XmlElement(name="creates")
+ @XmlElement(name="create")
public Set<PrincipalDTO> creates = new HashSet<PrincipalDTO>();
@XmlElement(name="destroy")
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java Fri Dec 10 20:28:43 2010
@@ -35,12 +35,14 @@ public class XmlCodecTest {
@Test
public void unmarshalling() throws Exception {
- BrokerDTO dto = XmlCodec.unmarshalBrokerDTO(resource("simple.xml"));
+ BrokerDTO dto = XmlCodec.unmarshalBrokerDTO(resource("XmlCodecTest.xml"));
assertNotNull(dto);
assertEquals("default", dto.id);
- assertEquals("vh-local", dto.virtual_hosts.get(0).id);
- assertEquals("localhost", dto.virtual_hosts.get(0).host_names.get(0));
- assertEquals("example.com", dto.virtual_hosts.get(0).host_names.get(1));
+ VirtualHostDTO host = dto.virtual_hosts.get(0);
+ assertNotNull(host.acl);
+ assertEquals("vh-local", host.id);
+ assertEquals("localhost", host.host_names.get(0));
+ assertEquals("example.com", host.host_names.get(1));
assertNotNull(dto.acl);
assertTrue(dto.acl.admins.contains(new PrincipalDTO("hiram")));
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml (from r1044356, activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml?p2=activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml&p1=activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml&r1=1044356&r2=1044500&rev=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml Fri Dec 10 20:28:43 2010
@@ -24,6 +24,7 @@
</acl>
<virtual-host enabled="true" id="vh-local">
+ <acl/>
<host-name>localhost</host-name>
<host-name>example.com</host-name>
</virtual-host>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Dec 10 20:28:43 2010
@@ -227,6 +227,7 @@ class StompProtocolHandler extends Proto
var session_manager:SinkMux[StompFrame] = null
var connection_sink:Sink[StompFrame] = null
+ var dead = false
var closed = false
var consumers = Map[AsciiBuffer, StompConsumer]()
@@ -279,9 +280,11 @@ class StompProtocolHandler extends Proto
}
private def die[T](headers:HeaderMap, body:String):T = {
- if( !connection.stopped ) {
- suspendRead("shutdown")
- connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(body))) )
+ if( !dead ) {
+ dead = true
+ waiting_on = "shutdown"
+ connection.transport.resumeRead
+ connection_sink.offer(StompFrame(ERROR, headers, BufferContent(ascii(body))) )
// TODO: if there are too many open connections we should just close the connection
// without waiting for the error to get sent to the client.
queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -306,6 +309,7 @@ class StompProtocolHandler extends Proto
if( !closed ) {
heart_beat_monitor.stop
closed=true;
+ dead = true;
import collection.JavaConversions._
producerRoutes.foreach{
@@ -329,7 +333,11 @@ class StompProtocolHandler extends Proto
}
- override def onTransportCommand(command:Any) = {
+ override def onTransportCommand(command:Any):Unit = {
+ if( dead ) {
+ // We stop processing client commands once we are dead
+ return;
+ }
try {
command match {
case s:StompCodec =>
@@ -405,8 +413,8 @@ class StompProtocolHandler extends Proto
def on_stomp_connect(headers:HeaderMap):Unit = {
- security_context.user = get(headers, LOGIN).toString
- security_context.password = get(headers, PASSCODE).toString
+ security_context.user = get(headers, LOGIN).map(_.toString).getOrElse(null)
+ security_context.password = get(headers, PASSCODE).map(_.toString).getOrElse(null)
val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
@@ -454,6 +462,26 @@ class StompProtocolHandler extends Proto
}
def noop = shift { k: (Unit=>Unit) => k() }
+
+ def send_connected = { // sends the CONNECTED frame to the client and wires up the host's buffer pool
+ val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat)) // "<outbound>,<inbound>" heart-beat header value
+ session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet) // "<host id>:<next session counter value>"
+ if( connection_sink==null ) {
+ weird(headers) // NOTE(review): presumably aborts processing; otherwise the offer below would hit a null sink - confirm
+ }
+ connection_sink.offer( // reply with CONNECTED carrying the version, session and heart-beat headers
+ StompFrame(CONNECTED, List(
+ (VERSION, protocol_version),
+ (SESSION, session_id),
+ (HEART_BEAT, outbound_heart_beat_header)
+ )))
+
+ if( this.host.direct_buffer_pool!=null ) { // only when the host has a direct buffer pool configured
+ val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+ wf.memory_pool = this.host.direct_buffer_pool // hand the pool to the transport's codec
+ }
+ }
+
reset {
suspendRead("virtual host lookup")
val host_header = get(headers, HOST)
@@ -464,43 +492,29 @@ class StompProtocolHandler extends Proto
connection.connector.broker.getVirtualHost(host)
}
resumeRead
+
if(host==null) {
async_die("Invalid virtual host: "+host_header.get)
- noop // to make the cps compiler plugin happy.
+ noop
} else {
this.host=host
-
- var authenticated = true;
-
- if( host.authenticator!=null ) {
- suspendRead("authenticating")
- authenticated = host.authenticator.authenticate(security_context)
- resumeRead
- } else {
- noop // to make the cps compiler plugin happy.
- }
-
- if( !authenticated ) {
+ if( host.authenticator!=null && host.authorizer!=null ) {
+ suspendRead("authenticating and authorizing connect")
+ if( !host.authenticator.authenticate(security_context) ) {
async_die("Authentication failed.")
- } else {
- val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
- session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
- if( connection_sink==null ) {
- weird(headers)
- }
- connection_sink.offer(
- StompFrame(CONNECTED, List(
- (VERSION, protocol_version),
- (SESSION, session_id),
- (HEART_BEAT, outbound_heart_beat_header)
- )))
-
- if( this.host.direct_buffer_pool!=null ) {
- val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.memory_pool = this.host.direct_buffer_pool
+ noop // to make the cps compiler plugin happy.
+ } else if( !host.authorizer.can_connect_to(security_context, host) ) {
+ async_die("Connect not authorized.")
+ noop // to make the cps compiler plugin happy.
+ } else {
+ resumeRead
+ send_connected
+ noop // to make the cps compiler plugin happy.
}
+ } else {
+ send_connected
+ noop // to make the cps compiler plugin happy.
}
-
}
}
@@ -557,8 +571,11 @@ class StompProtocolHandler extends Proto
// don't process frames until producer is connected...
connection.transport.suspendRead
- host.router.connect(destiantion, producer) {
- route =>
+ host.router.connect(destiantion, producer, security_context) {
+ case Failure(reason) =>
+ async_die(reason)
+
+ case Success(route) =>
if (!connection.stopped) {
resumeRead
route.refiller = ^ {
@@ -708,21 +725,32 @@ class StompProtocolHandler extends Proto
// consumer is bound as a topic
reset {
- host.router.bind(destination, consumer)
- send_receipt(headers)
+ val rc = host.router.bind(destination, consumer, security_context)
consumer.release
+ rc match {
+ case Failure(reason)=>
+ async_die(reason)
+ case _=>
+ send_receipt(headers)
+ }
}
} else {
reset {
// create a queue and bind the consumer to it.
- val x= host.router.create_queue(binding)
+ val x= host.router.get_or_create_queue(binding, security_context)
x match {
- case Some(queue:Queue) =>
- queue.bind(consumer::Nil)
- send_receipt(headers)
+ case Success(queue) =>
+ val rc = queue.bind(consumer, security_context)
consumer.release
- case None => async_die("case not yet implemented.")
+ rc match {
+ case Failure(reason)=>
+ async_die(reason)
+ case _ =>
+ send_receipt(headers)
+ }
+ case Failure(reason) =>
+ async_die(reason)
}
}
}
@@ -763,8 +791,13 @@ class StompProtocolHandler extends Proto
if( persistent && consumer.binding!=null ) {
reset {
- val sucess = host.router.destroy_queue(consumer.binding)
- send_receipt(headers)
+ val rc = host.router.destroy_queue(consumer.binding, security_context)
+ rc match {
+ case Failure(reason) =>
+ async_die(reason)
+ case Success(_) =>
+ send_receipt(headers)
+ }
}
} else {
send_receipt(headers)
Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala?rev=1044500&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala Fri Dec 10 20:28:43 2010
@@ -0,0 +1,67 @@
+package org.apache.activemq.apollo.util
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>A Result can either be a Success or a Failure</p>
+ * <p>S is the type carried by a Success and F the type carried by a Failure; both are covariant.</p>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+sealed abstract class Result[+S,+F] {
+ def failed:Boolean // false in Success, true in Failure
+ // Accessors for the carried value; the one that does not apply throws NoSuchElementException.
+ def success:S
+ def failure:F
+ // Option views of the two accessors above: None instead of an exception when the value is absent.
+ def success_option:Option[S] = if (failed) None else Some(success)
+ def failure_option:Option[F] = if (failed) Some(failure) else None
+ // Transforms the success value with f; a failure is passed through unchanged.
+ def map_success[B](f: S => B): Result[B, F] =
+ if (failed) Failure(failure) else Success(f(this.success))
+ // Transforms the failure value with f; a success is passed through unchanged.
+ def map_failure[B](f: F => B): Result[S, B] =
+ if (failed) Failure(f(this.failure)) else Success(this.success)
+
+}
+
+/**
+ * <p>A Success Result</p>
+ * <p>Wraps the success value x; it carries no failure value.</p>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final case class Success[+S](x: S) extends Result[S, Nothing] {
+ def get = x // the wrapped value, same as success
+ def success = x
+ def failure = throw new NoSuchElementException("Success.failure") // a Success has no failure to return
+ def failed = false
+}
+
+/**
+ * <p>A Failure Result</p>
+ * <p>Wraps the failure value x; it carries no success value.</p>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final case class Failure[+F](x: F) extends Result[Nothing,F] {
+ def get = x // the wrapped value, same as failure
+ def success = throw new NoSuchElementException("Failure.success") // a Failure has no success to return
+ def failure = x
+ def failed = true
+}
+
+sealed class Zilch // type of the Zilch singleton declared on the next line
+final case object Zilch extends Zilch // NOTE(review): presumably a placeholder for results that carry no value - confirm against callers
\ No newline at end of file