You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/10 21:28:44 UTC

svn commit: r1044500 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/ apollo-broker/src/test/scala/org/apache/activemq/apollo/...

Author: chirino
Date: Fri Dec 10 20:28:43 2010
New Revision: 1044500

URL: http://svn.apache.org/viewvc?rev=1044500&view=rev
Log:
Broker security model is now working fine with STOMP.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml
      - copied, changed from r1044356, activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml
    activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Dec 10 20:28:43 2010
@@ -216,7 +216,8 @@ class Broker() extends BaseService with 
         key_storage.config = config.key_storage
       }
 
-      if( config.authentication != null ) {
+      import OptionSupport._
+      if( config.authentication != null && config.authentication.enabled.getOrElse(true) ) {
         authenticator = new JaasAuthenticator(config.authentication.domain)
         authorizer = new AclAuthorizer(config.authentication.kinds().toList)
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ConfigStore.scala Fri Dec 10 20:28:43 2010
@@ -122,24 +122,10 @@ class FileConfigStore extends ConfigStor
     }
     revs = revs.sortWith((x,y)=> x < y)
 
-    val last = revs.lastOption.map{ rev=>
-      val r = read(rev, fileRev(rev))
-      if( !file.exists ) {
-        write(r)
-      } else {
-        val x = read(rev, file)
-        if ( can_write && !Arrays.equals(r.data, x.data) ) {
-          write(x.copy(rev=x.rev+1))
-        } else {
-          x
-        }
-      }
-    } getOrElse {
-      if( file.exists ) {
-        read(1, file)
-      } else {
-        write(StoredBrokerModel(defaultConfig(1)))
-      }
+    val last = if( file.exists ) {
+      read(1, file)
+    } else {
+      write(StoredBrokerModel(defaultConfig(1)))
     }
 
     latest = last

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Dec 10 20:28:43 2010
@@ -32,6 +32,7 @@ import org.apache.activemq.apollo.util.l
 import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
 import org.apache.activemq.apollo.dto.QueueDTO
 import OptionSupport._
+import security.SecurityContext
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -505,6 +506,22 @@ class Queue(val host: VirtualHost, var i
 
   def connected() = {}
 
+  def bind(value: DeliveryConsumer, security:SecurityContext): Result[Zilch, String] = {
+    if(  host.authorizer!=null && security!=null ) {
+      if( value.browser ) {
+        if( !host.authorizer.can_receive_from(security, host, config) ) {
+          return new Failure("Not authorized to browse the queue")
+        }
+      } else {
+        if( !host.authorizer.can_consume_from(security, host, config) ) {
+          return new Failure("Not authorized to consume from the queue")
+        }
+      }
+    }
+    bind(value::Nil)
+    Success(Zilch)
+  }
+
   def bind(values: List[DeliveryConsumer]) = retaining(values) {
     for (consumer <- values) {
       val subscription = new Subscription(this, consumer)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Dec 10 20:28:43 2010
@@ -31,6 +31,7 @@ import Buffer._
 import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
 import java.util.ArrayList
 import org.apache.activemq.apollo.dto._
+import security.SecurityContext
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -94,41 +95,58 @@ class Router(val host:VirtualHost) exten
 
   def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(ALL))
   
-  def create_destination_or(destination:Path)(func:(RoutingNode)=>Unit):RoutingNode = {
+  def _get_or_create_destination(path:Path, security:SecurityContext) = {
+    // We can't create a wild card destination.. only wild card subscriptions.
+    assert( !PathParser.containsWildCards(path) )
+    var rc = destinations.chooseValue( path )
+    if( rc == null ) {
+      _create_destination(path, security)
+    } else {
+      Success(rc)
+    }
+  }
+
+  def _get_destination(path:Path) = {
+    Option(destinations.chooseValue( path ))
+  }
+
+  def _create_destination(path:Path, security:SecurityContext):Result[RoutingNode,String] = {
 
     // We can't create a wild card destination.. only wild card subscriptions.
-    assert( !PathParser.containsWildCards(destination) )
+    assert( !PathParser.containsWildCards(path) )
 
-    var rc = destinations.chooseValue( destination )
-    if( rc == null ) {
+    // A new destination is being created...
+    val config = host.destination_config(path).getOrElse(new DestinationDTO)
 
-      // A new destination is being created...
-      rc = new RoutingNode(this, destination )
-      destinations.put(destination, rc)
-
-      // bind any matching wild card subs
-      import JavaConversions._
-      broadcast_consumers.get( destination ).foreach { c=>
-        rc.add_broadcast_consumer(c)
-      }
-      bindings.get( destination ).foreach { queue=>
-        rc.add_queue(queue)
-      }
+    if(  host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config)) {
+      return new Failure("Not authorized to create the destination")
+    }
 
-    } else {
-      func(rc)
+    val rc = new RoutingNode(this, path, config)
+    destinations.put(path, rc)
+
+    // bind any matching wild card subs
+    import JavaConversions._
+    broadcast_consumers.get( path ).foreach { c=>
+      rc.add_broadcast_consumer(c)
     }
-    rc
+    bindings.get( path ).foreach { queue=>
+      rc.add_queue(queue)
+    }
+    Success(rc)
   }
 
-  def get_destination_matches(destination:Path) = {
+  def get_destination_matches(path:Path) = {
     import JavaConversions._
-    asIterable(destinations.get( destination ))
+    asIterable(destinations.get( path ))
   }
 
-  def _create_queue(id:Long, binding:Binding):Queue = {
+  def _create_queue(id:Long, binding:Binding, security:SecurityContext):Result[Queue,String] = {
 
     val config = host.queue_config(binding).getOrElse(new QueueDTO)
+    if( host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config) ) {
+      return Failure("Not authorized to create the queue")
+    }
 
     var qid = id
     if( qid == -1 ) {
@@ -143,7 +161,7 @@ class Router(val host:VirtualHost) exten
       record.binding_data = binding.binding_data
       record.binding_kind = binding.binding_kind
 
-      host.store.addQueue(record) { rc =>  }
+      host.store.addQueue(record) { rc => Unit }
 
     }
     queue.start
@@ -156,8 +174,11 @@ class Router(val host:VirtualHost) exten
       bindings.put(name, queue)
       // make sure the destination is created if this is not a wild card sub
       if( !PathParser.containsWildCards(name) ) {
-        create_destination_or(name) { node=>
-          node.add_queue(queue)
+        _get_destination(name) match {
+          case Some(node)=>
+            node.add_queue(queue)
+          case None=>
+            _create_destination(name, null)
         }
       } else {
         get_destination_matches(name).foreach( node=>
@@ -166,60 +187,66 @@ class Router(val host:VirtualHost) exten
       }
 
     }
-    queue
+    Success(queue)
+
   }
 
-  def create_queue(record:QueueRecord) = {
-    _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data))
+  def create_queue(record:QueueRecord, security:SecurityContext) = {
+    _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data), security)
   }
 
   /**
    * Returns the previously created queue if it already existed.
    */
-  def _create_queue(dto: BindingDTO): Option[Queue] = {
+  def _get_or_create_queue(dto: BindingDTO, security:SecurityContext): Result[Queue, String] = {
     val binding = BindingFactory.create(dto)
     val queue = queue_bindings.get(binding) match {
-      case Some(queue) => Some(queue)
-      case None => Some(_create_queue(-1, binding))
+      case Some(queue) => Success(queue)
+      case None => _create_queue(-1, binding, security)
     }
     queue
   }
 
-  def create_queue(id:BindingDTO) = dispatchQueue ! {
-    _create_queue(id)
+  def get_or_create_queue(id:BindingDTO, security:SecurityContext) = dispatchQueue ! {
+    _get_or_create_queue(id, security)
   }
 
   /**
    * Returns true if the queue no longer exists.
    */
-  def destroy_queue(dto:BindingDTO) = dispatchQueue ! { _destroy_queue(dto) }
+  def destroy_queue(dto:BindingDTO, security:SecurityContext) = dispatchQueue ! { _destroy_queue(dto, security) }
 
-  def _destroy_queue(dto:BindingDTO):Boolean = {
+  def _destroy_queue(dto:BindingDTO, security:SecurityContext):Result[Zilch, String] = {
     queue_bindings.get(BindingFactory.create(dto)) match {
       case Some(queue) =>
-        _destroy_queue(queue)
-        true
+        _destroy_queue(queue, security)
       case None =>
-        true
+        Failure("Does not exist")
     }
   }
 
   /**
    * Returns true if the queue no longer exists.
    */
-  def destroy_queue(id:Long) = dispatchQueue ! { _destroy_queue(id) }
+  def destroy_queue(id:Long, security:SecurityContext) = dispatchQueue ! { _destroy_queue(id,security) }
 
-  def _destroy_queue(id:Long):Boolean = {
+  def _destroy_queue(id:Long, security:SecurityContext):Result[Zilch, String] = {
     queues.get(id) match {
       case Some(queue) =>
-        _destroy_queue(queue)
-        true
+        _destroy_queue(queue,security)
       case None =>
-        true
+        Failure("Does not exist")
     }
   }
 
-  def _destroy_queue(queue:Queue):Unit = {
+  def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
+
+    if( security!=null && queue.config.acl!=null ) {
+      if( !host.authorizer.can_destroy(security, host, queue.config) ) {
+        return Failure("Not authorized to destroy")
+      }
+    }
+
     queue_bindings.remove(queue.binding)
     queues.remove(queue.id)
 
@@ -232,9 +259,10 @@ class Router(val host:VirtualHost) exten
     queue.stop
     if( queue.tune_persistent ) {
       queue.dispatchQueue ^ {
-        host.store.removeQueue(queue.id){x=>}
+        host.store.removeQueue(queue.id){x=> Unit}
       }
     }
+    Success(Zilch)
   }
 
   /**
@@ -251,23 +279,38 @@ class Router(val host:VirtualHost) exten
     queues.get(id)
   }
 
-  def bind(destination:Destination, consumer:DeliveryConsumer) = {
+  def bind(destination:Destination, consumer:DeliveryConsumer, security:SecurityContext) = {
     consumer.retain
     dispatchQueue ! {
 
-      assert( is_topic(destination) )
+      def do_bind:Result[Zilch, String] = {
+        assert( is_topic(destination) )
+        val name = destination.name
 
-      val name = destination.name
+        // A new destination is being created...
+        def config = host.destination_config(name).getOrElse(new DestinationDTO)
 
-      // make sure the destination is created if this is not a wild card sub
-      if( !PathParser.containsWildCards(name) ) {
-        val node = create_destination_or(name) { node=> Unit }
-      }
+        if( host.authorizer!=null && security!=null && !host.authorizer.can_receive_from(security, host, config) ) {
+          return new Failure("Not authorized to receive from the destination")
+        }
+
+        // make sure the destination is created if this is not a wild card sub
+        if( !PathParser.containsWildCards(name) ) {
+          val rc = _get_or_create_destination(name, security)
+          if( rc.failed ) {
+            return rc.map_success(_=> Zilch);
+          }
+        }
 
-      get_destination_matches(name).foreach{ node=>
-        node.add_broadcast_consumer(consumer)
+        get_destination_matches(name).foreach{ node=>
+          node.add_broadcast_consumer(consumer)
+        }
+        broadcast_consumers.put(name, consumer)
+        Success(Zilch)
       }
-      broadcast_consumers.put(name, consumer)
+
+      do_bind
+
     }
   }
 
@@ -281,44 +324,74 @@ class Router(val host:VirtualHost) exten
   } >>: dispatchQueue
 
 
-  def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+  def connect(destination:Destination, producer:DeliveryProducer, security:SecurityContext)(completed: (Result[DeliveryProducerRoute,String])=>Unit) = {
 
     val route = new DeliveryProducerRoute(this, destination, producer) {
       override def on_connected = {
-        completed(this);
+        completed(Success(this));
       }
     }
 
-    dispatchQueue {
-
+    def do_connect:Result[Zilch, String] = {
       val topic = is_topic(destination)
 
+
+      var destination_security = security
       // Looking up the queue will cause it to get created if it does not exist.
-      val queue = if( !topic ) {
+      val queue = if( topic ) {
+
+        def config = host.destination_config(destination.name).getOrElse(new DestinationDTO)
+        if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config)) {
+          return new Failure("Not authorized to send to the destination")
+        }
+        None
+
+      } else {
+
         val dto = new QueueBindingDTO
         dto.destination = DestinationParser.encode_path(destination.name)
-        _create_queue(dto)
-      } else {
-        None
+
+        // Can we send to the queue?
+        def config = host.queue_config(dto).getOrElse(new QueueDTO)
+        if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config) ) {
+          return Failure("Not authorized to send to the queue")
+        }
+
+        destination_security = null
+        val rc = _get_or_create_queue(dto, security)
+        if( rc.failed ) {
+          return rc.map_success(_=>Zilch)
+        }
+        Some(rc.success)
       }
 
-      val node = create_destination_or(destination.name) { node=> Unit }
-      if( node.unified || topic ) {
-        node.add_broadcast_producer( route )
-      } else {
-        route.bind( queue.toList )
+      _get_or_create_destination(destination.name, security) match {
+        case Success(node)=>
+          if( node.unified || topic ) {
+            node.add_broadcast_producer( route )
+          } else {
+            route.bind( queue.toList )
+          }
+          route.connected()
+          Success(Zilch)
+
+        case Failure(reason)=>
+          Failure(reason)
       }
+    }
 
-      route.connected()
+    dispatchQueue {
+      do_connect.failure_option.foreach(x=> producer.dispatchQueue { completed(Failure(x)) } )
     }
+
   }
 
   def disconnect(route:DeliveryProducerRoute) = releasing(route) {
-
-    val topic = is_topic(route.destination)
-    val node = create_destination_or(route.destination.name) { node=> Unit }
-    if( node.unified || topic ) {
-      node.remove_broadcast_producer(route)
+    _get_destination(route.destination.name).foreach { node=>
+      val topic = is_topic(route.destination)
+      if( node.unified || topic ) {
+        node.remove_broadcast_producer(route)
+      }
     }
     route.disconnected()
 
@@ -326,14 +399,10 @@ class Router(val host:VirtualHost) exten
 
 }
 
-object RoutingNode {
-  val DEFAULT_CONFIG = new DestinationDTO
-}
 /**
  * Tracks state associated with a destination name.
  */
-class RoutingNode(val router:Router, val name:Path) {
-  import RoutingNode._
+class RoutingNode(val router:Router, val name:Path, val config:DestinationDTO) {
 
   val id = router.destination_id_counter.incrementAndGet
 
@@ -343,8 +412,6 @@ class RoutingNode(val router:Router, val
 
   import OptionSupport._
 
-  val config = router.host.destination_config(name).getOrElse(DEFAULT_CONFIG)
-
   def unified = config.unified.getOrElse(false)
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
@@ -357,7 +424,7 @@ class RoutingNode(val router:Router, val
       case "queue" =>
 
         // create a temp queue so that it can spool
-        val queue = router._create_queue(-1, new TempBinding(consumer))
+        val queue = router._create_queue(-1, new TempBinding(consumer), null).success
         queue.dispatchQueue.setTargetQueue(consumer.dispatchQueue)
         queue.bind(List(consumer))
 
@@ -391,7 +458,7 @@ class RoutingNode(val router:Router, val
         val binding = new TempBinding(consumer)
         if( queue.binding == binding ) {
           queue.unbind(List(consumer))
-          router._destroy_queue(queue.id)
+          router._destroy_queue(queue.id, null)
         }
       case _ =>
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Dec 10 20:28:43 2010
@@ -117,9 +117,17 @@ class VirtualHost(val broker: Broker, va
     val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
 
     if( config.authentication != null ) {
-      authenticator = new JaasAuthenticator(config.authentication.domain)
-      authorizer = new AclAuthorizer(config.authentication.kinds().toList)
+      if( config.authentication.enabled.getOrElse(true) ) {
+        // Virtual host has its own settings.
+        authenticator = new JaasAuthenticator(config.authentication.domain)
+        authorizer = new AclAuthorizer(config.authentication.kinds().toList)
+      } else {
+        // Don't use security on this host.
+        authenticator = null
+        authorizer = null
+      }
     } else {
+      // use the broker's settings..
       authenticator = broker.authenticator
       authorizer = broker.authorizer
     }
@@ -172,7 +180,7 @@ class VirtualHost(val broker: Broker, va
                   x match {
                     case Some(record)=>
                     dispatchQueue ^{
-                      router.create_queue(record)
+                      router.create_queue(record, null)
                       task.run
                     }
                     case _ =>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala Fri Dec 10 20:28:43 2010
@@ -19,7 +19,7 @@ package org.apache.activemq.apollo.broke
 import org.apache.activemq.apollo.broker.{Destination, VirtualHost, Broker}
 import scala.util.continuations._
 import org.apache.activemq.apollo.util.path.Path
-import org.apache.activemq.apollo.dto.{PrincipalDTO, QueueAclDTO, DestinationAclDTO, BindingDTO}
+import org.apache.activemq.apollo.dto._
 
 /**
  * <p>
@@ -34,15 +34,11 @@ class AclAuthorizer(val default_kinds:Li
 
   var allow_deafult = true
 
-  private def sync[T](func: =>T): T @suspendable = shift { k: (T=>Unit) =>
-    k(func)
-  }
-
   def is_in(ctx: SecurityContext, allowed:java.util.Set[PrincipalDTO]):Boolean = {
     ctx.intersects(allowed.toSet, default_kinds)
   }
 
-  def can_admin(ctx: SecurityContext, broker: Broker) = sync {
+  def can_admin(ctx: SecurityContext, broker: Broker) = {
     if( broker.config.acl!=null ) {
       is_in(ctx, broker.config.acl.admins)
     } else {
@@ -50,7 +46,7 @@ class AclAuthorizer(val default_kinds:Li
     }
   }
 
-  def can_connect_to(ctx: SecurityContext, host: VirtualHost) = sync {
+  def can_connect_to(ctx: SecurityContext, host: VirtualHost) = {
     if( host.config.acl!=null ) {
       is_in(ctx, host.config.acl.connects)
     } else {
@@ -58,58 +54,53 @@ class AclAuthorizer(val default_kinds:Li
     }
   }
 
-
-  private def for_dest(ctx: SecurityContext, host: VirtualHost, dest: Path)(func: DestinationAclDTO=>java.util.Set[PrincipalDTO]) = {
-    host.destination_config(dest).map { config=>
-      if( config.acl!=null ) {
-        is_in(ctx, func(config.acl))
-      } else {
-        allow_deafult
-      }
-    }.getOrElse(allow_deafult)
+  private def can_dest(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO)(func: DestinationAclDTO=>java.util.Set[PrincipalDTO]) = {
+    if( dest.acl!=null ) {
+      is_in(ctx, func(dest.acl))
+    } else {
+      allow_deafult
+    }
   }
 
-  def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync {
-    for_dest(ctx, host, dest)(_.sends)
+  def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+    can_dest(ctx, host, dest)(_.sends)
   }
-  def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync {
-    for_dest(ctx, host, dest)(_.receives)
+  def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+    can_dest(ctx, host, dest)(_.receives)
   }
-  def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync  {
-    for_dest(ctx, host, dest)(_.destroys)
+  def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+    can_dest(ctx, host, dest)(_.destroys)
   }
-  def can_create(ctx: SecurityContext, host: VirtualHost, dest: Path) = sync  {
-    for_dest(ctx, host, dest)(_.creates)
+  def can_create(ctx: SecurityContext, host: VirtualHost, dest: DestinationDTO) = {
+    can_dest(ctx, host, dest)(_.creates)
   }
 
-  private def for_queue(ctx: SecurityContext, host: VirtualHost, dto: BindingDTO)(func: QueueAclDTO=>java.util.Set[PrincipalDTO]) = {
-    host.queue_config(dto).map { config=>
-      if( config.acl!=null ) {
-        is_in(ctx, func(config.acl))
-      } else {
-        allow_deafult
-      }
-    }.getOrElse(allow_deafult)
+  private def can_queue(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO)(func: QueueAclDTO=>java.util.Set[PrincipalDTO]) = {
+    if( queue.acl!=null ) {
+      is_in(ctx, func(queue.acl))
+    } else {
+      allow_deafult
+    }
   }
 
-  def can_send_to(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync  {
-    for_queue(ctx, host, dest)(_.sends)
+  def can_send_to(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+    can_queue(ctx, host, queue)(_.sends)
   }
 
-  def can_receive_from(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync  {
-    for_queue(ctx, host, dest)(_.receives)
+  def can_receive_from(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+    can_queue(ctx, host, queue)(_.receives)
   }
 
-  def can_destroy(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync  {
-    for_queue(ctx, host, dest)(_.destroys)
+  def can_destroy(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+    can_queue(ctx, host, queue)(_.destroys)
   }
 
-  def can_create(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync  {
-    for_queue(ctx, host, dest)(_.creates)
+  def can_create(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+    can_queue(ctx, host, queue)(_.creates)
   }
 
-  def can_consume_from(ctx: SecurityContext, host: VirtualHost, dest: BindingDTO) = sync  {
-    for_queue(ctx, host, dest)(_.consumes)
+  def can_consume_from(ctx: SecurityContext, host: VirtualHost, queue: QueueDTO) = {
+    can_queue(ctx, host, queue)(_.consumes)
   }
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala Fri Dec 10 20:28:43 2010
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.apollo.broker.security
 import scala.util.continuations._
-import org.apache.activemq.apollo.dto.BindingDTO
 import org.apache.activemq.apollo.broker.{VirtualHost, Broker, Destination}
 import org.apache.activemq.apollo.util.path.Path
+import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO}
 
 /**
  * <p>
@@ -31,56 +31,57 @@ trait Authorizer {
   /**
    * @returns true if the user is an admin.
    */
-  def can_admin(ctx:SecurityContext, broker:Broker):Boolean @suspendable
+  def can_admin(ctx:SecurityContext, broker:Broker):Boolean
 
   /**
    * @returns true if the user is allowed to connect to the virtual host
    */
-  def can_connect_to(ctx:SecurityContext, host:VirtualHost):Boolean @suspendable
+  def can_connect_to(ctx:SecurityContext, host:VirtualHost):Boolean
 
   /**
    * @returns true if the user is allowed to send to the destination
    */
-  def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+  def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
 
   /**
    * @returns true if the user is allowed to receive from the destination
    */
-  def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+  def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
 
   /**
    * @returns true if the user is allowed to create the destination
    */
-  def can_create(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+  def can_create(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
 
   /**
    * @returns true if the user is allowed to destroy the destination
    */
-  def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:Path):Boolean @suspendable
+  def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:DestinationDTO):Boolean
+
 
   /**
    * @returns true if the user is allowed to send to the queue
    */
-  def can_send_to(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+  def can_send_to(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
 
   /**
    * @returns true if the user is allowed to receive from the queue
    */
-  def can_receive_from(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+  def can_receive_from(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
 
   /**
    * @returns true if the user is allowed to consume from the queue
    */
-  def can_consume_from(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+  def can_consume_from(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
 
   /**
    * @returns true if the user is allowed to create the queue
    */
-  def can_create(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+  def can_create(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
 
   /**
    * @returns true if the user is allowed to destroy the queue
    */
-  def can_destroy(ctx:SecurityContext, host:VirtualHost, dest:BindingDTO):Boolean @suspendable
+  def can_destroy(ctx:SecurityContext, host:VirtualHost, queue:QueueDTO):Boolean
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala Fri Dec 10 20:28:43 2010
@@ -44,34 +44,37 @@ class JaasAuthenticator(val jass_realm: 
    * potentially perform a blocking wait (e.g. LDAP request).
    */
   def authenticate(security_ctx: SecurityContext) = BLOCKABLE_THREAD_POOL ! {
+    _authenticate(security_ctx)
+  }
 
+  def _authenticate(security_ctx: SecurityContext): Boolean = {
     val original = Thread.currentThread().getContextClassLoader()
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader())
     try {
 
-      val login_ctx = new LoginContext(jass_realm, new CallbackHandler {
+      security_ctx.login_context = new LoginContext(jass_realm, new CallbackHandler {
         def handle(callbacks: Array[Callback]) = {
-          callbacks.foreach{ callback =>
-            callback match {
-              case x: NameCallback => x.setName(security_ctx.user)
-              case x: PasswordCallback => x.setPassword(security_ctx.password.getOrElse("").toCharArray)
-              case x: CertificateCallback => x.setCertificates(security_ctx.certificates)
-              case _ => throw new UnsupportedCallbackException(callback)
-            }
+          callbacks.foreach{
+            callback =>
+              callback match {
+                case x: NameCallback => x.setName(security_ctx.user)
+                case x: PasswordCallback => x.setPassword(security_ctx.password.getOrElse("").toCharArray)
+                case x: CertificateCallback => x.setCertificates(security_ctx.certificates)
+                case _ => throw new UnsupportedCallbackException(callback)
+              }
           }
         }
       })
 
-      login_ctx.login()
-      security_ctx.subject = login_ctx.getSubject()
+      security_ctx.login_context.login()
+      security_ctx.subject = security_ctx.login_context.getSubject()
       true
     } catch {
-      case x:Exception =>
-      false
+      case x: Exception =>
+        false
     } finally {
       Thread.currentThread().setContextClassLoader(original)
     }
   }
 
-
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala Fri Dec 10 20:28:43 2010
@@ -23,6 +23,7 @@ import java.security.cert.X509Certificat
 import org.apache.activemq.apollo.util.OptionSupport._
 import org.apache.activemq.jaas.{GroupPrincipal, UserPrincipal}
 import org.apache.activemq.apollo.dto.PrincipalDTO
+import javax.security.auth.login.LoginContext
 
 /**
  * <p>
@@ -36,6 +37,8 @@ class SecurityContext {
   var password:String = _
   var certificates = Array[X509Certificate]()
 
+  var login_context:LoginContext = _
+
   private val principles = new HashSet[PrincipalDTO]()
 
   private var _subject:Subject = _
@@ -55,11 +58,12 @@ class SecurityContext {
 
   def intersects(values:Set[PrincipalDTO], default_kinds:List[String]):Boolean = {
     val (v1, v2) = values.partition(_.kind == null)
-    if( principles.intersect(v2).isEmpty ) {
+    if( !principles.intersect(v2).isEmpty ) {
       return true
     }
     default_kinds.foreach { x=>
-      if( ! (v1.map(y=> new PrincipalDTO(y.name, x) ).intersect(v1).isEmpty) ) {
+      val kinda_added = v1.map(y=> new PrincipalDTO(y.name, x))
+      if( ! principles.intersect(kinda_added).isEmpty ) {
         return true
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Fri Dec 10 20:28:43 2010
@@ -54,7 +54,7 @@ class DestinationConfigurationTest exten
     def check_tune_queue_buffer(expected:Int)(dto:BindingDTO) = {
       var actual=0
       reset {
-        var q = router.create_queue(dto).get
+        var q = router.get_or_create_queue(dto, null).success
         actual = q.tune_queue_buffer
       }
       expect(expected) {actual}

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo-ssl.xml Fri Dec 10 20:28:43 2010
@@ -19,6 +19,7 @@
     The default configuration with tls/ssl enabled.
   </notes>
 
+  <!-- used to secure the web admin interface -->
   <authentication domain="apollo"/>
   <acl>
     <admin name="admins"/>
@@ -33,6 +34,37 @@
       -->
     <host-name>localhost</host-name>
 
+    <!-- Uncomment to disable security for the virtual host
+    <authentication enabled="false"/>
+    -->
+
+    <!--
+      You can add an 'acl' element to the virtual host,
+      destination, or queue elements to restrict the operations
+      that a user can take.  If the acl element is not set on
+      an object, then access is not restricted at all.
+      -->
+    <acl>
+      <connect name="admins"/>
+    </acl>
+    <destination path="secure.**">
+      <acl>
+        <create  name="admins"/>
+        <destroy name="admins"/>
+        <send    name="admins"/>
+        <receive name="admins"/>
+      </acl>
+    </destination>
+    <queue path="secure.**">
+      <acl>
+        <create  name="admins"/>
+        <destroy name="admins"/>
+        <send    name="admins"/>
+        <receive name="admins"/> <!-- queue browsers -->
+        <consume name="admins"/> <!-- regular consumers -->
+      </acl>
+    </queue>
+
     <!--
       Examples of how to configure destinations and queues. Note
       they you can use wildcard patterns when specifying destinations,

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/org/apache/activemq/apollo/cli/commands/etc/apollo.xml Fri Dec 10 20:28:43 2010
@@ -20,6 +20,7 @@
     The default configuration.
   </notes>
 
+  <!-- used to secure the web admin interface -->
   <authentication domain="apollo"/>
   <acl>
     <admin name="admins"/>
@@ -35,6 +36,38 @@
       -->
     <host-name>localhost</host-name>
 
+    <!-- Uncomment to disable security for the virtual host
+    <authentication enabled="false"/>
+    -->
+
+    <!--
+      You can add an 'acl' element to the virtual host,
+      destination, or queue elements to restrict the operations
+      that a user can take.  If the acl element is not set on
+      an object, then access is not restricted at all.
+      -->
+    <acl>
+      <connect name="admins"/>
+    </acl>
+    <destination path="secure.**">
+      <acl>
+        <create  name="admins"/>
+        <destroy name="admins"/>
+        <send    name="admins"/>
+        <receive name="admins"/>
+      </acl>
+    </destination>
+    <queue path="secure.**">
+      <acl>
+        <create  name="admins"/>
+        <destroy name="admins"/>
+        <send    name="admins"/>
+        <receive name="admins"/> <!-- queue browsers -->
+        <consume name="admins"/> <!-- regular consumers -->
+      </acl>
+    </queue>
+
+
     <!--
       Examples of how to configure destinations and queues. Note
       they you can use wildcard patterns when specifying destinations,

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AuthenticationDTO.java Fri Dec 10 20:28:43 2010
@@ -35,6 +35,9 @@ import java.util.Set;
 public class AuthenticationDTO {
 
     @XmlAttribute
+    public Boolean enabled;
+
+    @XmlAttribute
     public String domain;
 
     @XmlElement(name="kind")

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java Fri Dec 10 20:28:43 2010
@@ -30,7 +30,7 @@ import java.util.*;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class DestinationAclDTO {
 
-    @XmlElement(name="creates")
+    @XmlElement(name="create")
     public Set<PrincipalDTO> creates = new HashSet<PrincipalDTO>();
 
     @XmlElement(name="destroy")

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java Fri Dec 10 20:28:43 2010
@@ -35,12 +35,14 @@ public class XmlCodecTest {
 
     @Test
     public void unmarshalling() throws Exception {
-        BrokerDTO dto = XmlCodec.unmarshalBrokerDTO(resource("simple.xml"));
+        BrokerDTO dto = XmlCodec.unmarshalBrokerDTO(resource("XmlCodecTest.xml"));
         assertNotNull(dto);
         assertEquals("default", dto.id);
-        assertEquals("vh-local", dto.virtual_hosts.get(0).id);
-        assertEquals("localhost", dto.virtual_hosts.get(0).host_names.get(0));
-        assertEquals("example.com", dto.virtual_hosts.get(0).host_names.get(1));
+        VirtualHostDTO host = dto.virtual_hosts.get(0);
+        assertNotNull(host.acl);
+        assertEquals("vh-local", host.id);
+        assertEquals("localhost", host.host_names.get(0));
+        assertEquals("example.com", host.host_names.get(1));
 
         assertNotNull(dto.acl);
         assertTrue(dto.acl.admins.contains(new PrincipalDTO("hiram")));

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml (from r1044356, activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml?p2=activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml&p1=activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml&r1=1044356&r2=1044500&rev=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/simple.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml Fri Dec 10 20:28:43 2010
@@ -24,6 +24,7 @@
   </acl>
 
   <virtual-host enabled="true" id="vh-local">
+    <acl/>
     <host-name>localhost</host-name>
     <host-name>example.com</host-name>
   </virtual-host>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1044500&r1=1044499&r2=1044500&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Dec 10 20:28:43 2010
@@ -227,6 +227,7 @@ class StompProtocolHandler extends Proto
   var session_manager:SinkMux[StompFrame] = null
   var connection_sink:Sink[StompFrame] = null
 
+  var dead = false
   var closed = false
   var consumers = Map[AsciiBuffer, StompConsumer]()
 
@@ -279,9 +280,11 @@ class StompProtocolHandler extends Proto
   }
 
   private def die[T](headers:HeaderMap, body:String):T = {
-    if( !connection.stopped ) {
-      suspendRead("shutdown")
-      connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(body))) )
+    if( !dead ) {
+      dead = true
+      waiting_on = "shutdown"
+      connection.transport.resumeRead
+      connection_sink.offer(StompFrame(ERROR, headers, BufferContent(ascii(body))) )
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -306,6 +309,7 @@ class StompProtocolHandler extends Proto
     if( !closed ) {
       heart_beat_monitor.stop
       closed=true;
+      dead = true;
 
       import collection.JavaConversions._
       producerRoutes.foreach{
@@ -329,7 +333,11 @@ class StompProtocolHandler extends Proto
   }
 
 
-  override def onTransportCommand(command:Any) = {
+  override def onTransportCommand(command:Any):Unit = {
+    if( dead ) {
+      // We stop processing client commands once we are dead
+      return;
+    }
     try {
       command match {
         case s:StompCodec =>
@@ -405,8 +413,8 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_connect(headers:HeaderMap):Unit = {
 
-    security_context.user = get(headers, LOGIN).toString
-    security_context.password = get(headers, PASSCODE).toString
+    security_context.user = get(headers, LOGIN).map(_.toString).getOrElse(null)
+    security_context.password = get(headers, PASSCODE).map(_.toString).getOrElse(null)
 
     val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
     protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
@@ -454,6 +462,26 @@ class StompProtocolHandler extends Proto
     }
 
     def noop = shift {  k: (Unit=>Unit) => k() }
+
+    def send_connected = {
+      val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
+      session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
+      if( connection_sink==null ) {
+        weird(headers)
+      }
+      connection_sink.offer(
+        StompFrame(CONNECTED, List(
+          (VERSION, protocol_version),
+          (SESSION, session_id),
+          (HEART_BEAT, outbound_heart_beat_header)
+        )))
+
+      if( this.host.direct_buffer_pool!=null ) {
+        val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+        wf.memory_pool = this.host.direct_buffer_pool
+      }
+    }
+
     reset {
       suspendRead("virtual host lookup")
       val host_header = get(headers, HOST)
@@ -464,43 +492,29 @@ class StompProtocolHandler extends Proto
           connection.connector.broker.getVirtualHost(host)
       }
       resumeRead
+
       if(host==null) {
         async_die("Invalid virtual host: "+host_header.get)
-        noop // to make the cps compiler plugin happy.
+        noop
       } else {
         this.host=host
-
-        var authenticated = true;
-
-        if( host.authenticator!=null ) {
-          suspendRead("authenticating")
-          authenticated = host.authenticator.authenticate(security_context)
-          resumeRead
-        } else {
-          noop // to make the cps compiler plugin happy.
-        }
-
-        if( !authenticated ) {
+        if( host.authenticator!=null &&  host.authorizer!=null ) {
+          suspendRead("authenticating and authorizing connect")
+          if( !host.authenticator.authenticate(security_context) ) {
             async_die("Authentication failed.")
-        } else {
-          val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
-          session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
-          if( connection_sink==null ) {
-            weird(headers)
-          }
-          connection_sink.offer(
-            StompFrame(CONNECTED, List(
-              (VERSION, protocol_version),
-              (SESSION, session_id),
-              (HEART_BEAT, outbound_heart_beat_header)
-            )))
-
-          if( this.host.direct_buffer_pool!=null ) {
-            val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
-            wf.memory_pool = this.host.direct_buffer_pool
+            noop // to make the cps compiler plugin happy.
+          } else if( !host.authorizer.can_connect_to(security_context, host) ) {
+            async_die("Connect not authorized.")
+            noop // to make the cps compiler plugin happy.
+          } else {
+            resumeRead
+            send_connected
+            noop // to make the cps compiler plugin happy.
           }
+        } else {
+          send_connected
+          noop // to make the cps compiler plugin happy.
         }
-
       }
     }
 
@@ -557,8 +571,11 @@ class StompProtocolHandler extends Proto
 
         // don't process frames until producer is connected...
         connection.transport.suspendRead
-        host.router.connect(destiantion, producer) {
-          route =>
+        host.router.connect(destiantion, producer, security_context) {
+          case Failure(reason) =>
+            async_die(reason)
+
+          case Success(route) =>
             if (!connection.stopped) {
               resumeRead
               route.refiller = ^ {
@@ -708,21 +725,32 @@ class StompProtocolHandler extends Proto
 
       // consumer is bind bound as a topic
       reset {
-        host.router.bind(destination, consumer)
-        send_receipt(headers)
+        val rc = host.router.bind(destination, consumer, security_context)
         consumer.release
+        rc match {
+          case Failure(reason)=>
+            async_die(reason)
+          case _=>
+            send_receipt(headers)
+        }
       }
 
     } else {
       reset {
         // create a queue and bind the consumer to it.
-        val x= host.router.create_queue(binding)
+        val x= host.router.get_or_create_queue(binding, security_context)
         x match {
-          case Some(queue:Queue) =>
-            queue.bind(consumer::Nil)
-            send_receipt(headers)
+          case Success(queue) =>
+            val rc = queue.bind(consumer, security_context)
             consumer.release
-          case None => async_die("case not yet implemented.")
+            rc match {
+              case Failure(reason)=>
+                async_die(reason)
+              case _ =>
+                send_receipt(headers)
+            }
+          case Failure(reason) =>
+            async_die(reason)
         }
       }
     }
@@ -763,8 +791,13 @@ class StompProtocolHandler extends Proto
 
           if( persistent && consumer.binding!=null ) {
             reset {
-              val sucess = host.router.destroy_queue(consumer.binding)
-              send_receipt(headers)
+              val rc = host.router.destroy_queue(consumer.binding, security_context)
+              rc match {
+                case Failure(reason) =>
+                  async_die(reason)
+                case Success(_) =>
+                  send_receipt(headers)
+              }
             }
           } else {
             send_receipt(headers)

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala?rev=1044500&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Result.scala Fri Dec 10 20:28:43 2010
@@ -0,0 +1,67 @@
+package org.apache.activemq.apollo.util
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>A Result can either be a Success (carrying a value of type S) or a Failure (carrying a value of type F).</p>
+ * <p>Check <code>failed</code>, or use the *_option accessors, before reading a side: Success.failure and Failure.success throw NoSuchElementException.</p>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+sealed abstract class Result[+S,+F] {
+  def failed:Boolean // true for a Failure, false for a Success
+
+  def success:S // the success value; throws NoSuchElementException on a Failure
+  def failure:F // the failure value; throws NoSuchElementException on a Success
+
+  def success_option:Option[S] = if (failed) None else Some(success) // Some(value) only for a Success
+  def failure_option:Option[F] = if (failed) Some(failure) else None // Some(value) only for a Failure
+
+  def map_success[B](f: S => B): Result[B, F] = // transforms the success value with f; a Failure is re-wrapped unchanged and f is not called
+    if (failed) Failure(failure)  else Success(f(this.success))
+
+  def map_failure[B](f: F => B): Result[S, B] = // transforms the failure value with f; a Success is re-wrapped unchanged and f is not called
+    if (failed) Failure(f(this.failure)) else Success(this.success)
+
+}
+
+/**
+ * <p>A Success Result</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final case class Success[+S](x: S) extends Result[S, Nothing] { // the failure type is Nothing: there is no failure value to carry
+  def get = x // the wrapped success value (same as success)
+  def success = x
+  def failure = throw new NoSuchElementException("Success.failure") // a Success has no failure value, so this always throws
+  def failed = false
+}
+
+/**
+ * <p>A Failure Result</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final case class Failure[+F](x: F) extends Result[Nothing,F] { // the success type is Nothing: there is no success value to carry
+  def get = x // the wrapped failure value (same as failure), NOT a success value
+  def success = throw new NoSuchElementException("Failure.success") // a Failure has no success value, so this always throws
+  def failure = x
+  def failed = true
+}
+
+sealed class Zilch // NOTE(review): presumably a placeholder type for Results that carry no meaningful value - confirm against callers
+final case object Zilch extends Zilch // the singleton instance of the Zilch type declared on the previous line
\ No newline at end of file