You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/26 04:10:37 UTC

svn commit: r1063582 [1/3] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/acti...

Author: chirino
Date: Wed Jan 26 03:10:35 2011
New Revision: 1063582

URL: http://svn.apache.org/viewvc?rev=1063582&view=rev
Log:
Renamed Router to LocalRouter and extracted a Router trait from it.
Decoupled destinations from queues. We now have separate topic and queue domains.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicAclDTO.java
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade
      - copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
Removed:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueAclDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
    activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Wed Jan 26 03:10:35 2011
@@ -14,6 +14,6 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.QueueBindingFactory
-org.apache.activemq.apollo.broker.SubscriptionBindingFactory
-org.apache.activemq.apollo.broker.TempBindingFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.QueueDomainQueueBinding
+org.apache.activemq.apollo.broker.DurableSubscriptionQueueBinding
+org.apache.activemq.apollo.broker.TempQueueBinding
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jan 26 03:10:35 2011
@@ -22,8 +22,8 @@ import org.fusesource.hawtdispatch._
 import protocol.{ProtocolHandler}
 import org.apache.activemq.apollo.util.{Log, BaseService}
 import org.apache.activemq.apollo.filter.BooleanExpression
-import org.apache.activemq.apollo.dto.ConnectionStatusDTO
 import org.apache.activemq.apollo.transport.{TransportListener, DefaultTransportListener, Transport}
+import org.apache.activemq.apollo.dto.{DestinationDTO, ConnectionStatusDTO}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -159,7 +159,7 @@ trait ConsumerContext { // extends Clien
 
     def getConsumerId() : String
 
-    def getDestination(): Destination
+    def getDestination(): DestinationDTO
 
     def getSelector() : String
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jan 26 03:10:35 2011
@@ -23,6 +23,7 @@ import protocol.Protocol
 import org.apache.activemq.apollo.filter.Filterable
 import org.apache.activemq.apollo.broker.store.{StoreUOW, MessageRecord}
 import org.apache.activemq.apollo.util.{Log, Logging}
+import org.apache.activemq.apollo.dto.DestinationDTO
 
 object DeliveryProducer extends Log
 
@@ -31,8 +32,8 @@ object DeliveryProducer extends Log
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait DeliveryProducer extends Logging {
-  override protected def log:Log = DeliveryProducer
+trait DeliveryProducer {
+  import DeliveryProducer._
 
   def dispatch_queue:DispatchQueue
 
@@ -115,7 +116,7 @@ trait Message extends Filterable with Re
   /**
    * where the message was sent to.
    */
-  def destination: Destination
+  def destination: Array[DestinationDTO]
 
   /**
    * The protocol of the message

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1063582&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Wed Jan 26 03:10:35 2011
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtbuf._
+import BufferConversions._
+import Buffer._
+import org.apache.activemq.apollo.util.path.{Path, PathParser}
+import scala.collection.mutable.ListBuffer
+import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
+
+/**
+ * Helpers for converting paths and destinations to and from their textual
+ * form.  All of them delegate to the single `default` parser instance.
+ */
+object DestinationParser {
+
+  /** Parser instance used by the helper methods of this object. */
+  val default = new DestinationParser
+
+  /** Renders a path as text using the default parser. */
+  def encode_path(value:Path) = default.toString(value)
+  /** Parses text into a path using the default parser. */
+  def decode_path(value:String) = default.parsePath(ascii(value))
+
+  /** Renders a (possibly composite) destination as text using the default parser. */
+  def encode_destination(value:Array[DestinationDTO]) = default.toString(value)
+  /** Parses text into a (possibly composite) destination using the default parser. */
+  def decode_destination(value:String) = default.parse(ascii(value))
+
+  /**
+   * Builds a one element destination array for the given domain.
+   *
+   * @param domain LocalRouter.QUEUE_DOMAIN or LocalRouter.TOPIC_DOMAIN
+   * @param name the name given to the created destination DTO
+   * @throws Exception if the domain is neither of the two supported domains
+   */
+  def create_destination(domain:AsciiBuffer, name:String) = {
+    Array(domain match {
+      case LocalRouter.QUEUE_DOMAIN => new QueueDestinationDTO(name)
+      case LocalRouter.TOPIC_DOMAIN => new TopicDestinationDTO(name)
+      case _ => throw new Exception("Unknown destination domain: "+domain);
+    })
+  }
+
+}
+
+/**
+ * Converts destinations between their DTO form and their prefixed textual
+ * form: a queue is written as the queue prefix followed by its name, a topic
+ * as the topic prefix followed by its name.  Several destinations can be
+ * joined into one composite value using the destination separator.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DestinationParser extends PathParser {
+  import DestinationParser._
+
+  // Domain used for names that carry no known prefix.  While it is null such
+  // names cannot be parsed and parse returns null for them.
+  var default_domain: AsciiBuffer = null
+  var queue_prefix: AsciiBuffer = ascii("queue:")
+  var topic_prefix: AsciiBuffer = ascii("topic:")
+  // The temp prefixes are only referenced by the commented out code below.
+  var temp_queue_prefix: AsciiBuffer = ascii("temp-queue:")
+  var temp_topic_prefix: AsciiBuffer = ascii("temp-topic:")
+  // Byte written between the members of a composite destination.  When it is
+  // None, parse never splits its input.
+  var destination_separator: Option[Byte] = Some(','.toByte)
+
+  /**
+   * Encodes the destinations as prefixed names joined by the destination
+   * separator.
+   *
+   * @param value the destinations to encode, may be null
+   * @return the encoded buffer, or null if value is null
+   * @throws Exception if a destination is neither a queue nor a topic
+   */
+  def toBuffer(value: Array[DestinationDTO]): AsciiBuffer = {
+    if (value == null) {
+      null
+    } else {
+      val baos = new ByteArrayOutputStream
+      // Tracks whether a separator is needed before the next destination.
+      var first = true
+      value.foreach { d =>
+        if (first) {
+          first = false
+        } else {
+          assert( destination_separator.isDefined )
+          baos.write(destination_separator.get)
+        }
+        d match {
+          case d:QueueDestinationDTO =>
+            baos.write(queue_prefix)
+          case d:TopicDestinationDTO =>
+            baos.write(topic_prefix)
+//          case Router.TEMP_QUEUE_DOMAIN =>
+//            baos.write(temp_queue_prefix)
+//          case Router.TEMP_TOPIC_DOMAIN =>
+//            baos.write(temp_topic_prefix)
+          case _ =>
+            throw new Exception("Unknown destination type: "+d.getClass);
+        }
+        ascii(d.name).writeTo(baos)
+      }
+      baos.toBuffer.ascii
+    }
+  }
+
+  /**
+   * String form of toBuffer.
+   *
+   * @return the encoded destinations, or null if value is null
+   */
+  def toString(value:Array[DestinationDTO]):String = {
+    val buffer = toBuffer(value)
+    // toBuffer maps null to null; do the same instead of failing on it.
+    if (buffer == null) null else buffer.toString
+  }
+
+  /**
+   * Parses a destination which may or may not be a composite.
+   *
+   * @param value the encoded destination, may be null
+   * @return the parsed destinations, or null if value is null or if any part
+   *         of it has no known prefix while no default domain is configured
+   */
+  def parse(value: AsciiBuffer): Array[DestinationDTO] = {
+    if (value == null) {
+      return null;
+    }
+
+    if (destination_separator.isDefined && value.contains(destination_separator.get)) {
+      val rc = value.split(destination_separator.get);
+      val dl = ListBuffer[DestinationDTO]()
+      for (buffer <- rc) {
+        // Each part no longer contains the separator, so it parses to a
+        // single destination (or to null when it cannot be parsed).
+        val d = parse(buffer)
+        if (d == null) {
+          return null;
+        }
+        dl += d(0)
+      }
+      return dl.toArray
+    } else {
+
+      if (queue_prefix != null && value.startsWith(queue_prefix)) {
+        val name = value.slice(queue_prefix.length, value.length).ascii()
+        return create_destination(LocalRouter.QUEUE_DOMAIN, name.toString)
+      } else if (topic_prefix != null && value.startsWith(topic_prefix)) {
+        val name = value.slice(topic_prefix.length, value.length).ascii()
+        return create_destination(LocalRouter.TOPIC_DOMAIN, name.toString)
+//      } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
+//        var name = value.slice(temp_queue_prefix.length, value.length).ascii()
+//        return new DestinationDTO(LocalRouter.TEMP_QUEUE_DOMAIN, name.toString)
+//      } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
+//        var name = value.slice(temp_topic_prefix.length, value.length).ascii()
+//        return new DestinationDTO(LocalRouter.TEMP_TOPIC_DOMAIN, name.toString)
+      } else {
+        if (default_domain == null) {
+          return null;
+        }
+        return create_destination(default_domain, value.toString)
+      }
+    }
+  }
+}
+

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1063582&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Jan 26 03:10:35 2011
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtbuf._
+import org.fusesource.hawtdispatch._
+import collection.JavaConversions
+import org.apache.activemq.apollo.util._
+import collection.mutable.HashMap
+import org.apache.activemq.apollo.broker.store.QueueRecord
+import Buffer._
+import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
+import java.util.ArrayList
+import org.apache.activemq.apollo.dto._
+import security.SecurityContext
+import java.util.concurrent.TimeUnit
+
+// Contract for the destinations held by a LocalRouter Domain (see the
+// Domain[D <: DomainDestination] type bound).
+trait DomainDestination {
+
+  // Key under which a Domain indexes this destination in destination_by_id.
+  def id:Long
+  def name:String
+
+  // Consumer side.  Domain only calls bind after can_bind returned true.
+  // NOTE(review): can_bind presumably performs the authorization check for
+  // the given security context - confirm against the implementations.
+  def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
+  def bind (destination:DestinationDTO, consumer:DeliveryConsumer)
+  def unbind (consumer:DeliveryConsumer, persistent:Boolean)
+
+  // Producer side.  Domain only calls connect after can_connect returned true.
+  def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
+  def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
+  def disconnect (producer:BindableDeliveryProducer)
+
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LocalRouter extends Log {
+  // Names of the destination domains.
+  val TOPIC_DOMAIN = ascii("topic")
+  val QUEUE_DOMAIN = ascii("queue")
+  val TEMP_TOPIC_DOMAIN = ascii("temp-topic")
+  val TEMP_QUEUE_DOMAIN = ascii("temp-queue")
+
+  val QUEUE_KIND = ascii("queue")
+  val DEFAULT_QUEUE_PATH = ascii("default")
+
+  /**
+   * A consumer together with the destination and security context it used.
+   * Equality and hashing only consider the consumer.
+   */
+  class ConsumerContext(val destination:DestinationDTO, val consumer:DeliveryConsumer, val security:SecurityContext) {
+    override def hashCode: Int = consumer.hashCode
+
+    override def equals(obj: Any): Boolean = obj match {
+      case other:ConsumerContext => other.consumer == consumer
+      case _ => false
+    }
+  }
+
+  /**
+   * A producer together with the destination and security context it used.
+   * Equality and hashing only consider the producer.
+   */
+  class ProducerContext(val destination:DestinationDTO, val producer:BindableDeliveryProducer, val security:SecurityContext) {
+    override def hashCode: Int = producer.hashCode
+
+    override def equals(obj: Any): Boolean = obj match {
+      case other:ProducerContext => other.producer == producer
+      case _ => false
+    }
+  }
+}
+
+
+/**
+ * Provides a non-blocking concurrent producer to consumer
+ * routing implementation.
+ *
+ * DeliveryProducers create a route object for each destination
+ * they will be producing to.  Once the route is
+ * connected to the router, the producer can use
+ * the route.targets list without synchronization to
+ * get the current set of consumers that are bound
+ * to the destination. 
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LocalRouter(val host:VirtualHost) extends BaseService with Router {
+  import LocalRouter._
+
+  protected def dispatch_queue:DispatchQueue = host.dispatch_queue
+
+  // Reads the auto_create_destinations setting of the virtual host
+  // configuration, using true when getOrElse finds no value.
+  // NOTE(review): getOrElse is presumably supplied by an implicit conversion
+  // imported from OptionSupport - confirm.
+  def auto_create_destinations = {
+    import OptionSupport._
+    host.config.auto_create_destinations.getOrElse(true)
+  }
+
+  // Path made of the single ANY_DESCENDANT part; Domain.destinations uses it
+  // to look up every destination of a domain.
+  private val ALL = {
+    val parts = new ArrayList[Part](1)
+    parts.add(Part.ANY_DESCENDANT)
+    new Path(parts)
+  }
+
+  /**
+   * Bookkeeping shared by the destination domains of this router.  A domain
+   * indexes its destinations by id and by path, and remembers the consumers
+   * and producers registered on each path so that they can be attached to
+   * destinations which are added later.
+   */
+  trait Domain[D <: DomainDestination] {
+
+    // holds all the destinations in the domain by id
+    var destination_by_id = HashMap[Long, D]()
+    // holds all the destinations in the domain by path
+    var destination_by_path = new PathMap[D]()
+    // Can store consumers on wild cards paths
+
+    val consumers_by_path = new PathMap[ConsumerContext]()
+    val producers_by_path = new PathMap[ProducerContext]()
+
+    /** All the destinations of the domain. */
+    def destinations:Iterable[D] = JavaConversions.asScalaIterable(destination_by_path.get(ALL))
+
+    /** The destinations which the path map returns for the given path. */
+    def get_destination_matches(path:Path) = {
+      import JavaConversions._
+      asScalaIterable(destination_by_path.get( path ))
+    }
+
+    /** Creates the destination for the path; implemented by each concrete domain. */
+    def create_destination(path:Path, security:SecurityContext):Result[D,String]
+
+    /** Returns the value the path map chooses for the path, creating the destination if there is none. */
+    def get_or_create_destination(path:Path, security:SecurityContext):Result[D,String] = {
+      Option(destination_by_path.chooseValue(path)).
+      map(Success(_)).
+      getOrElse( create_destination(path, security))
+    }
+
+    /**
+     * Registers the destination and attaches every consumer and producer
+     * already registered on a matching path, provided the destination accepts
+     * them.
+     */
+    def add_destination(path:Path, dest:D) = {
+      destination_by_path.put(path, dest)
+      destination_by_id.put(dest.id, dest)
+
+      // binds any matching wild card subs and producers...
+      import JavaConversions._
+      consumers_by_path.get( path ).foreach { x=>
+        if( dest.can_bind(x.destination, x.consumer, x.security) ) {
+          dest.bind(x.destination, x.consumer)
+        }
+      }
+      producers_by_path.get( path ).foreach { x=>
+        if( dest.can_connect(x.destination, x.producer, x.security) ) {
+          dest.connect(x.destination, x.producer)
+        }
+      }
+    }
+
+    /** Removes the destination from both indexes. */
+    def remove_destination(path:Path, dest:D) = {
+      destination_by_path.remove(path, dest)
+      destination_by_id.remove(dest.id)
+    }
+
+    /**
+     * Checks whether the consumer may be bound to the path.  Wild card paths
+     * are always accepted.  For other paths the destination must exist (it is
+     * created first when auto creation is enabled) and every matching
+     * destination must accept the consumer.
+     *
+     * @return Success(Zilch), or a Failure carrying the reason
+     */
+    def can_bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
+
+      val wildcard = PathParser.containsWildCards(path)
+      var matches = get_destination_matches(path)
+
+      // Should we attempt to auto create the destination?
+      if( !wildcard ) {
+        if ( matches.isEmpty && auto_create_destinations ) {
+          val rc = create_destination(path, security)
+          if( rc.failed ) {
+            return rc.map_success(_=> Zilch);
+          }
+          matches = get_destination_matches(path)
+        }
+        if( matches.isEmpty ) {
+          return Failure("The destination does not exist.")
+        }
+
+        // Stops at the first destination which rejects the consumer.
+        if( matches.exists( dest => !dest.can_bind(destination, consumer, security) ) ) {
+          return Failure("Not authorized to receive from the destination.")
+        }
+      }
+      Success(Zilch)
+    }
+
+    /**
+     * Binds the consumer to every matching destination which accepts it, then
+     * retains the consumer and records it on the path so that destinations
+     * added later can pick it up.
+     */
+    def bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
+      val matches = get_destination_matches(path)
+      matches.foreach { dest=>
+        if( dest.can_bind(destination, consumer, security) ) {
+          dest.bind(destination, consumer)
+        }
+      }
+      consumer.retain
+      consumers_by_path.put(path, new ConsumerContext(destination, consumer, security))
+    }
+
+    /**
+     * Undoes bind.  Nothing happens unless the consumer was recorded on the
+     * path of the destination.
+     */
+    def unbind(destination:DestinationDTO, consumer:DeliveryConsumer, persistent:Boolean) = {
+      val path = DestinationParser.decode_path(destination.name)
+      // ConsumerContext equality only looks at the consumer, so the security
+      // context is not needed to find the entry.
+      if( consumers_by_path.remove(path, new ConsumerContext(destination, consumer, null) ) ) {
+        get_destination_matches(path).foreach{ dest=>
+          dest.unbind(consumer, persistent)
+        }
+        consumer.release
+      }
+
+//      if( persistent ) {
+//          destroy_queue(consumer.binding, security_context).failure_option.foreach{ reason=>
+//            async_die(reason)
+//          }
+//      }
+
+    }
+
+    /**
+     * Checks whether the producer may be connected to the path.  Wild card
+     * paths are always accepted.  For other paths the destination must exist
+     * (it is created first when auto creation is enabled) and every matching
+     * destination must accept the producer.
+     *
+     * @return Success(Zilch), or a Failure carrying the reason
+     */
+    def can_connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Result[Zilch, String] = {
+
+      val wildcard = PathParser.containsWildCards(path)
+      var matches = get_destination_matches(path)
+
+      // Should we attempt to auto create the destination?
+      if( !wildcard ) {
+        if ( matches.isEmpty && auto_create_destinations ) {
+          val rc = create_destination(path, security)
+          if( rc.failed ) {
+            return rc.map_success(_=> Zilch);
+          }
+          matches = get_destination_matches(path)
+        }
+        if( matches.isEmpty ) {
+          return Failure("The destination does not exist.")
+        }
+
+        // Stops at the first destination which rejects the producer.
+        if( matches.exists( dest => !dest.can_connect(destination, producer, security) ) ) {
+          return Failure("Not authorized to send to the destination.")
+        }
+      }
+      Success(Zilch)
+
+    }
+
+    /**
+     * Connects the producer to every matching destination which accepts it
+     * and records it on the path so that destinations added later can pick it
+     * up.
+     */
+    def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
+      val matches = get_destination_matches(path)
+      matches.foreach { dest=>
+        if( dest.can_connect(destination, producer, security) ) {
+          dest.connect(destination, producer)
+        }
+      }
+      producers_by_path.put(path, new ProducerContext(destination, producer, security))
+    }
+
+    /**
+     * Undoes connect: forgets the producer, disconnects it from every
+     * matching destination and releases it.
+     */
+    def disconnect(destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+      val path = DestinationParser.decode_path(destination.name)
+      // Drop the entry recorded by connect, otherwise add_destination would
+      // connect this producer again to destinations created later.
+      // ProducerContext equality only looks at the producer.
+      producers_by_path.remove(path, new ProducerContext(destination, producer, null))
+      get_destination_matches(path).foreach { dest=>
+        dest.disconnect(producer)
+      }
+      // NOTE(review): connect above does not retain the producer; presumably
+      // the caller does - confirm that this release is balanced.
+      producer.release
+    }
+
+  }
+
+  object topic_domain extends Domain[Topic] {
+
+    val topic_id_counter = new LongCounter
+
+    // Stores durable subscription queues.
+    val durable_subscriptions_by_path = new PathMap[Queue]()
+    val durable_subscriptions_by_id = HashMap[(String,String), Queue]()
+
+
+    override def can_bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
+      var rc = super.can_bind(path, destination, consumer, security)
+      if( !rc.failed ) {
+        destination match {
+          case destination:DurableSubscriptionDestinationDTO=>
+            // So the user can subscribe to the topic.. but can he create durable sub??
+            val qc = ds_config(destination)
+            if( !can_create_ds(qc, security) ) {
+               return Failure("Not authorized to create the durable subscription.")
+            }
+          case _ =>
+        }
+      }
+      rc
+    }
+
+    def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue = {
+      durable_subscriptions_by_id.get( (destination.client_id, destination.subscription_id) ).getOrElse {
+        val binding = QueueBinding.create(destination)
+        val qc = ds_config(destination)
+        _create_queue(-1, binding, qc)
+      }
+    }
+
+    def destroy_durable_subscription(queue:Queue):Unit = {
+      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
+      if( durable_subscriptions_by_id.remove( (destination.client_id, destination.subscription_id) ).isDefined ) {
+        val path = queue.binding.destination
+        durable_subscriptions_by_path.remove(path, queue)
+        var matches = get_destination_matches(path)
+        matches.foreach( _.unbind_durable_subscription(destination, queue) )
+        _destroy_queue(queue.id, null)
+      }
+    }
+
+    def topic_config(name:Path):TopicDTO = {
+      import collection.JavaConversions._
+      import DestinationParser.default._
+      import AsciiBuffer._
+      host.config.topics.find( x=> parseFilter(ascii(x.name)).matches(name) ).getOrElse(new TopicDTO)
+    }
+
+    def can_create_ds(config:DurableSubscriptionDTO, security:SecurityContext) = {
+      if( host.authorizer==null || security==null) {
+        true
+      } else {
+        host.authorizer.can_create(security, host, config)
+      }
+    }
+
+    def ds_config(destination:DurableSubscriptionDestinationDTO):DurableSubscriptionDTO = {
+      import collection.JavaConversions._
+      import DestinationParser.default._
+      import AsciiBuffer._
+
+      val name = DestinationParser.decode_path(destination.name)
+      def matches(x:DurableSubscriptionDTO):Boolean = {
+        if( x.name != null && !parseFilter(ascii(x.name)).matches(name)) {
+          return false
+        }
+        if( x.client_id != null && x.client_id!=x.client_id ) {
+          return false
+        }
+        if( x.subscription_id != null && x.subscription_id!=x.subscription_id ) {
+          return false
+        }
+        true
+      }
+      host.config.durable_subscriptions.find(matches _).getOrElse(new DurableSubscriptionDTO)
+    }
+
+    /**
+     * Registers a durable subscription queue with the topic domain.
+     *
+     * The queue is indexed by its binding path and by its
+     * (client_id, subscription_id) pair, then bound as a durable subscription
+     * to every topic currently matching the path.  For a path without wild
+     * cards the topic is created first when no topic matches yet.
+     */
+    def bind(queue:Queue) = {
+
+      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
+      val path = queue.binding.destination
+      val wildcard = PathParser.containsWildCards(path)
+      val existing = get_destination_matches(path)
+
+      // A plain (non wild card) subscription needs its topic to exist: create
+      // it on demand (no security context) and look the matches up again.
+      val matches = if( wildcard || !existing.isEmpty ) {
+        existing
+      } else {
+        create_destination(path, null)
+        get_destination_matches(path)
+      }
+
+      durable_subscriptions_by_path.put(path, queue)
+      durable_subscriptions_by_id.put((destination.client_id, destination.subscription_id), queue)
+
+      for( topic <- matches ) {
+        topic.bind_durable_subscription(destination, queue)
+      }
+    }
+
+    /**
+     * Drops the durable subscription queue from the by-path index.
+     */
+    def unbind(queue:Queue) = {
+      // NOTE(review): bind(queue) also registers the queue in
+      // durable_subscriptions_by_id; no matching removal happens here -
+      // confirm that index is cleaned up elsewhere.
+      durable_subscriptions_by_path.remove(queue.binding.destination, queue)
+    }
+
+    /**
+     * Creates and registers the topic for `path`.
+     *
+     * The path must not contain wild cards (asserted).  The topic's
+     * configuration comes from topic_config.  When both an authorizer and a
+     * security context are present the authorizer must allow the creation,
+     * otherwise a Failure is returned and nothing is registered.
+     */
+    def create_destination(path:Path, security:SecurityContext):Result[Topic,String] = {
+
+      // We can't create a wild card destination.. only wild card subscriptions.
+      assert( !PathParser.containsWildCards(path) )
+
+      // A new destination is being created...
+      val dto = topic_config(path)
+
+      val denied = host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, dto)
+      if( denied ) {
+        new Failure("Not authorized to create the destination")
+      } else {
+        val id = topic_id_counter.incrementAndGet
+        val topic = new Topic(LocalRouter.this, DestinationParser.encode_path(path), dto, id)
+        add_destination(path, topic)
+        Success(topic)
+      }
+    }
+
+  }
+
+  /**
+   * The domain that holds the point to point queues of the router.
+   */
+  object queue_domain extends Domain[Queue] {
+
+    /**
+     * Finds the configuration for the queue described by `binding`: the first
+     * configured queue entry whose name filter matches the binding's
+     * destination (an entry without a name matches everything), or a default
+     * QueueDTO when there is none.
+     */
+    def config(binding:QueueBinding):QueueDTO = {
+      import collection.JavaConversions._
+      import DestinationParser.default._
+
+      host.config.queues.find { x =>
+        x.name == null || parseFilter(ascii(x.name)).matches(binding.destination)
+      }.getOrElse(new QueueDTO)
+    }
+
+    /**
+     * Creation is allowed when there is no authorizer or no security context;
+     * otherwise the authorizer's can_create answer is returned.
+     */
+    def can_create_queue(config:QueueDTO, security:SecurityContext) = {
+      host.authorizer==null || security==null || host.authorizer.can_create(security, host, config)
+    }
+
+    /**
+     * Registers the queue as the destination at its binding path.  The path
+     * must not contain wild cards (asserted).
+     */
+    def bind(queue:Queue) = {
+      val path = queue.binding.destination
+      assert( !PathParser.containsWildCards(path) )
+      add_destination(path, queue)
+    }
+
+    /**
+     * Removes the queue from the destinations registered at its binding path.
+     */
+    def unbind(queue:Queue) = {
+      remove_destination(queue.binding.destination, queue)
+    }
+
+    /**
+     * Creates the queue for `path` when `security` is allowed to do so.  A
+     * queue whose configuration has `unified` set is additionally bound to the
+     * topic of the same path, so that it receives what is sent to that topic.
+     */
+    def create_destination(path: Path, security: SecurityContext) = {
+      val dto = new QueueDestinationDTO
+      dto.name = DestinationParser.encode_path(path)
+
+      val binding = QueueDomainQueueBinding.create(dto)
+      val queue_config = config(binding)
+      if( can_create_queue(queue_config, security) ) {
+        // -1 lets _create_queue allocate a new queue id.
+        val queue = _create_queue(-1, binding, queue_config)
+        import OptionSupport._
+        if( queue_config.unified.getOrElse(false) ) {
+          // hook up the queue to be a subscriber of the topic.
+          val topic = topic_domain.get_or_create_destination(path, null).success
+          topic.bind(null, queue)
+        }
+        Success(queue)
+      } else {
+        Failure("Not authorized to create the queue")
+      }
+
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  //
+  // life cycle methods.
+  //
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Service start hook.
+   *
+   * When the host has a store, the stored queue keys are listed and every
+   * queue record found is turned back into a queue with _create_queue.  If
+   * `regroup_connections` is enabled in the host configuration the periodic
+   * connection regrouping is scheduled.  `on_completed` is registered with a
+   * LoggingTracker through tracker.callback; the tracker is also handed one
+   * task for the listing and one task per queue key.
+   */
+  protected def _start(on_completed: Runnable) = {
+    val tracker = new LoggingTracker("router startup", dispatch_queue)
+    if( host.store!=null ) {
+      // tracks the listing of the stored queue keys.
+      val task = tracker.task("list_queues")
+      host.store.list_queues { queue_keys =>
+        for( queue_key <- queue_keys) {
+          // one task per stored queue; it shadows the outer `task`.
+          val task = tracker.task("load queue: "+queue_key)
+          // Use a global queue so that we concurrently restore
+          // the queues.
+          globalQueue {
+            host.store.get_queue(queue_key) { x =>
+              x match {
+                case Some(record)=>
+                  // the queue itself is created on the router's dispatch_queue,
+                  // keeping its stored key; null is passed as its config.
+                  dispatch_queue {
+                    _create_queue(record.key, QueueBinding.create(record.binding_kind, record.binding_data), null)
+                    task.run
+                  }
+                // no record for this key: just run the per queue task.
+                case _ => task.run
+              }
+            }
+          }
+        }
+        // this is the outer "list_queues" task.
+        task.run
+      }
+    }
+
+    import OptionSupport._
+    if(host.config.regroup_connections.getOrElse(false)) {
+      schedule_connection_regroup
+    }
+
+    tracker.callback(on_completed)
+  }
+
+  /**
+   * Service stop hook: asks a LoggingTracker to stop every queue known to the
+   * router and registers `on_completed` with that tracker.
+   */
+  protected def _stop(on_completed: Runnable) = {
+    val tracker = new LoggingTracker("router shutdown", dispatch_queue)
+    for( queue <- queues_by_id.valuesIterator ) {
+      tracker.stop(queue)
+    }
+    tracker.callback(on_completed)
+  }
+
+
+  /**
+   * Try to periodically re-balance connections so that consumers/producers
+   * are grouped onto the same thread.
+   *
+   * Schedules a block on dispatch_queue one second from now.  As long as the
+   * service is started, the block runs a regroup pass and schedules itself
+   * again; once the service is no longer started the cycle ends.
+   */
+  def schedule_connection_regroup:Unit = {
+    dispatch_queue.after(1, TimeUnit.SECONDS) {
+      if(service_state.is_started) {
+        connection_regroup
+        schedule_connection_regroup
+      }
+    }
+  }
+
+  /**
+   * Runs one regrouping pass over all topics and queues.
+   *
+   * this should really be much more fancy.  It should look at the messaging
+   * rates between producers and consumers, look for natural data flow partitions
+   * and then try to equally divide the load over the available processing
+   * threads/cores.
+   */
+  def connection_regroup = {
+
+    // For the topics, just collocate the producers onto the first consumer's thread.
+    for( node <- topic_domain.destinations; consumer <- node.consumers.headOption; producer <- node.producers ) {
+      producer.collocate(consumer.dispatch_queue)
+    }
+
+    // The queues are handled on their own dispatch queues.
+    for( queue <- queue_domain.destinations ) {
+      queue.dispatch_queue {
+
+        // Collocate the queue with its first consumer.
+        // TODO: change this so it collocates with the fastest consumer.
+        queue.all_subscriptions.headOption.foreach { case (consumer, _) =>
+          queue.collocate( consumer.dispatch_queue )
+        }
+
+        // Collocate all the producers with the queue..
+        for( session <- queue.inbound_sessions ) {
+          session.producer.collocate( queue.dispatch_queue )
+        }
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  //
+  // destination/domain management methods.
+  //
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Selects the domain responsible for a destination: topics and durable
+   * subscriptions are handled by topic_domain, queues by queue_domain.  Any
+   * other kind of destination is rejected with a RuntimeException.
+   */
+  def domain(destination: DestinationDTO) = destination match {
+    case _:TopicDestinationDTO | _:DurableSubscriptionDestinationDTO => topic_domain
+    case _:QueueDestinationDTO => queue_domain
+    case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+  }
+
+  /**
+   * Binds `consumer` to every destination in the `destination` array.
+   *
+   * The consumer is retained up front and released at the end of the block
+   * handed to dispatch_queue.  Inside that block each destination is first
+   * checked with its domain's can_bind.  If any check reports a failure,
+   * nothing is bound and the failure messages are joined with "; " into a
+   * single Failure; otherwise every destination is bound and Success(Zilch)
+   * is the result.
+   */
+  def bind(destination: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext) = {
+    consumer.retain
+    // decode the paths before entering the dispatch_queue block.
+    val paths = destination.map(x=> (DestinationParser.decode_path(x.name), x) )
+    dispatch_queue ! {
+      // all or nothing: collect every authorization failure first.
+      val failures = paths.map(x=> domain(x._2).can_bind(x._1, x._2, consumer, security) ).flatMap( _.failure_option )
+      val rc = if( !failures.isEmpty ) {
+        Failure(failures.mkString("; "))
+      } else {
+        paths.foreach { x=>
+          domain(x._2).bind(x._1, x._2, consumer, security)
+        }
+        Success(Zilch)
+      }
+      // balances the retain taken before the block was submitted.
+      consumer.release
+      rc
+    }
+  }
+
+  /**
+   * Unbinds `consumer` from every destination in `destinations`.  The work is
+   * done in a block handed to dispatch_queue; the consumer is retained until
+   * that block has finished.  `persistent` is passed through to the domain of
+   * each destination.
+   */
+  def unbind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, persistent:Boolean=false) = {
+    consumer.retain
+    dispatch_queue {
+      for( destination <- destinations ) {
+        domain(destination).unbind(destination, consumer, persistent)
+      }
+      consumer.release
+    }
+  }
+
+  /**
+   * Connects `producer` to every destination in `destinations`.
+   *
+   * The producer is retained up front.  Inside the block handed to
+   * dispatch_queue each destination is first checked with its domain's
+   * can_connect.  If any check reports a failure, the producer is released
+   * again, nothing is connected and the failure messages are joined with "; "
+   * into a single Failure.  Otherwise the producer is connected to every
+   * destination, producer.connected() is called and Success(Zilch) is the
+   * result; the producer is not released on this path (disconnect releases it).
+   */
+  def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security: SecurityContext) = {
+    producer.retain
+    // decode the paths before entering the dispatch_queue block.
+    val paths = destinations.map(x=> (DestinationParser.decode_path(x.name), x) )
+    dispatch_queue ! {
+
+      // all or nothing: collect every authorization failure first.
+      val failures = paths.map(x=> domain(x._2).can_connect(x._1, x._2, producer, security) ).flatMap( _.failure_option )
+
+      if( !failures.isEmpty ) {
+        // undo the retain, the producer never got connected.
+        producer.release
+        Failure(failures.mkString("; "))
+      } else {
+        paths.foreach { x=>
+          domain(x._2).connect(x._1, x._2, producer, security)
+        }
+        producer.connected()
+        Success(Zilch)
+      }
+    }
+  }
+
+  /**
+   * Disconnects `producer` from every destination in `destinations`, inside a
+   * block handed to dispatch_queue.  Afterwards producer.disconnected() is
+   * called and the producer is released; connect retains the producer and
+   * keeps that reference when it succeeds.
+   */
+  def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer) = {
+    dispatch_queue {
+      for( destination <- destinations ) {
+        domain(destination).disconnect(destination, producer)
+      }
+      producer.disconnected()
+      producer.release()
+    }
+  }
+
+  /**
+   * Evaluates _get_or_create_destination(id, security) in a block handed to
+   * dispatch_queue through the `!` operator and yields what that operator
+   * produces for the block's result.
+   */
+  def get_or_create_destination(id: DestinationDTO, security: SecurityContext) = dispatch_queue ! {
+    _get_or_create_destination(id, security)
+  }
+
+  /**
+   * Decodes the path of `dto.name` and asks the domain responsible for `dto`
+   * to get or create the destination at that path.
+   *
+   * Returns the previously created destination if it already existed.
+   */
+  def _get_or_create_destination(dto: DestinationDTO, security:SecurityContext): Result[DomainDestination, String] = {
+    val path = DestinationParser.decode_path(dto.name)
+    domain(dto).get_or_create_destination(path, security)
+  }
+
+
+  /////////////////////////////////////////////////////////////////////////////
+  //
+  // Queue management methods.  Queues are multi-purpose and get used by both
+  // the queue domain and topic domain.
+  //
+  /////////////////////////////////////////////////////////////////////////////
+
+  // Every queue created by _create_queue is indexed twice: by its binding
+  // and by its id.  _destroy_queue removes the queue from both maps again.
+  var queues_by_binding = HashMap[QueueBinding, Queue]()
+  var queues_by_id = HashMap[Long, Queue]()
+
+  /**
+   * Gets an existing queue by destination: a binding is created from `dto`
+   * and looked up in queues_by_binding.  The lookup is done in a block handed
+   * to dispatch_queue through the `!` operator.
+   */
+  def get_queue(dto:DestinationDTO) = dispatch_queue ! {
+    queues_by_binding.get(QueueBinding.create(dto))
+  }
+
+  /**
+   * Gets an existing queue by its id, looked up in queues_by_id.  The lookup
+   * is done in a block handed to dispatch_queue through the `!` operator.
+   */
+  def get_queue(id:Long) = dispatch_queue ! {
+    queues_by_id.get(id)
+  }
+
+  /**
+   * Creates, starts and registers a queue.
+   *
+   * @param id      the id of the queue, or -1 to have a new id allocated from
+   *                the host's queue_id_counter
+   * @param binding describes how the queue is wired into the router
+   * @param config  the queue's configuration (passed on to the Queue as is)
+   * @return the new queue
+   */
+  def _create_queue(id:Long, binding:QueueBinding, config:QueueDTO):Queue = {
+
+    // -1 asks for a freshly allocated id, any other value is used as is.
+    val is_new = id == -1
+    val qid:Long = if( is_new ) host.queue_id_counter.incrementAndGet else id
+
+    val queue = new Queue(this, qid, binding, config)
+
+    // Only a newly allocated, persistent queue gets a record added to the
+    // store; a queue created with an explicit id does not.
+    if( queue.tune_persistent && is_new ) {
+      val record = new QueueRecord
+      record.key = qid
+      record.binding_data = binding.binding_data
+      record.binding_kind = binding.binding_kind
+      host.store.add_queue(record) { rc => Unit }
+    }
+
+    queue.start
+    queues_by_binding.put(binding, queue)
+    queues_by_id.put(queue.id, queue)
+
+    // this causes the queue to get registered in the right location in
+    // the router.
+    binding.bind(this, queue)
+    queue
+  }
+
+  /**
+   * Destroys the queue with the given id.  _destroy_queue(id, security) is
+   * evaluated in a block handed to dispatch_queue through the `!` operator;
+   * that call yields Success(Zilch) once the queue is gone, or a Failure
+   * carrying the reason (unknown id, not authorized).
+   */
+  def destroy_queue(id:Long, security:SecurityContext) = dispatch_queue ! { _destroy_queue(id,security) }
+
+  /**
+   * Destroys the queue registered under `id`, or fails with "Does not exist"
+   * when no queue has that id.
+   */
+  def _destroy_queue(id:Long, security:SecurityContext):Result[Zilch, String] = {
+    queues_by_id.get(id) match {
+      case None =>
+        Failure("Does not exist")
+      case Some(existing) =>
+        _destroy_queue(existing, security)
+    }
+  }
+
+  /**
+   * Destroys the queue bound to the given destination.
+   * _destroy_queue(dto, security) is evaluated in a block handed to
+   * dispatch_queue through the `!` operator; that call yields Success(Zilch)
+   * once the queue is gone, or a Failure carrying the reason (no such queue,
+   * not authorized).
+   */
+  def destroy_queue(dto:DestinationDTO, security:SecurityContext) = dispatch_queue ! { _destroy_queue(dto, security) }
+
+  /**
+   * Destroys the queue whose binding equals the binding created from `dto`,
+   * or fails with "Does not exist" when there is no such queue.
+   */
+  def _destroy_queue(dto:DestinationDTO, security:SecurityContext):Result[Zilch, String] = {
+    val binding = QueueBinding.create(dto)
+    queues_by_binding.get(binding) match {
+      case None =>
+        Failure("Does not exist")
+      case Some(existing) =>
+        _destroy_queue(existing, security)
+    }
+  }
+
+  /**
+   * Destroys `queue`: unbinds it from the router, removes it from both queue
+   * indexes, stops it and, for a persistent queue, removes its record from
+   * the store.
+   *
+   * The authorizer is only consulted when the host has one, a security
+   * context was supplied and the queue's configuration carries an acl.  The
+   * authorizer null check matches the other authorization checks of the
+   * router, so a host without an authorizer no longer trips over a null
+   * reference here.
+   *
+   * @return Success(Zilch) when the queue was destroyed, or a Failure when
+   *         the security context is not allowed to destroy it
+   */
+  def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
+
+    // NOTE(review): _start creates restored queues with a null config;
+    // presumably Queue replaces it - confirm queue.config cannot be null here.
+    val denied = host.authorizer!=null && security!=null && queue.config.acl!=null &&
+      !host.authorizer.can_destroy(security, host, queue.config)
+
+    if( denied ) {
+      Failure("Not authorized to destroy")
+    } else {
+      queue.binding.unbind(this, queue)
+      queues_by_binding.remove(queue.binding)
+      queues_by_id.remove(queue.id)
+      queue.stop
+      if( queue.tune_persistent ) {
+        // the store record is removed from a task handed to the queue's own
+        // dispatch queue.
+        queue.dispatch_queue ^ {
+          host.store.remove_queue(queue.id){x=> Unit}
+        }
+      }
+      Success(Zilch)
+    }
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 26 03:10:35 2011
@@ -19,20 +19,17 @@ package org.apache.activemq.apollo.broke
 import java.util.concurrent.TimeUnit
 
 import org.fusesource.hawtdispatch._
-import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
 import java.util.concurrent.atomic.AtomicInteger
 
-import collection.{SortedMap}
-import org.apache.activemq.apollo.broker.store.{StoreUOW}
 import protocol.ProtocolFactory
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
-import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
-import org.apache.activemq.apollo.dto.QueueDTO
+import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
 import OptionSupport._
 import security.SecurityContext
+import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -47,7 +44,9 @@ import Queue._
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService {
+class Queue(val router: LocalRouter, val id:Long, val binding:QueueBinding, var config:QueueDTO) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination {
+
+  // The virtual host is reached through the router that owns this queue.
+  def host = router.host
 
   var inbound_sessions = Set[DeliverySession]()
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
@@ -189,48 +188,27 @@ class Queue(val host: VirtualHost, var i
 
     if( tune_persistent ) {
 
-      if( id == -1 ) {
-        id = host.queue_id_counter.incrementAndGet
-
-        val record = new QueueRecord
-        record.key = id
-        record.binding_data = binding.binding_data
-        record.binding_kind = binding.binding_kind
-
-        host.store.add_queue(record) { rc =>
-          dispatch_queue {
-            completed
-          }
-        }
-
-      } else {
-
-        host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
-          dispatch_queue {
-            if( ranges!=null && !ranges.isEmpty ) {
-
-              ranges.foreach { range =>
-                val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
-                entries.addLast(entry)
-
-                message_seq_counter = range.last_entry_seq + 1
-                enqueue_item_counter += range.count
-                enqueue_size_counter += range.size
-                tail_entry = new QueueEntry(Queue.this, next_message_seq)
-              }
-
-              debug("restored: "+enqueue_item_counter)
+      host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
+        dispatch_queue {
+          if( ranges!=null && !ranges.isEmpty ) {
+
+            ranges.foreach { range =>
+              val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
+              entries.addLast(entry)
+
+              message_seq_counter = range.last_entry_seq + 1
+              enqueue_item_counter += range.count
+              enqueue_size_counter += range.size
+              tail_entry = new QueueEntry(Queue.this, next_message_seq)
             }
-            completed
+
+            debug("restored: "+enqueue_item_counter)
           }
+          completed
         }
-        
       }
 
     } else {
-      if( id == -1 ) {
-        id = host.queue_id_counter.incrementAndGet
-      }
       completed
     }
   }
@@ -562,12 +540,64 @@ class Queue(val host: VirtualHost, var i
 
   def disconnected() = throw new RuntimeException("unsupported")
 
+  /**
+   * Tells whether `consumer` may be bound to this queue.  Without an
+   * authorizer or without a security context binding is always allowed.
+   * Otherwise a browsing consumer needs the receive permission and any other
+   * consumer needs the consume permission.
+   */
+  def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer, security: SecurityContext):Boolean = {
+    if( host.authorizer==null || security==null ) {
+      true
+    } else if( consumer.browser ) {
+      host.authorizer.can_receive_from(security, host, config)
+    } else {
+      host.authorizer.can_consume_from(security, host, config)
+    }
+  }
+
+  // Binds a single consumer by delegating to the list based bind; the
+  // `destination` argument is not used.
+  def bind(destination:DestinationDTO, consumer: DeliveryConsumer) = {
+    bind(consumer::Nil)
+  }
+  // Unbinds a single consumer by delegating to the list based unbind; the
+  // `persistent` flag is not used here.
+  def unbind(consumer: DeliveryConsumer, persistent:Boolean) = {
+    unbind(consumer::Nil)
+  }
+
+  /**
+   * Tells whether `producer` may send to this queue.  Sending is always
+   * allowed without an authorizer or without a security context; otherwise
+   * the authorizer's can_send_to answer is returned.
+   */
+  def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
+    val authorizer = host.authorizer
+    authorizer==null || security==null || authorizer.can_send_to(security, host, config)
+  }
+
+  /**
+   * Connects a producer to this queue.  For a unified queue the producer is
+   * connected to the topic that has the queue's binding path; otherwise the
+   * producer is bound directly to this queue.
+   */
+  def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+    import OptionSupport._
+    if( config.unified.getOrElse(false) ) {
+      // this is a unified queue.. actually have the producer bind to the topic, instead of
+      // the queue.
+      val topic = router.topic_domain.get_or_create_destination(binding.destination, null).success
+      topic.connect(destination, producer)
+    } else {
+      producer.bind(this::Nil)
+    }
+  }
+
+  // Unbinds the producer from this queue.
+  // NOTE(review): connect routes producers of a unified queue to the topic,
+  // while this always unbinds from the queue itself - confirm that is intended.
+  def disconnect (producer:BindableDeliveryProducer) = {
+    producer.unbind(this::Nil)
+  }
+
+  // The queue's name is the label of its binding.
+  def name: String = binding.label
+
+  // A queue reports no broker connection.
+  override def connection:Option[BrokerConnection] = None
+
+
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation methods.
   //
   /////////////////////////////////////////////////////////////////////
 
+
   private def next_message_seq = {
     val rc = message_seq_counter
     message_seq_counter += 1
@@ -605,12 +635,6 @@ class Queue(val host: VirtualHost, var i
     }
   }
 
-  def collocate(value:DispatchQueue):Unit = {
-    if( value.getTargetQueue ne dispatch_queue.getTargetQueue ) {
-      debug("co-locating %s with %s", dispatch_queue.getLabel, value.getLabel);
-      this.dispatch_queue.setTargetQueue(value.getTargetQueue)
-    }
-  }
 }
 
 object QueueEntry extends Sizer[QueueEntry] {

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (from r1062213, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Wed Jan 26 03:10:35 2011
@@ -22,7 +22,7 @@ import org.apache.activemq.apollo.filter
 import Buffer._
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.util.{OptionSupport, ClassFinder}
-import org.apache.activemq.apollo.util.path.{Path, Part}
+import org.apache.activemq.apollo.util.path.Path
 
 /**
  * <p>
@@ -30,11 +30,11 @@ import org.apache.activemq.apollo.util.p
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object BindingFactory {
+object QueueBinding {
 
   trait Provider {
-    def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding
-    def create(binding_dto:BindingDTO):Binding
+    def create(binding_kind:AsciiBuffer, binding_data:Buffer):QueueBinding
+    def create(binding_dto:DestinationDTO):QueueBinding
   }
 
   def discover = {
@@ -44,7 +44,7 @@ object BindingFactory {
 
   var providers = discover
 
-  def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding = {
+  def create(binding_kind:AsciiBuffer, binding_data:Buffer):QueueBinding = {
     providers.foreach { provider=>
       val rc = provider.create(binding_kind, binding_data)
       if( rc!=null ) {
@@ -53,7 +53,7 @@ object BindingFactory {
     }
     throw new IllegalArgumentException("Invalid binding type: "+binding_kind);
   }
-  def create(binding_dto:BindingDTO):Binding = {
+  def create(binding_dto:DestinationDTO):QueueBinding = {
     providers.foreach { provider=>
       val rc = provider.create(binding_dto)
       if( rc!=null ) {
@@ -71,7 +71,7 @@ object BindingFactory {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait Binding {
+trait QueueBinding {
 
   /**
    * A user friendly description of the binding.
@@ -82,80 +82,67 @@ trait Binding {
    * Wires a queue into the a virtual host based on the binding information contained
    * in the buffer.
    */
-  def bind(node:RoutingNode, queue:Queue)
+  def bind(node:LocalRouter, queue:Queue)
   
-  def unbind(node:RoutingNode, queue:Queue)
+  def unbind(node:LocalRouter, queue:Queue)
 
   def binding_kind:AsciiBuffer
 
   def binding_data:Buffer
 
-  def binding_dto:BindingDTO
+  def binding_dto:DestinationDTO
 
   def message_filter:BooleanExpression = ConstantExpression.TRUE
 
-  def matches(config:QueueDTO):Boolean = {
-    import DestinationParser.default._
-    import OptionSupport._
-    var rc = (o(config.name).map{ x=> parseFilter(ascii(x)).matches(destination) }.getOrElse(true))
-    rc = rc && (o(config.kind).map{ x=> x == binding_kind.toString }.getOrElse(true))
-    rc
-  }
-
   def destination:Path
 }
 
-object QueueBinding {
+object QueueDomainQueueBinding extends QueueBinding.Provider {
+
   val POINT_TO_POINT_KIND = new AsciiBuffer("ptp")
   val DESTINATION_PATH = new AsciiBuffer("default");
-}
-
-import QueueBinding._
-
-class QueueBindingFactory extends BindingFactory.Provider {
 
   def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
     if( binding_kind == POINT_TO_POINT_KIND ) {
-      val dto = new QueueBindingDTO
+      val dto = new QueueDestinationDTO
       dto.name = binding_data.ascii.toString
-      new QueueBinding(binding_data, dto)
+      new QueueDomainQueueBinding(binding_data, dto)
     } else {
       null
     }
   }
 
-  def create(binding_dto:BindingDTO) = {
-    if( binding_dto.isInstanceOf[QueueBindingDTO] ) {
-      val ptp_dto = binding_dto.asInstanceOf[QueueBindingDTO]
+  def create(binding_dto:DestinationDTO) = {
+    if( binding_dto.isInstanceOf[QueueDestinationDTO] ) {
+      val ptp_dto = binding_dto.asInstanceOf[QueueDestinationDTO]
       val data = new AsciiBuffer(ptp_dto.name).buffer
-      new QueueBinding(data, ptp_dto)
+      new QueueDomainQueueBinding(data, ptp_dto)
     } else {
       null
     }
   }
 }
 
+
 /**
  * <p>
  * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class QueueBinding(val binding_data:Buffer, val binding_dto:QueueBindingDTO) extends Binding {
+class QueueDomainQueueBinding(val binding_data:Buffer, val binding_dto:QueueDestinationDTO) extends QueueBinding {
+
+  import QueueDomainQueueBinding._
 
   val destination = DestinationParser.decode_path(binding_dto.name)
   def binding_kind = POINT_TO_POINT_KIND
 
-  def unbind(node: RoutingNode, queue: Queue) = {
-    if( node.unified ) {
-      node.remove_broadcast_consumer(queue)
-    }
+  def unbind(node: LocalRouter, queue: Queue) = {
+    node.queue_domain.unbind(queue)
   }
 
-  def bind(node: RoutingNode, queue: Queue) = {
-    if( node.unified ) {
-      node.add_broadcast_consumer(queue)
-    }
+  def bind(node: LocalRouter, queue: Queue) = {
+    node.queue_domain.bind(queue)
   }
 
   def label = binding_dto.name
@@ -163,56 +150,54 @@ class QueueBinding(val binding_data:Buff
   override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
 
   override def equals(o:Any):Boolean = o match {
-    case x: QueueBinding => x.binding_data == binding_data
+    case x: QueueDomainQueueBinding => x.binding_data == binding_data
     case _ => false
   }
 
 }
 
 
-object SubscriptionBinding {
-  val DURABLE_SUB_KIND = new AsciiBuffer("ds")
-}
+object DurableSubscriptionQueueBinding extends QueueBinding.Provider {
 
-import SubscriptionBinding._
+  val DURABLE_SUB_KIND = new AsciiBuffer("ds")
 
-class SubscriptionBindingFactory extends BindingFactory.Provider {
   def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
     if( binding_kind == DURABLE_SUB_KIND ) {
-      new SubscriptionBinding(binding_data, JsonCodec.decode(binding_data, classOf[SubscriptionBindingDTO]))
+      new DurableSubscriptionQueueBinding(binding_data, JsonCodec.decode(binding_data, classOf[DurableSubscriptionDestinationDTO]))
     } else {
       null
     }
   }
-  def create(binding_dto:BindingDTO) = {
-    if( binding_dto.isInstanceOf[SubscriptionBindingDTO] ) {
-      new SubscriptionBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[SubscriptionBindingDTO])
+  def create(binding_dto:DestinationDTO) = {
+    if( binding_dto.isInstanceOf[DurableSubscriptionDestinationDTO] ) {
+      new DurableSubscriptionQueueBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO])
     } else {
       null
     }
   }
-
 }
 
+
 /**
  * <p>
  * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SubscriptionBinding(val binding_data:Buffer, val binding_dto:SubscriptionBindingDTO) extends Binding {
+class DurableSubscriptionQueueBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionDestinationDTO) extends QueueBinding {
+  import DurableSubscriptionQueueBinding._
 
   val destination = DestinationParser.decode_path(binding_dto.name)
 
   def binding_kind = DURABLE_SUB_KIND
 
 
-  def unbind(node: RoutingNode, queue: Queue) = {
-    node.remove_broadcast_consumer(queue)
+  def unbind(router: LocalRouter, queue: Queue) = {
+    router.topic_domain.unbind(queue)
   }
 
-  def bind(node: RoutingNode, queue: Queue) = {
-    node.add_broadcast_consumer(queue)
+  def bind(router: LocalRouter, queue: Queue) = {
+    router.topic_domain.bind(queue)
   }
 
   def label = {
@@ -229,7 +214,7 @@ class SubscriptionBinding(val binding_da
   override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
 
   override def equals(o:Any):Boolean = o match {
-    case x: SubscriptionBinding => x.binding_data == binding_data
+    case x: DurableSubscriptionQueueBinding => x.binding_data == binding_data
     case _ => false
   }
 
@@ -240,42 +225,23 @@ class SubscriptionBinding(val binding_da
       SelectorParser.parse(binding_dto.filter)
     }
   }
-
-  override def matches(config: QueueDTO): Boolean = {
-    import OptionSupport._
-    var rc = super.matches(config)
-    rc = rc && (o(config.client_id).map{ x=> x == binding_dto.client_id }.getOrElse(true))
-    rc = rc && (o(config.subscription_id).map{ x=> x == binding_dto.subscription_id }.getOrElse(true))
-    rc
-  }
 }
 
 
-object TempBinding {
+object TempQueueBinding extends QueueBinding.Provider {
   val TEMP_DATA = new AsciiBuffer("")
   val TEMP_KIND = new AsciiBuffer("tmp")
-  val TEMP_DTO = new TempBindingDTO
-}
-
-import TempBinding._
-
-class TempBindingFactory extends BindingFactory.Provider {
+  val TEMP_DTO = null
 
   def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
     if( binding_kind == TEMP_KIND ) {
-      new TempBinding("", "")
+      new TempQueueBinding("", "")
     } else {
       null
     }
   }
 
-  def create(binding_dto:BindingDTO) = {
-    if( binding_dto.isInstanceOf[TempBindingDTO] ) {
-      new TempBinding("", "")
-    } else {
-      null
-    }
-  }
+  def create(binding_dto:DestinationDTO) = throw new UnsupportedOperationException
 }
 
 /**
@@ -284,7 +250,9 @@ class TempBindingFactory extends Binding
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class TempBinding(val key:AnyRef, val label:String) extends Binding {
+class TempQueueBinding(val key:AnyRef, val label:String) extends QueueBinding {
+  import TempQueueBinding._
+
   def this(c:DeliveryConsumer) = this(c, c.connection.map(_.transport.getRemoteAddress).getOrElse("known") )
 
   val destination = null
@@ -292,22 +260,16 @@ class TempBinding(val key:AnyRef, val la
   def binding_dto = TEMP_DTO
   def binding_data = TEMP_DATA
 
-  def unbind(node: RoutingNode, queue: Queue) = {
-    if( node.unified ) {
-      node.remove_broadcast_consumer(queue)
-    }
+  def unbind(router: LocalRouter, queue: Queue) = {
   }
 
-  def bind(node: RoutingNode, queue: Queue) = {
-    if( node.unified ) {
-      node.add_broadcast_consumer(queue)
-    }
+  def bind(router: LocalRouter, queue: Queue) = {
   }
 
   override def hashCode = if(key==null) 0 else key.hashCode
 
   override def equals(o:Any):Boolean = o match {
-    case x: TempBinding => x.key == key
+    case x: TempQueueBinding => x.key == key
     case _ => false
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jan 26 03:10:35 2011
@@ -16,500 +16,61 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.util.concurrent.atomic.AtomicLong
-import _root_.org.fusesource.hawtbuf._
-import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch._
-import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
-
-import collection.JavaConversions
 import org.apache.activemq.apollo.util._
-import collection.mutable.{ListBuffer, HashMap}
+import path.Path
 import scala.collection.immutable.List
-import org.apache.activemq.apollo.broker.store.{StoreUOW, QueueRecord}
-import Buffer._
-import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
-import java.util.ArrayList
 import org.apache.activemq.apollo.dto._
 import security.SecurityContext
+import store.StoreUOW
+import util.continuations._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object Router extends Log {
-  val TOPIC_DOMAIN = ascii("topic");
-  val QUEUE_DOMAIN = ascii("queue");
-  val TEMP_TOPIC_DOMAIN = ascii("temp-topic");
-  val TEMP_QUEUE_DOMAIN = ascii("temp-queue");
-
-  val QUEUE_KIND = ascii("queue");
-  val DEFAULT_QUEUE_PATH = ascii("default");
-}
-
-/**
- * Provides a non-blocking concurrent producer to consumer
- * routing implementation.
- *
- * DeliveryProducers create a route object for each destination
- * they will be producing to.  Once the route is
- * connected to the router, the producer can use
- * the route.targets list without synchronization to
- * get the current set of consumers that are bound
- * to the destination. 
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class Router(val host:VirtualHost) extends DispatchLogging {
-
-  override protected def log = Router
-
-  import Router._
-
-  val destination_id_counter = new LongCounter
-
-  protected def dispatchQueue:DispatchQueue = host.dispatch_queue
-
-  var queue_bindings = HashMap[Binding, Queue]()
-  var queues = HashMap[Long, Queue]()
-
-  // Only stores simple paths, used for wild card lookups.
-  var destinations = new PathMap[RoutingNode]()
-  // Can store consumers on wild cards paths
-  val broadcast_consumers = new PathMap[DeliveryConsumer]()
-  // Can store bindings on wild cards paths
-  val bindings = new PathMap[Queue]()
-
-  private def is_topic(destination:Destination) = {
-    destination.domain match {
-      case TOPIC_DOMAIN => true
-      case TEMP_TOPIC_DOMAIN => true
-      case _ => false
-    }
-  }
-
-  private val ALL = new Path({
-    val rc = new ArrayList[Part](1)
-    rc.add(Part.ANY_DESCENDANT)
-    rc
-  })
-
-  def routing_nodes:Iterable[RoutingNode] = JavaConversions.asScalaIterable(destinations.get(ALL))
-  
-  def _get_or_create_destination(path:Path, security:SecurityContext) = {
-    // We can't create a wild card destination.. only wild card subscriptions.
-    assert( !PathParser.containsWildCards(path) )
-    var rc = destinations.chooseValue( path )
-    if( rc == null ) {
-      _create_destination(path, security)
-    } else {
-      Success(rc)
-    }
-  }
+trait Router extends Service {
 
-  def _get_destination(path:Path) = {
-    Option(destinations.chooseValue( path ))
-  }
-
-  def _create_destination(path:Path, security:SecurityContext):Result[RoutingNode,String] = {
+  def host:VirtualHost
 
-    // We can't create a wild card destination.. only wild card subscriptions.
-    assert( !PathParser.containsWildCards(path) )
+  def get_queue(dto:Long):Option[Queue] @suspendable
 
-    // A new destination is being created...
-    val config = host.destination_config(path).getOrElse(new DestinationDTO)
+  def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Result[Zilch,String] @suspendable
 
-    if(  host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config)) {
-      return new Failure("Not authorized to create the destination")
-    }
+  def unbind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, persistent:Boolean=false)
 
-    val rc = new RoutingNode(this, path, config)
-    destinations.put(path, rc)
+  def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Result[Zilch,String] @suspendable
 
-    // bind any matching wild card subs
-    import JavaConversions._
-    broadcast_consumers.get( path ).foreach { c=>
-      rc.add_broadcast_consumer(c)
-    }
-    bindings.get( path ).foreach { queue=>
-      rc.add_queue(queue)
-    }
-    Success(rc)
-  }
-
-  def get_destination_matches(path:Path) = {
-    import JavaConversions._
-    asScalaIterable(destinations.get( path ))
-  }
-
-  def _create_queue(id:Long, binding:Binding, security:SecurityContext):Result[Queue,String] = {
-
-    val config = host.queue_config(binding).getOrElse(new QueueDTO)
-    if( host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config) ) {
-      return Failure("Not authorized to create the queue")
-    }
-
-    var qid = id
-    if( qid == -1 ) {
-      qid = host.queue_id_counter.incrementAndGet
-    }
-
-    val queue = new Queue(host, qid, binding, config)
-    if( queue.tune_persistent && id == -1 ) {
-
-      val record = new QueueRecord
-      record.key = qid
-      record.binding_data = binding.binding_data
-      record.binding_kind = binding.binding_kind
-
-      host.store.add_queue(record) { rc => Unit }
-
-    }
-    queue.start
-    queue_bindings.put(binding, queue)
-    queues.put(queue.id, queue)
-
-    // Not all queues are bound to destinations.
-    val name = binding.destination
-    if( name!=null ) {
-      bindings.put(name, queue)
-      // make sure the destination is created if this is not a wild card sub
-      if( !PathParser.containsWildCards(name) ) {
-        _get_destination(name) match {
-          case Some(node)=>
-            node.add_queue(queue)
-          case None=>
-            _create_destination(name, null)
-        }
-      } else {
-        get_destination_matches(name).foreach( node=>
-          node.add_queue(queue)
-        )
-      }
-
-    }
-    Success(queue)
-
-  }
-
-  def create_queue(record:QueueRecord, security:SecurityContext) = {
-    _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data), security)
-  }
-
-  /**
-   * Returns the previously created queue if it already existed.
-   */
-  def _get_or_create_queue(dto: BindingDTO, security:SecurityContext): Result[Queue, String] = {
-    val binding = BindingFactory.create(dto)
-    val queue = queue_bindings.get(binding) match {
-      case Some(queue) => Success(queue)
-      case None => _create_queue(-1, binding, security)
-    }
-    queue
-  }
-
-  def get_or_create_queue(id:BindingDTO, security:SecurityContext) = dispatchQueue ! {
-    _get_or_create_queue(id, security)
-  }
-
-  /**
-   * Returns true if the queue no longer exists.
-   */
-  def destroy_queue(dto:BindingDTO, security:SecurityContext) = dispatchQueue ! { _destroy_queue(dto, security) }
-
-  def _destroy_queue(dto:BindingDTO, security:SecurityContext):Result[Zilch, String] = {
-    queue_bindings.get(BindingFactory.create(dto)) match {
-      case Some(queue) =>
-        _destroy_queue(queue, security)
-      case None =>
-        Failure("Does not exist")
-    }
-  }
-
-  /**
-   * Returns true if the queue no longer exists.
-   */
-  def destroy_queue(id:Long, security:SecurityContext) = dispatchQueue ! { _destroy_queue(id,security) }
-
-  def _destroy_queue(id:Long, security:SecurityContext):Result[Zilch, String] = {
-    queues.get(id) match {
-      case Some(queue) =>
-        _destroy_queue(queue,security)
-      case None =>
-        Failure("Does not exist")
-    }
-  }
-
-  def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
-
-    if( security!=null && queue.config.acl!=null ) {
-      if( !host.authorizer.can_destroy(security, host, queue.config) ) {
-        return Failure("Not authorized to destroy")
-      }
-    }
-
-    queue_bindings.remove(queue.binding)
-    queues.remove(queue.id)
-
-    val name = queue.binding.destination
-    if( name!=null ) {
-      get_destination_matches(name).foreach( node=>
-        node.remove_queue(queue)
-      )
-    }
-    queue.stop
-    if( queue.tune_persistent ) {
-      queue.dispatch_queue ^ {
-        host.store.remove_queue(queue.id){x=> Unit}
-      }
-    }
-    Success(Zilch)
-  }
-
-  /**
-   * Gets an existing queue.
-   */
-  def get_queue(dto:BindingDTO) = dispatchQueue ! {
-    queue_bindings.get(BindingFactory.create(dto))
-  }
-
-  /**
-   * Gets an existing queue.
-   */
-  def get_queue(id:Long) = dispatchQueue ! {
-    queues.get(id)
-  }
-
-  def bind(destination:Destination, consumer:DeliveryConsumer, security:SecurityContext) = {
-    consumer.retain
-    dispatchQueue ! {
-
-      def do_bind:Result[Zilch, String] = {
-        assert( is_topic(destination) )
-        val name = destination.name
-
-        // A new destination is being created...
-        def config = host.destination_config(name).getOrElse(new DestinationDTO)
-
-        if( host.authorizer!=null && security!=null && !host.authorizer.can_receive_from(security, host, config) ) {
-          return new Failure("Not authorized to receive from the destination")
-        }
-
-        // make sure the destination is created if this is not a wild card sub
-        if( !PathParser.containsWildCards(name) ) {
-          val rc = _get_or_create_destination(name, security)
-          if( rc.failed ) {
-            return rc.map_success(_=> Zilch);
-          }
-        }
-
-        get_destination_matches(name).foreach{ node=>
-          node.add_broadcast_consumer(consumer)
-        }
-        broadcast_consumers.put(name, consumer)
-        Success(Zilch)
-      }
-
-      do_bind
-
-    }
-  }
-
-  def unbind(destination:Destination, consumer:DeliveryConsumer) = dispatchQueue {
-    assert( is_topic(destination) )
-    val name = destination.name
-    broadcast_consumers.remove(name, consumer)
-    get_destination_matches(name).foreach{ node=>
-      node.remove_broadcast_consumer(consumer)
-    }
-    consumer.release
-  }
-
-
-  def connect(destination:Destination, producer:DeliveryProducer, security:SecurityContext)(completed: (Result[DeliveryProducerRoute,String])=>Unit) = {
-
-    val route = new DeliveryProducerRoute(this, destination, producer) {
-      override def on_connected = {
-        completed(Success(this));
-      }
-    }
-
-    def do_connect:Result[Zilch, String] = {
-      val topic = is_topic(destination)
-
-
-      var destination_security = security
-      // Looking up the queue will cause it to get created if it does not exist.
-      val queue = if( topic ) {
-
-        def config = host.destination_config(destination.name).getOrElse(new DestinationDTO)
-        if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config)) {
-          return new Failure("Not authorized to send to the destination")
-        }
-        None
-
-      } else {
-
-        val dto = new QueueBindingDTO
-        dto.name = DestinationParser.encode_path(destination.name)
-
-        // Can we send to the queue?
-        def config = host.queue_config(dto).getOrElse(new QueueDTO)
-        if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config) ) {
-          return Failure("Not authorized to send to the queue")
-        }
-
-        destination_security = null
-        val rc = _get_or_create_queue(dto, security)
-        if( rc.failed ) {
-          return rc.map_success(_=>Zilch)
-        }
-        Some(rc.success)
-      }
-
-      _get_or_create_destination(destination.name, security) match {
-        case Success(node)=>
-          if( node.unified || topic ) {
-            node.add_broadcast_producer( route )
-          } else {
-            route.bind( queue.toList )
-          }
-          route.connected()
-          Success(Zilch)
-
-        case Failure(reason)=>
-          Failure(reason)
-      }
-    }
-
-    dispatchQueue {
-      do_connect.failure_option.foreach(x=> producer.dispatch_queue { completed(Failure(x)) } )
-    }
-
-  }
-
-  def disconnect(route:DeliveryProducerRoute) = dispatchQueue {
-    _get_destination(route.destination.name).foreach { node=>
-      val topic = is_topic(route.destination)
-      if( node.unified || topic ) {
-        node.remove_broadcast_producer(route)
-      }
-    }
-    route.disconnected()
-    route.release
-  }
-
-}
-
-/**
- * Tracks state associated with a destination name.
- */
-class RoutingNode(val router:Router, val name:Path, val config:DestinationDTO) {
-
-  val id = router.destination_id_counter.incrementAndGet
-
-  var broadcast_producers = ListBuffer[DeliveryProducerRoute]()
-  var broadcast_consumers = ListBuffer[DeliveryConsumer]()
-  var queues = ListBuffer[Queue]()
-
-  import OptionSupport._
-
-  def unified = config.unified.getOrElse(false)
-  def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
-
-  var consumer_proxies = Map[DeliveryConsumer, DeliveryConsumer]()
-
-  def add_broadcast_consumer (consumer:DeliveryConsumer) = {
-
-    var target = consumer
-    slow_consumer_policy match {
-      case "queue" =>
-
-        // create a temp queue so that it can spool
-        val queue = router._create_queue(-1, new TempBinding(consumer), null).success
-        queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
-        queue.bind(List(consumer))
-
-        consumer_proxies += consumer->queue
-        target = queue
-
-      case "block" =>
-        // just have dispatcher dispatch directly to them..
-    }
-
-    broadcast_consumers += target
-    val list = target :: Nil
-    broadcast_producers.foreach({ r=>
-      r.bind(list)
-    })
-  }
-
-  def remove_broadcast_consumer (consumer:DeliveryConsumer) = {
-
-    var target = consumer_proxies.get(consumer).getOrElse(consumer)
-
-    broadcast_consumers = broadcast_consumers.filterNot( _ == target )
-
-    val list = target :: Nil
-    broadcast_producers.foreach({ r=>
-      r.unbind(list)
-    })
-
-    target match {
-      case queue:Queue=>
-        val binding = new TempBinding(consumer)
-        if( queue.binding == binding ) {
-          queue.unbind(List(consumer))
-          router._destroy_queue(queue.id, null)
-        }
-      case _ =>
-    }
-  }
-
-  def add_broadcast_producer (producer:DeliveryProducerRoute) = {
-    broadcast_producers += producer
-    producer.bind(broadcast_consumers.toList)
-  }
-
-  def remove_broadcast_producer (producer:DeliveryProducerRoute) = {
-    broadcast_producers = broadcast_producers.filterNot( _ == producer )
-    producer.unbind(broadcast_consumers.toList)
-  }
-
-  def add_queue (queue:Queue) = {
-    queue.binding.bind(this, queue)
-    queues += queue
-  }
-
-  def remove_queue (queue:Queue) = {
-    queues = queues.filterNot( _ == queue )
-    queue.binding.unbind(this, queue)
-  }
+  def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
 
 }
 
 /**
+ * An object which produces deliveries and which allows new DeliveryConsumer
+ * objects to bind to it so they can also receive those deliveries.
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait Route extends Retained {
+trait BindableDeliveryProducer extends DeliveryProducer with Retained {
 
   def dispatch_queue:DispatchQueue
-  val metric = new AtomicLong();
 
   def bind(targets:List[DeliveryConsumer]):Unit
   def unbind(targets:List[DeliveryConsumer]):Unit
-  
+
   def connected():Unit
   def disconnected():Unit
 
 }
 
+object DeliveryProducerRoute extends Log
+
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+// case class DeliveryProducerRoute(val router:Router, val destination:DestinationDTO, val path:Path, val producer:DeliveryProducer, val security:SecurityContext) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+abstract class DeliveryProducerRoute(val router:Router) extends BaseRetained with BindableDeliveryProducer with Sink[Delivery] {
 
-  override protected def log = Router
-  override def dispatch_queue = producer.dispatch_queue
+  import DeliveryProducerRoute._
 
   // Retain the queue while we are retained.
   dispatch_queue.retain
@@ -523,19 +84,15 @@ case class DeliveryProducerRoute(val rou
     on_connected
   }
 
-  def bind(targets:List[DeliveryConsumer]) = {
-    targets.foreach(_.retain)
+  def bind(consumers:List[DeliveryConsumer]) = {
+    consumers.foreach(_.retain)
     dispatch_queue {
-      internal_bind(targets)
-    }
-  }
-
-  private def internal_bind(values:List[DeliveryConsumer]) = {
-    values.foreach{ x=>
-      debug("producer route attaching to conusmer.")
-      val target = x.connect(producer);
-      target.refiller = drainer
-      targets ::= target
+      consumers.foreach{ x=>
+        debug("producer route attaching to conusmer.")
+        val target = x.connect(this);
+        target.refiller = drainer
+        targets ::= target
+      }
     }
   }
 
@@ -561,8 +118,7 @@ case class DeliveryProducerRoute(val rou
     this.targets.foreach { x=>
       debug("producer route detaching from conusmer.")
       x.close
-      x.consumer.release
-    }    
+    }
   }
 
   protected def on_connected = {}

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1063582&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Jan 26 03:10:35 2011
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.apache.activemq.apollo.util._
+import scala.collection.immutable.List
+import org.apache.activemq.apollo.util.path.Path
+import org.apache.activemq.apollo.dto._
+import security.SecurityContext
+import collection.mutable.{HashMap, ListBuffer}
+
+/**
+ * <p>
+ * A logical messaging topic: tracks its producers, consumers and durable subscriptions.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Topic(val router:LocalRouter, val name:String, val config:TopicDTO, val id:Long) extends DomainDestination {
+
+  var producers = ListBuffer[BindableDeliveryProducer]()
+  // Direct consumers plus the temp queues spooling on behalf of consumers.
+  var consumers = ListBuffer[DeliveryConsumer]()
+  var durable_subscriptions = ListBuffer[Queue]()
+  // Maps a consumer to the temp queue or durable subscription queue feeding it.
+  var consumer_queues = HashMap[DeliveryConsumer, Queue]()
+
+  import OptionSupport._
+
+  // Configured slow consumer policy; "block" when the config does not set one.
+  def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
+
+  // Attach / detach the given targets on every producer currently connected.
+  private def bind_producers(targets:List[DeliveryConsumer]) = producers.foreach(_.bind(targets))
+  private def unbind_producers(targets:List[DeliveryConsumer]) = producers.foreach(_.unbind(targets))
+
+  /**
+   * Returns false only when an authorizer is configured, a security context is
+   * supplied and the authorizer denies receiving from this topic.
+   */
+  def can_bind(destination: DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext) = {
+    val authorizer = router.host.authorizer
+    authorizer==null || security==null || authorizer.can_receive_from(security, router.host, config)
+  }
+
+  // Durable subscriptions are the same when client id and subscription id match.
+  def is_same_ds(sub1:DurableSubscriptionDestinationDTO, sub2:DurableSubscriptionDestinationDTO) = {
+    (sub1.client_id, sub1.subscription_id) == (sub2.client_id, sub2.subscription_id)
+  }
+
+  /**
+   * Binds a consumer: directly for a null destination, directly or via a temp
+   * spool queue (per the slow consumer policy) for a topic destination, and to
+   * the subscription's queue for a durable subscription destination.
+   */
+  def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
+    destination match {
+      case null=> // unified queue case
+        consumers += consumer
+        bind_producers(List(consumer))
+
+      case destination:TopicDestinationDTO=>
+        var target = consumer
+        slow_consumer_policy match {
+          case "queue" =>
+            // create a temp queue so that it can spool
+            val queue = router._create_queue(-1, new TempQueueBinding(consumer), new QueueDTO)
+            queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
+            queue.bind(List(consumer))
+            consumer_queues += consumer->queue
+            target = queue
+
+          case _ =>
+            // "block", or any unrecognized policy (rather than failing with a MatchError):
+            // just have dispatcher dispatch directly to them..
+        }
+        consumers += target
+        bind_producers(List(target))
+
+      case destination:DurableSubscriptionDestinationDTO=>
+        val queue = router.topic_domain.get_or_create_durable_subscription(destination)
+        if( !durable_subscriptions.contains(queue) ) {
+          durable_subscriptions += queue
+          bind_producers(List(queue))
+        }
+
+        // Typically durable subs are only consumed by one connection at a time. So collocate the
+        // queue onto the consumer's dispatch queue.
+        queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
+        queue.bind(destination, consumer)
+        consumer_queues += consumer->queue
+    }
+  }
+
+  /**
+   * Unbinds a consumer.  Its temp spool queue, if any, is detached and destroyed;
+   * its durable subscription is only destroyed when `persistent` is true.
+   */
+  def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
+    consumer_queues.remove(consumer) match {
+      case Some(queue)=>
+        queue.unbind(List(consumer))
+        queue.binding match {
+          case x:TempQueueBinding =>
+            // Forget the spool queue too, otherwise producers connecting later
+            // would still be bound to the destroyed queue.
+            consumers -= queue
+            unbind_producers(List(queue))
+            router._destroy_queue(queue.id, null)
+
+          case x:DurableSubscriptionQueueBinding =>
+            if( persistent ) {
+              router.topic_domain.destroy_durable_subscription(queue)
+            }
+
+          case _ => // bind() only stores temp and durable subscription queues in consumer_queues
+        }
+
+      case None=>
+        // producers are directly delivering to the consumer..
+        val original = consumers.size
+        consumers -= consumer
+        if( original!= consumers.size ) {
+          unbind_producers(List(consumer))
+        }
+    }
+  }
+
+  /**
+   * Registers a durable subscription queue, attaches it to the connected
+   * producers and re-binds the consumers already mapped to it.
+   */
+  def bind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue)  = {
+    if( !durable_subscriptions.contains(queue) ) {
+      durable_subscriptions += queue
+      bind_producers(List(queue))
+      // Iterate over a snapshot since bind() updates consumer_queues.
+      consumer_queues.toList.foreach{case (consumer, q)=>
+        if( q==queue ) {
+          bind(destination, consumer)
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes a durable subscription queue, detaches it from the connected
+   * producers and unbinds the consumers mapped to it.
+   */
+  def unbind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue)  = {
+    if( durable_subscriptions.contains(queue) ) {
+      durable_subscriptions -= queue
+      unbind_producers(List(queue))
+      // Iterate over a snapshot since unbind() removes from consumer_queues.
+      consumer_queues.toList.foreach{case (consumer, q)=>
+        if( q==queue ) {
+          unbind(consumer, false)
+        }
+      }
+    }
+  }
+
+  // Send-side counterpart of can_bind: false only when the authorizer denies sending.
+  def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
+    val authorizer = router.host.authorizer
+    authorizer==null || security==null || authorizer.can_send_to(security, router.host, config)
+  }
+
+  // Registers the producer and binds it to all current consumers and durable subscriptions.
+  def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+    producers += producer
+    producer.bind(consumers.toList ::: durable_subscriptions.toList)
+  }
+
+  // Removes the producer and unbinds it from all current consumers and durable subscriptions.
+  def disconnect (producer:BindableDeliveryProducer) = {
+    producers = producers.filterNot( _ == producer )
+    producer.unbind(consumers.toList ::: durable_subscriptions.toList)
+  }
+
+}