You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:40:20 UTC

svn commit: r961068 [1/4] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache...

Author: chirino
Date: Wed Jul  7 03:40:18 2010
New Revision: 961068

URL: http://svn.apache.org/viewvc?rev=961068&view=rev
Log:
converting broker module to be scala based

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java
      - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandlerFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul  7 03:40:18 2010
@@ -35,11 +35,26 @@
   
     <dependency>
       <groupId>org.fusesource.hawtdispatch</groupId>
-      <artifactId>hawtdispatch</artifactId>
+      <artifactId>hawtdispatch-scala</artifactId>
       <version>${hawtdispatch-version}</version>
     </dependency>
 
     <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>compile</scope>
+      <version>${scala-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala-version}</version>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-transport</artifactId>
     </dependency>
@@ -78,6 +93,10 @@
 
   <build>
   
+    <defaultGoal>install</defaultGoal>
+    <sourceDirectory>src/main/scala</sourceDirectory>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+
     <resources>
       <resource>
         <directory>target/schema</directory>
@@ -108,6 +127,43 @@
         </executions>
       </plugin>
 
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.13.1</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <jvmArgs>
+            <jvmArg>-Xmx1024m</jvmArg>
+          </jvmArgs>
+          <args>
+            <arg>-deprecation</arg>
+            <arg>-Xno-varargs-conversion</arg>
+          </args>
+          <scalaVersion>${scala-version}</scalaVersion>
+        </configuration>
+      </plugin>
+      
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.4.3</version>
+        <configuration>
+          <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
+          <useSystemClassLoader>false</useSystemClassLoader>
+          <!--forkMode>pertest</forkMode-->
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+
     </plugins>
   </build>
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.concurrent.atomic.AtomicLong
+import _root_.org.apache.activemq.util.buffer._
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+import java.util.HashMap
+import collection.JavaConversions
+import path.PathMap
+
+/**
+ * Domain name constants.  Router.isTopic compares a destination's
+ * domain against TOPIC_DOMAIN.
+ */
+object Domain {
+  val TOPIC_DOMAIN = new AsciiBuffer("topic")
+  val QUEUE_DOMAIN = new AsciiBuffer("queue")
+  val TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic")
+  val TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue")
+}
+
+import Domain._
+/**
+ * Holds the delivery targets registered under destination names in a
+ * PathMap.
+ */
+class Domain {
+
+  val targets = new PathMap[DeliveryTarget]()
+
+  /** Stores `queue` in the path map under `name`. */
+  def bind(name:AsciiBuffer, queue:DeliveryTarget) = targets.put(name, queue)
+
+  /** Removes the `queue` entry registered under `name` from the path map. */
+  def unbind(name:AsciiBuffer, queue:DeliveryTarget) = targets.remove(name, queue)
+
+//
+//  synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
+//    return targets.get(name);
+//  }
+
+}
+
+
+/**
+ * Provides a non-blocking concurrent producer to consumer
+ * routing implementation.
+ *
+ * DeliveryProducers create a route object for each destination
+ * they will be producing to.  Once the route is
+ * connected to the router, the producer can use
+ * the route.targets list without synchronization to
+ * get the current set of consumers that are bound
+ * to the destination. 
+ *
+ */
+class Router(var queue:DispatchQueue) {
+
+  /**
+   * Routing state kept for a single destination: the delivery targets
+   * bound to it and the producer routes connected to it.
+   *
+   * on_unbind and on_disconnect report true once the node has neither
+   * routes nor targets left, which tells the router that the node can be
+   * dropped from the destinations map.
+   */
+  trait DestinationNode {
+    var targets = List[DeliveryTarget]()
+    var routes = List[DeliveryProducerRoute]()
+
+    /** Invoked when the targets in `x` are bound to this destination. */
+    def on_bind(x:List[DeliveryTarget]):Unit
+
+    /** Invoked when the targets in `x` are unbound; true when the node is now empty. */
+    def on_unbind(x:List[DeliveryTarget]):Boolean
+
+    /** Invoked when a producer route connects to this destination. */
+    def on_connect(route:DeliveryProducerRoute):Unit
+
+    /**
+     * Forgets the route, notifies it that it has been disconnected and
+     * reports whether the node is now empty.
+     */
+    def on_disconnect(route:DeliveryProducerRoute):Boolean = {
+      routes = routes.filterNot({r=> route==r})
+      route.disconnected()
+      routes == Nil && targets == Nil
+    }
+  }
+
+  /**
+   * Topic node: every connected route is told directly about the targets
+   * that get bound or unbound.
+   */
+  class TopicDestinationNode extends DestinationNode {
+    def on_bind(x:List[DeliveryTarget]) =  {
+      targets = x ::: targets
+      routes.foreach({r=>
+        r.bind(x)
+      })
+    }
+
+    def on_unbind(x:List[DeliveryTarget]):Boolean = {
+      targets = targets.filterNot({t=>x.contains(t)})
+      routes.foreach({r=>
+        r.unbind(x)
+      })
+      routes == Nil && targets == Nil
+    }
+
+    def on_connect(route:DeliveryProducerRoute) = {
+      routes = route :: routes
+      route.connected(targets)
+    }
+  }
+
+  /**
+   * Queue node: targets are bound to an intermediate Queue, and connecting
+   * routes are handed that queue as their only target.
+   */
+  class QueueDestinationNode(destination:Destination) extends DestinationNode {
+    val queue = new Queue(destination)
+
+    def on_bind(x:List[DeliveryTarget]) =  {
+      targets = x ::: targets
+      queue.bind(x)
+    }
+
+    def on_unbind(x:List[DeliveryTarget]):Boolean = {
+      targets = targets.filterNot({t=>x.contains(t)})
+      queue.unbind(x)
+      routes == Nil && targets == Nil
+    }
+
+    def on_connect(route:DeliveryProducerRoute) = {
+      routes = route :: routes
+      route.connected(queue :: Nil)
+    }
+  }
+
+  var destinations = new HashMap[Destination, DestinationNode]()
+
+  /**
+   * Returns the node registered for the destination, creating and
+   * registering a topic or queue node first when none exists yet.
+   */
+  private def get(destination:Destination):DestinationNode = {
+    var result = destinations.get(destination)
+    if( result ==null ) {
+      if( isTopic(destination) ) {
+        result = new TopicDestinationNode
+      } else {
+        result = new QueueDestinationNode(destination)
+      }
+      destinations.put(destination, result)
+    }
+    result
+  }
+
+  /** Binds the targets to the destination; the work is handed to `queue`. */
+  def bind(destination:Destination, targets:List[DeliveryTarget]) = retaining(targets) {
+      get(destination).on_bind(targets)
+    } ->: queue
+
+  /**
+   * Unbinds the targets from the destination and drops the destination
+   * node once it reports that it is empty; the work is handed to `queue`.
+   */
+  def unbind(destination:Destination, targets:List[DeliveryTarget]) = releasing(targets) {
+      if( get(destination).on_unbind(targets) ) {
+        destinations.remove(destination)
+      }
+    } ->: queue
+
+  /**
+   * Creates a route for the producer and connects it to the destination.
+   * `completed` is invoked with the route from the route's on_connected
+   * callback.
+   */
+  def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+    val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
+      override def on_connected = {
+        completed(this);
+      }
+    }
+    ^ {
+      get(destination).on_connect(route)
+    } ->: queue
+  }
+
+  def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
+  def isQueue(destination:Destination) = !isTopic(destination)
+
+  /**
+   * Disconnects the route from its destination.  As in unbind, the
+   * destination node is dropped once it reports that it has no routes and
+   * no targets left, so empty nodes do not pile up in the destinations map.
+   */
+  def disconnect(route:DeliveryProducerRoute) = releasing(route) {
+      if( get(route.destination).on_disconnect(route) ) {
+        destinations.remove(route.destination)
+      }
+    } ->: queue
+
+
+   /** Applies `proc` to every registered destination and its node. */
+   def each(proc:(Destination, DestinationNode)=>Unit) = {
+     import JavaConversions._;
+     for( (destination, node) <- destinations ) {
+        proc(destination, node)
+     }
+   }
+
+}
+
+/**
+ * Callbacks a DestinationNode uses to tell a route about its destination:
+ * the initial target list on connect, later bind/unbind changes, and the
+ * final disconnect.
+ */
+trait Route extends Retained {
+
+  val destination:Destination
+  val queue:DispatchQueue
+  val metric = new AtomicLong()
+
+  /** Called with the initial targets once the route is connected. */
+  def connected(targets:List[DeliveryTarget]):Unit
+
+  /** Called when additional targets are bound to the destination. */
+  def bind(targets:List[DeliveryTarget]):Unit
+
+  /** Called when targets are unbound from the destination. */
+  def unbind(targets:List[DeliveryTarget]):Unit
+
+  /** Called when the route has been removed from its destination. */
+  def disconnected():Unit
+
+}
+
+/**
+ * Route owned by a producer.  It keeps one DeliveryTargetSession per
+ * bound target; all changes to the session list are handed to the
+ * route's own dispatch queue.
+ */
+class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route {
+
+
+  // Retain the queue while we are retained.
+  queue.retain
+  setDisposer(^{
+    queue.release
+  })
+
+  var targets = List[DeliveryTargetSession]()
+
+  /** Opens sessions to the initial targets, then fires on_connected. */
+  def connected(targets:List[DeliveryTarget]) = retaining(targets) {
+    internal_bind(targets)
+    on_connected
+  } ->: queue
+
+  /** Opens sessions to targets bound after the route was connected. */
+  def bind(targets:List[DeliveryTarget]) = retaining(targets) {
+    internal_bind(targets)
+  } ->: queue
+
+  /** Opens a session on each target and prepends it to the session list. */
+  private def internal_bind(values:List[DeliveryTarget]) = {
+    values.foreach{ x=>
+      targets = x.open_session(queue) :: targets
+    }
+  }
+
+  /** Closes and drops the sessions whose consumer is one of `targets`. */
+  def unbind(targets:List[DeliveryTarget]) = releasing(targets) {
+    this.targets = this.targets.filterNot { x=>
+      val rc = targets.contains(x.consumer)
+      if( rc ) {
+        x.close
+      }
+      rc
+    }
+  } ->: queue
+
+  /**
+   * Closes every open session, releases its consumer and then fires
+   * on_disconnected, mirroring how connected() fires on_connected.
+   */
+  def disconnected() = ^ {
+    this.targets.foreach { x=>
+      x.close
+      x.consumer.release
+    }
+    // Forget the closed sessions so they cannot be used, closed or
+    // released a second time.
+    this.targets = Nil
+    on_disconnected
+  } ->: queue
+
+  /** Hook invoked on the route queue after the initial targets are bound. */
+  protected def on_connected = {}
+
+  /** Hook invoked on the route queue after all sessions have been closed. */
+  protected def on_disconnected = {}
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.beans.ExceptionListener
+import _root_.java.io.{IOException}
+import _root_.java.util.{LinkedHashMap, HashMap}
+import _root_.org.apache.activemq.filter.{BooleanExpression}
+import _root_.org.apache.activemq.transport._
+import _root_.org.apache.activemq.Service
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.{FactoryFinder, IOExceptionSupport}
+import _root_.org.apache.activemq.wireformat.WireFormat
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+/** Connection configuration holder; it currently declares no members. */
+class ConnectionConfig
+
+/**
+ * Base class for connections built on a Transport.  The connection
+ * registers itself as the transport's listener and forwards transport
+ * errors to the configured ExceptionListener unless it is stopping.
+ */
+abstract class Connection() extends TransportListener with Service {
+
+  val q = createQueue("connection")
+  var name = "connection"
+  var stopping = false
+
+  var transport:Transport = null
+  var exceptionListener:ExceptionListener = null
+
+  /** Hands the dispatch queue and this listener to the transport, then starts it. */
+  def start() = {
+    transport.setDispatchQueue(q)
+    // NOTE(review): presumably setDispatchQueue retains the queue, which
+    // would explain the release here - confirm against the Transport contract.
+    transport.getDispatchQueue.release
+    transport.setTransportListener(this)
+    transport.start()
+  }
+
+  /** Marks the connection as stopping, stops the transport and releases the queue. */
+  def stop() = {
+    stopping = true
+    transport.stop()
+    q.release
+  }
+
+  /** Transport errors are only reported while the connection is not stopping. */
+  def onException(error:IOException) = if (!stopping) onFailure(error)
+
+  /** Passes the error to the exception listener, if one is set. */
+  def onFailure(error:Exception) = if (exceptionListener != null) exceptionListener.exceptionThrown(error)
+
+  def onDisconnected() = {}
+
+  def onConnected() = {}
+
+}
+
+object BrokerConnection extends Log
+
+/**
+ * Broker-side connection.  The first command received has to be a
+ * WireFormat; its name selects the ProtocolHandler, which then receives
+ * that command and every later one.
+ */
+class BrokerConnection(val broker: Broker) extends Connection with Logging {
+  override protected def log = BrokerConnection
+
+  var protocolHandler: ProtocolHandler = null
+
+  // Until a protocol handler has been installed, a failure is logged and
+  // the connection is stopped.
+  exceptionListener = new ExceptionListener() {
+    def exceptionThrown(error:Exception) = {
+      info("Transport failed before messaging protocol was initialized.", error)
+      stop()
+    }
+  }
+
+
+  def onCommand(command: Object) = {
+    if (protocolHandler == null) {
+      try {
+        command match {
+          case wireformat:WireFormat =>
+            // First command might be from the wire format discriminator, letting
+            // us know what the actual wireformat is.
+            try {
+              protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName())
+            } catch {
+              case e:Exception =>
+                throw IOExceptionSupport.create("No protocol handler available for: " + wireformat.getName(), e)
+            }
+
+            protocolHandler.setConnection(this)
+            protocolHandler.setWireFormat(wireformat)
+            protocolHandler.start()
+
+            // From here on failures are reported to the protocol handler.
+            exceptionListener = new ExceptionListener() {
+              def exceptionThrown(error:Exception) {
+                protocolHandler.onException(error)
+              }
+            }
+            protocolHandler.onCommand(command)
+
+          case _ =>
+            throw new IOException("First command should be a WireFormat")
+        }
+      } catch {
+        case e:Exception =>
+          onFailure(e)
+      }
+    } else {
+      protocolHandler.onCommand(command)
+    }
+  }
+
+  /** Stops the connection first and then the protocol handler, if one was installed. */
+  override def stop() = {
+    super.stop()
+    if (protocolHandler != null) {
+      protocolHandler.stop()
+    }
+  }
+}
+
+
+/**
+ * Creates ProtocolHandler instances by protocol name through a
+ * FactoryFinder rooted at the broker protocol services path.
+ */
+object ProtocolHandlerFactory {
+  val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/protocol/")
+
+  /** Instantiates the handler registered for `protocol`. */
+  def createProtocolHandler(protocol:String):ProtocolHandler =
+    PROTOCOL_HANDLER_FINDER.newInstance(protocol).asInstanceOf[ProtocolHandler]
+}
+
+trait ProtocolHandler extends Service {
+
+    /** Receives each command that BrokerConnection.onCommand forwards. */
+    def onCommand(command:Any):Unit
+
+    /** Called by BrokerConnection with itself before the handler is started. */
+    def setConnection(brokerConnection:BrokerConnection):Unit
+
+    /** Called by BrokerConnection with the received WireFormat before the handler is started. */
+    def setWireFormat(wireformat:WireFormat):Unit
+
+    /** Receives connection failures once the handler has been installed. */
+    def onException(error:Exception):Unit
+
+// TODO:
+//    public void setConnection(BrokerConnection connection);
+//
+//    public BrokerConnection getConnection();
+//
+//    public void onCommand(Object command);
+//
+//    public void onException(Exception error);
+//
+//    public void setWireFormat(WireFormat wf);
+//
+//    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+//
+//    /**
+//     * ClientContext
+//     * <p>
+//     * Description: Base interface describing a channel on a physical
+//     * connection.
+//     * </p>
+//     *
+//     * @author cmacnaug
+//     * @version 1.0
+//     */
+//    public interface ClientContext {
+//        public ClientContext getParent();
+//
+//        public Collection<ClientContext> getChildren();
+//
+//        public void addChild(ClientContext context);
+//
+//        public void removeChild(ClientContext context);
+//
+//        public void close();
+//
+//    }
+//
+//    public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
+//        protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
+//        protected final ClientContext parent;
+//        protected boolean closed = false;
+//
+//        public AbstractClientContext(String name, ClientContext parent) {
+//            super(name);
+//            this.parent = parent;
+//            if (parent != null) {
+//                parent.addChild(this);
+//            }
+//        }
+//
+//        public ClientContext getParent() {
+//            return parent;
+//        }
+//
+//        public void addChild(ClientContext child) {
+//            if (!closed) {
+//                children.add(child);
+//            }
+//        }
+//
+//        public void removeChild(ClientContext child) {
+//            if (!closed) {
+//                children.remove(child);
+//            }
+//        }
+//
+//        public Collection<ClientContext> getChildren() {
+//            return children;
+//        }
+//
+//        public void close() {
+//
+//            closed = true;
+//
+//            for (ClientContext c : children) {
+//                c.close();
+//            }
+//
+//            if (parent != null) {
+//                parent.removeChild(this);
+//            }
+//
+//            super.close();
+//        }
+//    }
+//
+}
+
+/**
+ * Read-only description of a consumer: its id, destination, selector,
+ * durability, subscription name and persistence flags.
+ */
+trait ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
+
+    def getConsumerId() : String
+
+    def getDestination(): Destination
+
+    def getSelector() : String
+
+    def getSelectorExpression() : BooleanExpression
+
+    def isDurable() : Boolean
+
+    def getSubscriptionName() : String
+
+    /**
+     * If the destination does not exist, should it automatically be
+     * created?
+     *
+     * @return whether a missing destination should be created automatically
+     */
+    def autoCreateDestination():Boolean
+
+    def isPersistent() : Boolean
+
+}
+

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedList, LinkedHashMap, HashMap}
+import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer}
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+/**
+ * Implemented by producers of deliveries that can be asked to collocate with
+ * a dispatch queue.
+ */
+trait DeliveryProducer {
+  // NOTE(review): presumably asks the producer to execute on (or target) the
+  // given queue; the semantics are defined by implementations -- confirm.
+  def collocate(queue:DispatchQueue):Unit
+}
+
+/**
+ * A session through which deliveries are handed to a DeliveryTarget.
+ * Instances are obtained from DeliveryTarget.open_session.
+ */
+trait DeliveryTargetSession {
+  // The target this session delivers to.
+  val consumer:DeliveryTarget
+  // Procedure syntax: the result type is Unit.
+  def deliver(delivery:MessageDelivery)
+  def close:Unit
+}
+/**
+ * A reference counted (Retained) consumer of deliveries. Producers obtain a
+ * DeliveryTargetSession from it via open_session, passing their own queue.
+ */
+trait DeliveryTarget extends Retained {
+  // NOTE(review): declared with procedure syntax, so the result type is Unit.
+  // A method named `matches` presumably should return Boolean -- confirm
+  // against the implementations before changing the signature.
+  def matches(message:MessageDelivery)
+  // The dispatch queue associated with this target.
+  val queue:DispatchQueue;
+  // Opens a session for a producer that runs on `producer_queue`.
+  def open_session(producer_queue:DispatchQueue):DeliveryTargetSession
+}
+
+/**
+ * Abstracts wire protocol message implementations.  Each wire protocol
+ * will provide its own type of Message.
+ */
+trait Message {
+
+  /**
+   * the globally unique id of the message
+   */
+  def id: AsciiBuffer
+
+  /**
+   * the globally unique id of the producer
+   */
+  def producer: AsciiBuffer
+
+  /**
+   *  the message priority.
+   */
+  def priority:Byte
+
+  /**
+   * a positive value indicates that the delivery has an expiration
+   * time.
+   */
+  def expiration: Long
+
+  /**
+   * true if the delivery is persistent
+   */
+  def persistent: Boolean
+
+  /**
+   * where the message was sent to.
+   */
+  def destination: Destination
+
+  /**
+   * used to apply a selector against the message.
+   */
+  def messageEvaluationContext:MessageEvaluationContext
+
+}
+
+object MessageDelivery {
+
+  /**
+   * Copy factory: builds a new MessageDelivery instance that carries the same
+   * field values as `o`. The copy is a distinct object from `o`.
+   */
+  def apply(o:MessageDelivery) = new MessageDelivery(
+    message = o.message,
+    encoded = o.encoded,
+    encoding = o.encoding,
+    size = o.size,
+    ack = o.ack,
+    tx_id = o.tx_id,
+    store_id = o.store_id
+  )
+}
+
+/**
+ * A message together with the metadata needed to route, account for and
+ * store it. Extends BaseRetained, so instances are reference counted through
+ * retain / release and may carry a disposer.
+ */
+case class MessageDelivery (
+
+        /**
+         *  the message being delivered
+         */
+  message: Message,
+
+  /**
+   * the encoded form of the message being delivered.
+   */
+  encoded: Buffer,
+
+  /**
+   * the encoding format of the message
+   */
+  encoding: String,
+
+  /**
+   * memory size of the delivery.  Used for resource allocation tracking
+   */
+  size:Int,
+
+  /**
+   *  true if this delivery requires acknowledgment.
+   */
+  ack:Boolean,
+
+  /**
+   * The id used to identify the transaction that the message
+   * belongs to.
+   */
+  tx_id:Long,
+
+  /**
+   * The id used to identify this message in the message
+   * store: the store tracking id, or -1 if not set.
+   */
+  store_id: Long
+
+) extends BaseRetained {
+
+}
+
+//abstract class BrokerMessageDelivery extends MessageDelivery {
+// TODO:
+//    // True while the message is being dispatched to the delivery targets:
+//    boolean dispatching = false;
+//
+//    // A non null pending save indicates that the message is the
+//    // saver queue and that the message
+//    OperationContext<?> pendingSave;
+//
+//    // List of persistent targets for which the message should be saved
+//    // when dispatch is complete:
+//    HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
+//    SaveableQueueElement<MessageDelivery> singleTarget;
+//
+//    long storeTracking = -1;
+//    BrokerDatabase store;
+//    boolean fromStore = false;
+//    boolean enableFlushDelay = true;
+//    private int limiterSize = -1;
+//    private long tid=-1;
+//
+//    public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
+//        fromStore = true;
+//        store = database;
+//        storeTracking = mRecord.getKey();
+//        limiterSize = mRecord.getSize();
+//    }
+//
+//    public final int getFlowLimiterSize() {
+//        if (limiterSize == -1) {
+//            limiterSize = getMemorySize();
+//        }
+//        return limiterSize;
+//    }
+//
+//    /**
+//     * When an application wishes to include a message in a broker transaction
+//     * it must set this the tid returned by {@link Transaction#getTid()}
+//     *
+//     * @param tid
+//     *            Sets the tid used to identify the transaction at the broker.
+//     */
+//    public void setTransactionId(long tid) {
+//        this.tid = tid;
+//    }
+//
+//    /**
+//     * @return The tid used to identify the transaction at the broker.
+//     */
+//    public final long getTransactionId() {
+//        return tid;
+//    }
+//
+//    public final void clearTransactionId() {
+//        tid = -1;
+//    }
+//
+//    /**
+//     * Subclass must implement this to return their current memory size
+//     * estimate.
+//     *
+//     * @return The memory size of the message.
+//     */
+//    public abstract int getMemorySize();
+//
+//    public final boolean isFromStore() {
+//        return fromStore;
+//    }
+//
+//    public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
+//        synchronized (this) {
+//            // Can flush of this message to the store be delayed?
+//            if (enableFlushDelay && !delayable) {
+//                enableFlushDelay = false;
+//            }
+//            // If this message is being dispatched then add the queue to the
+//            // list of queues for which to save the message when dispatch is
+//            // finished:
+//            if (dispatching) {
+//                addPersistentTarget(sqe);
+//                return;
+//            }
+//            // Otherwise, if it is still in the saver queue, we can add this
+//            // queue to the queue list:
+//            else if (pendingSave != null) {
+//                addPersistentTarget(sqe);
+//                if (!delayable) {
+//                    pendingSave.requestFlush();
+//                }
+//                return;
+//            }
+//        }
+//
+//        store.saveMessage(sqe, controller, delayable);
+//    }
+//
+//    public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+//        boolean firePersistListener = false;
+//        boolean deleted = false;
+//        synchronized (this) {
+//            // If the message hasn't been saved to the database
+//            // then we don't need to issue a delete:
+//            if (dispatching || pendingSave != null) {
+//
+//                deleted = true;
+//
+//                removePersistentTarget(sqe.getQueueDescriptor());
+//                // We get a save context when we place the message in the
+//                // database queue. If it has been added to the queue,
+//                // and we've removed the last queue, see if we can cancel
+//                // the save:
+//                if (pendingSave != null && !hasPersistentTargets()) {
+//                    if (pendingSave.cancel()) {
+//                        pendingSave = null;
+//                        if (isPersistent()) {
+//                            firePersistListener = true;
+//                        }
+//                    }
+//                }
+//            }
+//        }
+//
+//        if (!deleted) {
+//            store.deleteQueueElement(sqe);
+//        }
+//
+//        if (firePersistListener) {
+//            onMessagePersisted();
+//        }
+//
+//    }
+//
+//    public final void setStoreTracking(long tracking) {
+//        if (storeTracking == -1) {
+//            storeTracking = tracking;
+//        }
+//    }
+//
+//    public final void beginDispatch(BrokerDatabase database) {
+//        this.store = database;
+//        dispatching = true;
+//        setStoreTracking(database.allocateStoreTracking());
+//    }
+//
+//    public long getStoreTracking() {
+//        return storeTracking;
+//    }
+//
+//    public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
+//        if (singleTarget != null) {
+//            ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
+//            list.add(singleTarget);
+//            return list;
+//        } else if (persistentTargets != null) {
+//            return persistentTargets.values();
+//        }
+//        return null;
+//    }
+//
+//    public void beginStore() {
+//        synchronized (this) {
+//            pendingSave = null;
+//        }
+//    }
+//
+//    private final boolean hasPersistentTargets() {
+//        return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
+//    }
+//
+//    private final void removePersistentTarget(QueueDescriptor queue) {
+//        if (persistentTargets != null) {
+//            persistentTargets.remove(queue);
+//            return;
+//        }
+//
+//        if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
+//            singleTarget = null;
+//        }
+//    }
+//
+//    private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
+//        if (persistentTargets != null) {
+//            persistentTargets.put(elem.getQueueDescriptor(), elem);
+//            return;
+//        }
+//
+//        if (singleTarget == null) {
+//            singleTarget = elem;
+//            return;
+//        }
+//
+//        if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
+//            persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
+//            persistentTargets.put(elem.getQueueDescriptor(), elem);
+//            persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
+//            singleTarget = null;
+//        }
+//    }
+//
+//    public final void finishDispatch(ISourceController<?> controller) throws IOException {
+//        boolean firePersistListener = false;
+//        synchronized (this) {
+//            // If any of the targets requested save then save the message
+//            // Note that this could be the case even if the message isn't
+//            // persistent if a target requested that the message be spooled
+//            // for some other reason such as queue memory overflow.
+//            if (hasPersistentTargets()) {
+//                pendingSave = store.persistReceivedMessage(this, controller);
+//            }
+//
+//            // If none of the targets required persistence, then fire the
+//            // persist listener:
+//            if (pendingSave == null || !isPersistent()) {
+//                firePersistListener = true;
+//            }
+//            dispatching = false;
+//        }
+//
+//        if (firePersistListener) {
+//            onMessagePersisted();
+//        }
+//    }
+//
+//    public final MessageRecord createMessageRecord() {
+//
+//        MessageRecord record = new MessageRecord();
+//        record.setEncoding(getStoreEncoding());
+//        record.setBuffer(getStoreEncoded());
+//        record.setStreamKey((long) 0);
+//        record.setMessageId(getMsgId());
+//        record.setSize(getFlowLimiterSize());
+//        record.setKey(getStoreTracking());
+//        return record;
+//    }
+//
+//    /**
+//     * @return A buffer representation of the message to be stored in the store.
+//     * @throws
+//     */
+//    protected abstract Buffer getStoreEncoded();
+//
+//    /**
+//     * @return The encoding scheme used to store the message.
+//     */
+//    protected abstract AsciiBuffer getStoreEncoding();
+//
+//    public boolean isFlushDelayable() {
+//        // TODO Auto-generated method stub
+//        return enableFlushDelay;
+//    }
+//}
+
+/**
+ * A FIFO buffer of deliveries that tracks the total size of the deliveries
+ * it has accepted but that have not been acknowledged yet.
+ *
+ * The buffer never rejects a delivery: `full` only reports that the tracked
+ * size has reached `maxSize`, it is up to the caller to stop sending.
+ *
+ * `eventHandler` must be assigned before the first `send` / `ack`: `drain`
+ * invokes it unconditionally and fails with a NullPointerException while it
+ * is still null.
+ *
+ * @param maxSize the tracked size at which the buffer reports itself full.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DeliveryBuffer(var maxSize:Int=1024*32) {
+
+  var deliveries = new LinkedList[MessageDelivery]()
+  // Sum of the sizes of the deliveries sent but not yet acked.
+  private var size = 0
+  // Run by `drain` to notify the consumer that deliveries are available.
+  var eventHandler: Runnable = null
+
+  /** true once the un-acked size has reached `maxSize`. */
+  def full = size >= maxSize
+
+  /** Runs the registered event handler. */
+  def drain = eventHandler.run
+
+  /** Removes and returns the oldest buffered delivery, or null if there is none. */
+  def receive = deliveries.poll
+
+  def isEmpty = deliveries.isEmpty
+
+  /**
+   * Retains and enqueues the delivery and accounts for its size. The event
+   * handler is only run when the buffer goes from empty to non-empty.
+   */
+  def send(delivery:MessageDelivery):Unit = {
+    delivery.retain
+    size += delivery.size
+    deliveries.addLast(delivery)
+    if( deliveries.size == 1 ) {
+      drain
+    }
+  }
+
+  /**
+   * Gives back the capacity used by a delivery and releases the reference
+   * taken by `send`. If deliveries are still buffered the event handler is
+   * run again.
+   */
+  def ack(delivery:MessageDelivery):Unit = {
+    // When a message is delivered to the consumer, we release
+    // used capacity in the outbound queue, and can drain the inbound
+    // queue
+    size -= delivery.size
+    delivery.release
+    if( !isEmpty ) {
+      drain
+    }
+  }
+
+}
+
+/**
+ * Sits in front of a DeliveryBuffer and parks deliveries in an overflow list
+ * whenever `full` reports true, forwarding them once capacity is available
+ * again.
+ */
+class DeliveryOverflowBuffer(val delivery_buffer:DeliveryBuffer) {
+
+  // Deliveries waiting for capacity, oldest first.
+  private val overflow = new LinkedList[MessageDelivery]()
+
+  def full = delivery_buffer.full
+
+  /**
+   * Forwards the delivery right away when there is capacity, otherwise parks
+   * it in the overflow list.
+   */
+  def send(delivery:MessageDelivery) = {
+    if( !full ) {
+      send_to_delivery_queue(delivery)
+    } else {
+      // Deliveries in the overflow queue remain retained by us so that the
+      // producer that sent them to us gets flow controlled.
+      delivery.retain
+      overflow.addLast(delivery)
+    }
+  }
+
+  /**
+   * Moves parked deliveries to the delivery buffer, oldest first, for as long
+   * as `full` stays false. The reference taken in `send` is released before
+   * the delivery is forwarded.
+   */
+  protected def drainOverflow:Unit = {
+    while( !(overflow.isEmpty || full) ) {
+      val parked = overflow.removeFirst
+      parked.release
+      send_to_delivery_queue(parked)
+    }
+  }
+
+  /**
+   * Sends a copy of `value` to the delivery buffer. The copy's disposer
+   * triggers another overflow drain.
+   */
+  protected def send_to_delivery_queue(value:MessageDelivery) = {
+    val copy = MessageDelivery(value)
+    copy.setDisposer(^{
+      drainOverflow
+    })
+    delivery_buffer.send(copy)
+    // Drop our own reference; delivery_buffer.send retained the copy.
+    copy.release
+  }
+
+}
+
+/**
+ * Credit based flow control in front of a DeliveryBuffer.
+ *
+ * Each producer gets a session (see `session`) made of a CreditServer and
+ * its CreditClient. The client spends credits when it forwards a delivery
+ * and parks deliveries in its overflow list once it has no credits left.
+ * Forwarded deliveries are merged into `source`, whose handler
+ * (`drain_source`) pushes them into `delivery_buffer`. When a forwarded
+ * delivery is disposed, its size is merged into the client's `credit_adder`.
+ *
+ * @param delivery_buffer the buffer deliveries end up in.
+ * @param queue the dispatch queue `source` is created on; it is retained
+ *              here and released by the disposer.
+ */
+class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
+
+  // Sessions created so far. Nothing in this class removes entries.
+  var sessions = List[CreditServer]()
+
+  // NOTE(review): session_min_credits is not read anywhere in this class.
+  var session_min_credits = 1024*4;
+  var session_credit_capacity = 1024*32
+  // Capacity handed to every new session by `session`.
+  var session_max_credits = session_credit_capacity;
+
+  queue.retain
+  // `source` is declared below; it is only dereferenced when the disposer
+  // runs, not while this statement executes.
+  setDisposer(^{
+    source.release
+    queue.release
+  })
+
+  // use an event aggregating source to coalesce multiple events from the same thread.
+  val source = createSource(new ListEventAggregator[MessageDelivery](), queue)
+  source.setEventHandler(^{drain_source});
+  source.resume
+
+  // Pushes every delivery aggregated by `source` into the delivery buffer and
+  // then releases it.
+  def drain_source = {
+    val deliveries = source.getData
+    deliveries.foreach { delivery=>
+      delivery_buffer.send(delivery)
+      delivery.release
+    }
+  }
+
+
+  /**
+   * Server side of a producer session: owns the CreditClient and the
+   * capacity granted to it.
+   */
+  class CreditServer(val producer_queue:DispatchQueue) {
+    private var _capacity = 0
+
+    // Sets the capacity and sends the difference to the client as a credit
+    // change (which may be negative).
+    def capacity(value:Int) = {
+      val change = value - _capacity;
+      _capacity = value;
+      client.credit(change)
+    }
+
+    def drain(callback:Runnable) = {
+      client.drain(callback)
+    }
+
+    val client = new CreditClient()
+
+    /**
+     * Producer side of the session. Reports itself full while it has no
+     * credits, so the inherited `send` parks deliveries in the overflow list.
+     */
+    class CreditClient() extends DeliveryOverflowBuffer(delivery_buffer) {
+
+      producer_queue.retain
+      // Sums the sizes merged by disposed deliveries and applies the total
+      // through internal_credit; the source is created on producer_queue.
+      val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+      credit_adder.setEventHandler(^{
+        internal_credit(credit_adder.getData.intValue)
+      });
+      credit_adder.resume
+
+      private var credits = 0;
+
+      ///////////////////////////////////////////////////
+      // These methods get called from the client/producer thread...
+      ///////////////////////////////////////////////////
+      def close = {
+        credit_adder.release
+        producer_queue.release
+      }
+
+      override def full = credits <= 0
+
+      // Unlike the base implementation this does not call delivery_buffer.send
+      // directly: the copy is merged into `source` and sent by drain_source.
+      override protected def send_to_delivery_queue(value:MessageDelivery) = {
+        var delivery = MessageDelivery(value)
+        delivery.setDisposer(^{
+          // This is called from the server/consumer thread
+          credit_adder.merge(delivery.size);
+        })
+        internal_credit(-delivery.size)
+        source.merge(delivery)
+      }
+
+      // NOTE(review): a balance that would go negative is clamped to 0, yet the
+      // full delivery size is credited back on dispose, so the balance can
+      // presumably end up above the granted capacity -- confirm this is intended.
+      def internal_credit(value:Int) = {
+        credits += value;
+        if( credits <= 0 ) {
+          credits = 0
+        } else {
+          drainOverflow
+        }
+      }
+
+      ///////////////////////////////////////////////////
+      // These methods get called from the server/consumer thread...
+      ///////////////////////////////////////////////////
+      def credit(value:Int) = ^{ internal_credit(value) } ->: producer_queue
+
+      // NOTE(review): unlike `credit`, this writes `credits` directly instead
+      // of going through producer_queue -- looks like an unsynchronized
+      // cross-thread write; confirm. `queue` here is the outer class's queue.
+      def drain(callback:Runnable) = {
+        credits = 0
+        if( callback!=null ) {
+          queue << callback
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates a session for a producer running on `queue` (this parameter
+   * shadows the class level `queue`), grants it session_max_credits and
+   * returns the client side of the session.
+   */
+  def session(queue:DispatchQueue) = {
+    val session = new CreditServer(queue)
+    sessions = session :: sessions
+    session.capacity(session_max_credits)
+    session.client
+  }
+
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, HashMap}
+import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
+import BufferConversions._
+
+/**
+ * Options used by Destination.parse. A prefix that is left null is skipped
+ * by the parser; `defaultDomain` is used when no prefix matches.
+ */
+class ParserOptions {
+  var defaultDomain:AsciiBuffer = null
+  var queuePrefix:AsciiBuffer = null
+  var topicPrefix:AsciiBuffer = null
+  var tempQueuePrefix:AsciiBuffer = null
+  var tempTopicPrefix:AsciiBuffer = null
+}
+
+/**
+ * A message destination. In this file SingleDestination provides a domain
+ * and a name and returns null from getDestinations, while MultiDestination
+ * provides a list of destinations and returns null for domain and name.
+ */
+trait Destination {
+  def getDomain(): AsciiBuffer
+  def getName(): AsciiBuffer
+  def getDestinations():Seq[Destination]
+}
+
+object Destination {
+
+    /**
+     * Parses a simple destination.
+     *
+     * The configured prefixes are tried in this order: queue, topic, temp
+     * queue, temp topic. The first non-null prefix that `value` starts with
+     * selects the domain and is stripped from the name. When no prefix
+     * matches, the whole value becomes the name in `options.defaultDomain`.
+     *
+     * @param value the destination name, possibly carrying a domain prefix
+     * @param options the prefixes and default domain to use
+     * @return the parsed destination
+     * @throws IllegalArgumentException if no prefix matches and no default
+     *         domain is configured
+     */
+    def parse(value:AsciiBuffer, options:ParserOptions ):Destination = {
+        if (options.queuePrefix!=null && value.startsWith(options.queuePrefix)) {
+            val name = value.slice(options.queuePrefix.length, value.length).ascii();
+            new SingleDestination(Domain.QUEUE_DOMAIN, name)
+        } else if (options.topicPrefix!=null && value.startsWith(options.topicPrefix)) {
+            val name = value.slice(options.topicPrefix.length, value.length).ascii();
+            new SingleDestination(Domain.TOPIC_DOMAIN, name)
+        } else if (options.tempQueuePrefix!=null && value.startsWith(options.tempQueuePrefix)) {
+            val name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
+            new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name)
+        } else if (options.tempTopicPrefix!=null && value.startsWith(options.tempTopicPrefix)) {
+            val name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
+            new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name)
+        } else {
+            if( options.defaultDomain==null ) {
+                throw new IllegalArgumentException("Destination domain not provided: "+value);
+            }
+            new SingleDestination(options.defaultDomain, value)
+        }
+    }
+
+    /**
+     * Parses a destination which may or may not be a composite.
+     *
+     * A value containing `compositeSeparator` is split on it and every part
+     * is parsed as a simple destination; the parts are kept in the order in
+     * which they appear in `value`.
+     *
+     * @param value the destination name, may be null
+     * @param options the prefixes and default domain to use
+     * @param compositeSeparator the byte separating the parts of a composite
+     * @return null if `value` is null, a MultiDestination for a composite,
+     *         otherwise the simple destination
+     */
+    def parse(value:AsciiBuffer, options:ParserOptions , compositeSeparator:Byte ):Destination = {
+        if( value == null ) {
+            null
+        } else if( value.contains(compositeSeparator) ) {
+            // Prepending is cheap on a List but yields the parts in reverse,
+            // so the list is reversed once at the end to restore input order.
+            var parsed:List[Destination] = Nil
+            for (buffer <- value.split(compositeSeparator)) {
+                parsed ::= parse(buffer, options)
+            }
+            new MultiDestination(parsed.reverse)
+        } else {
+            parse(value, options)
+        }
+    }
+}
+
+/**
+ * A destination made of a single domain / name pair.
+ */
+class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
+
+  // A single destination has no children: this returns null, not an empty Seq.
+  def getDestinations():Seq[Destination] = null;
+  def getDomain():AsciiBuffer = domain
+  def getName():AsciiBuffer = name
+
+  // Starting the concatenation with "" keeps this plain String concatenation,
+  // so a null domain or name is rendered as "null".
+  override def toString() = ""+domain+":"+name
+}
+
+/**
+ * A composite destination: a list of destinations. It has no domain or name
+ * of its own, so both accessors return null.
+ */
+class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
+
+  def getDestinations():Seq[Destination] = destinations;
+  def getDomain():AsciiBuffer = null
+  def getName():AsciiBuffer = null
+
+  // Comma separated toString of every child destination.
+  override def toString() = destinations.mkString(",")
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, HashMap}
+import _root_.java.lang.{Throwable, String}
+import _root_.org.apache.commons.logging.LogFactory
+import _root_.org.apache.commons.logging.{Log => Logger}
+
+/**
+ * Holds a commons-logging log named after the runtime class of whatever
+ * mixes this trait in.
+ */
+trait Log {
+  val log = LogFactory.getLog(getClass.getName)
+}
+
+/**
+ * A Logging trait you can mix into an implementation class without affecting its public API
+ *
+ * Messages are taken by name and every method checks that its level is
+ * enabled first, so a message is only built when it is going to be logged.
+ */
+trait Logging {
+
+  /** The Log holder whose underlying log receives the messages. */
+  protected def log: Log
+
+  protected def error(message: => String): Unit = {
+    val target = log.log
+    if (target.isErrorEnabled) target.error(message)
+  }
+
+  /** Logs the exception at error level, using its own message as the text. */
+  protected def error(e: Throwable): Unit = {
+    val target = log.log
+    if (target.isErrorEnabled) target.error(e.getMessage, e)
+  }
+
+  protected def error(message: => String, e: Throwable): Unit = {
+    val target = log.log
+    if (target.isErrorEnabled) target.error(message, e)
+  }
+
+  protected def warn(message: => String): Unit = {
+    val target = log.log
+    if (target.isWarnEnabled) target.warn(message)
+  }
+
+  protected def warn(message: => String, e: Throwable): Unit = {
+    val target = log.log
+    if (target.isWarnEnabled) target.warn(message, e)
+  }
+
+  protected def info(message: => String): Unit = {
+    val target = log.log
+    if (target.isInfoEnabled) target.info(message)
+  }
+
+  protected def info(message: => String, e: Throwable): Unit = {
+    val target = log.log
+    if (target.isInfoEnabled) target.info(message, e)
+  }
+
+  protected def debug(message: => String): Unit = {
+    val target = log.log
+    if (target.isDebugEnabled) target.debug(message)
+  }
+
+  protected def debug(message: => String, e: Throwable): Unit = {
+    val target = log.log
+    if (target.isDebugEnabled) target.debug(message, e)
+  }
+
+  protected def trace(message: => String): Unit = {
+    val target = log.log
+    if (target.isTraceEnabled) target.trace(message)
+  }
+
+  protected def trace(message: => String, e: Throwable): Unit = {
+    val target = log.log
+    if (target.isTraceEnabled) target.trace(message, e)
+  }
+
+}
+

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.io.{File}
+import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.transport._
+import _root_.org.apache.activemq.Service
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{Buffer, UTF8Buffer, AsciiBuffer}
+import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
+
+import _root_.scala.collection.JavaConversions._
+
+/**
+ * Creates Broker instances from a URI. The part of the URI before the first
+ * ':' names the Handler that is looked up through the factory finder.
+ */
+object BrokerFactory {
+
+    val BROKER_FACTORY_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/");
+
+    /** Knows how to build a Broker for one URI scheme. */
+    trait Handler {
+        def createBroker(brokerURI:String):Broker
+    }
+
+
+    /** Instantiates the handler registered under `name`. */
+    def createHandler(name:String):Handler = {
+      val instance = BROKER_FACTORY_HANDLER_FINDER.newInstance(name)
+      instance.asInstanceOf[Handler]
+    }
+
+    /**
+     * Creates a broker from a URI configuration
+     *
+     * @param brokerURI the URI scheme to configure the broker
+     * @param startBroker whether or not the broker should have its
+     *                {@link Broker#start()} method called after
+     *                construction
+     * @throws IllegalArgumentException if splitting the URI on ':' yields
+     *                fewer than two parts
+     */
+    def createBroker(brokerURI:String, startBroker:Boolean=false):Broker = {
+      val parts = brokerURI.split(":")
+      if (parts.length < 2 ) {
+          throw new IllegalArgumentException("Invalid broker URI, no scheme specified: " + brokerURI)
+      }
+      // The first part is the scheme, which selects the handler.
+      val broker = createHandler(parts(0)).createBroker(brokerURI)
+      if (startBroker) {
+          broker.start();
+      }
+      broker
+    }
+
+}
+
+/**
+ * Implicit conversions between String and the buffer types, and from a
+ * generic Buffer to its ascii / utf8 form.
+ *
+ * Implicit views are only applied when the expected type requires them. A
+ * String passed where an Object is accepted (for example
+ * java.util.Map.containsKey / remove) is NOT converted to a buffer.
+ */
+object BufferConversions {
+
+  implicit def toAsciiBuffer(value:String) = new AsciiBuffer(value)
+  implicit def toUTF8Buffer(value:String) = new UTF8Buffer(value)
+  // Both call toString on the argument, so a null buffer fails with a
+  // NullPointerException.
+  implicit def fromAsciiBuffer(value:AsciiBuffer) = value.toString
+  implicit def fromUTF8Buffer(value:UTF8Buffer) = value.toString
+
+  implicit def toAsciiBuffer(value:Buffer) = value.ascii
+  implicit def toUTF8Buffer(value:Buffer) = value.utf8
+}
+
+import BufferConversions._
+
+/**
+ * Constants shared by the broker: the names of the broker states and the
+ * name of the default virtual host. Extends Log, so it also carries the log
+ * that Broker uses.
+ */
+object BrokerConstants extends Log {
+  // Broker state names; Broker.state starts out as CONFIGURATION.
+  val CONFIGURATION = "CONFIGURATION"
+  val STOPPED = "STOPPED"
+  val STARTING = "STARTING"
+  val STOPPING = "STOPPING"
+  val RUNNING = "RUNNING"
+  val UNKNOWN = "UNKNOWN"
+  
+  val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
+}
+
+class Broker() extends Service with Logging {
+  
+  import BrokerConstants._
+  // Broker logs through the log held by the BrokerConstants object.
+  override protected def log = BrokerConstants
+
+  /**
+   * Accept listener installed on every transport server started by the
+   * broker: wraps each accepted transport in a BrokerConnection and starts it.
+   */
+  class BrokerAcceptListener extends TransportAcceptListener {
+
+    /**
+     * Registers a new connection for the transport and starts it. A failure
+     * to start is reported through onAcceptError.
+     *
+     * NOTE(review): the connection is added to clientConnections before it is
+     * started and is not removed here when the start fails -- confirm it gets
+     * cleaned up elsewhere.
+     */
+    def onAccept(transport: Transport): Unit = {
+      val accepted = new BrokerConnection(Broker.this)
+      accepted.transport = transport
+      clientConnections.add(accepted)
+      try {
+        accepted.start
+      } catch {
+        case failure: Exception => onAcceptError(failure)
+      }
+    }
+
+    /** Logs the error at warn level and the full details at debug level. */
+    def onAcceptError(error: Exception): Unit = {
+      warn("Accept error: " + error)
+      debug("Accept error details: ", error)
+    }
+  }
+
+  // The broker's dispatch queue. The methods of this class wrap their work in
+  // tasks that are submitted to it with `->: q`.
+  val q = createQueue("broker");
+
+  var connectUris: List[String] = Nil
+  // Virtual hosts keyed by host name; a host is registered once per name.
+  val virtualHosts: LinkedHashMap[AsciiBuffer, VirtualHost] = new LinkedHashMap[AsciiBuffer, VirtualHost]
+  val transportServers: ArrayList[TransportServer] = new ArrayList[TransportServer]
+  val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+  // Left null until start applies the default data directory.
+  var dataDirectory: File = null
+  // One of the state names defined in BrokerConstants.
+  var state = CONFIGURATION
+  var name = "broker";
+  var defaultVirtualHost: VirtualHost = null
+
+  /**
+   * Submits a task to the broker queue that drops every entry equal to `uri`
+   * from the list of connect URIs.
+   */
+  def removeConnectUri(uri: String): Unit = ^ {
+    connectUris = connectUris.filter(candidate => candidate != uri)
+  } ->: q
+
+  // Looks up the virtual host registered under `name`; the lookup is wrapped
+  // with `callback(cb)` and submitted to the broker queue `q`.
+  // NOTE(review): presumably `cb` receives the lookup result, which is null
+  // for an unknown name (java.util.Map.get) -- confirm against `callback`.
+  def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
+    virtualHosts.get(name)
+  } ->: q
+
+  // Reads the list of connect URIs in a `callback(cb)` task submitted to the
+  // broker queue `q`.
+  // NOTE(review): presumably `cb` receives the list -- confirm against `callback`.
+  def getConnectUris(cb: (List[String]) => Unit) = callback(cb) {
+    connectUris
+  } ->: q
+
+
+  // Reads the default virtual host (null until one is set) in a `callback(cb)`
+  // task submitted to the broker queue `q`.
+  // NOTE(review): presumably `cb` receives the value -- confirm against `callback`.
+  def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
+    defaultVirtualHost
+  } ->: q
+
+  /**
+   * Submits a task to the broker queue that registers `host` under each of
+   * its names. The first host registered while no default virtual host is
+   * set becomes the default.
+   *
+   * The task throws IllegalArgumentException when the host has no names or
+   * when one of its names is already registered; in that case nothing is
+   * registered. The exception is raised inside the task, not in the caller
+   * of this method.
+   */
+  def addVirtualHost(host: VirtualHost) = ^ {
+    if (host.names.isEmpty) {
+      throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
+    }
+    for (name <- host.names) {
+      // The map is keyed by AsciiBuffer. containsKey accepts any Object, so the
+      // name has to be converted explicitly or the check could never match.
+      val key: AsciiBuffer = name
+      if (virtualHosts.containsKey(key)) {
+        throw new IllegalArgumentException("Virtual host with host name " + name + " already exists.")
+      }
+    }
+    for (name <- host.names) {
+      virtualHosts.put(name, host)
+    }
+    if (defaultVirtualHost == null) {
+      // This task already runs on the broker queue, so assign directly instead
+      // of queueing another task: a later addVirtualHost task must not find
+      // the default still unset.
+      defaultVirtualHost = host
+    }
+  } ->: q
+
+  /**
+   * Submits a task to the broker queue that adds a transport server. While
+   * the broker is RUNNING the server is started immediately; while it is in
+   * CONFIGURATION the server is recorded in transportServers. In any other
+   * state the task throws IllegalStateException.
+   *
+   * NOTE(review): a server added while RUNNING is started but not recorded in
+   * transportServers -- confirm that this is intended.
+   */
+  def addTransportServer(server: TransportServer) = ^ {
+    val current = state
+    if (current == RUNNING) {
+      start(server)
+    } else if (current == CONFIGURATION) {
+      this.transportServers.add(server)
+    } else {
+      throw new IllegalStateException("Cannot add a transport server when broker is: " + state)
+    }
+  } ->: q
+
+  /**
+   * Submits a task to the broker queue that removes a transport server.
+   * While the broker is RUNNING the server is stopped; while it is STOPPED or
+   * in CONFIGURATION the server is removed from transportServers. In any
+   * other state the task throws IllegalStateException.
+   */
+  def removeTransportServer(server: TransportServer) = ^ {
+    state match {
+      case RUNNING =>
+        stopTransportServerWrapException(server)
+      case STOPPED =>
+        this.transportServers.remove(server)
+      case CONFIGURATION =>
+        this.transportServers.remove(server)
+      case _ =>
+        // The message used to say "add", copied from addTransportServer.
+        throw new IllegalStateException("Cannot remove a transport server when broker is: " + state)
+    }
+  } ->: q
+
+
+  // Reads the broker state in a `callback(cb)` task submitted to the broker
+  // queue `q`. NOTE(review): presumably `cb` receives the state name -- confirm.
+  def getState(cb: (String) => Unit) = callback(cb) {state} ->: q
+
+
+  /**
+   * Submits a task to the broker queue that appends `uri` to the end of the
+   * list of connect URIs.
+   */
+  def addConnectUri(uri: String) = ^ {
+    connectUris = connectUris ::: List(uri)
+  } ->: q
+
+  /**
+   * Submits a task to the broker queue that unregisters `host` from all of
+   * its names. If the host was the default virtual host, the first remaining
+   * host (in registration order) becomes the default, or null when none is
+   * left.
+   */
+  def removeVirtualHost(host: VirtualHost) = ^ {
+    for (name <- host.names) {
+      // The map is keyed by AsciiBuffer. remove accepts any Object, so the
+      // name has to be converted explicitly or nothing would be removed.
+      val key: AsciiBuffer = name
+      virtualHosts.remove(key)
+    }
+    if (host == defaultVirtualHost) {
+      if (virtualHosts.isEmpty) {
+        defaultVirtualHost = null
+      }
+      else {
+        defaultVirtualHost = virtualHosts.values.iterator.next
+      }
+    }
+  } ->: q
+
+  /**
+   * Replaces the broker's default virtual host.
+   * The assignment is wrapped in a `^ { ... }` task and handed to the dispatch queue `q`.
+   *
+   * @param defaultVirtualHost the new default (the parameter shadows the field of the same name)
+   */
+  def setDefaultVirtualHost(defaultVirtualHost: VirtualHost) = ^ {
+    val replacement = defaultVirtualHost
+    this.defaultVirtualHost = replacement
+  } ->: q
+
+  /**
+   * Reports the broker's name through `cb`
+   * (via the `callback` helper, handed to the dispatch queue `q`).
+   *
+   * @param cb receives the broker name
+   */
+  def getName(cb: (String) => Unit) = callback(cb) {
+    name
+  } ->: q
+
+
+  /**
+   * Wires a transport server to this broker and starts it: the server is given the broker's
+   * dispatch queue `q` and a fresh `BrokerAcceptListener` before `start` is invoked on it.
+   *
+   * @param server the transport server to start
+   */
+  private def start(server: TransportServer): Unit = {
+    server.setDispatchQueue(q)
+    val acceptListener = new BrokerAcceptListener
+    server.setAcceptListener(acceptListener)
+    server.start
+  }
+
+
+  /**
+   * Stops the broker if it is currently running; in any other state nothing happens.
+   *
+   * The state moves to STOPPING, then transport servers, client connections and virtual hosts
+   * are stopped in that order through the logging `stop(Service)` helper, and finally the
+   * state becomes STOPPED. The work is wrapped in a `^ { ... }` task handed to `q`.
+   */
+  final def stop: Unit = ^ {
+    if (state == RUNNING) {
+      state = STOPPING
+
+      transportServers.foreach { server => stop(server) }
+      clientConnections.foreach { connection => stop(connection) }
+      virtualHosts.values.foreach { virtualHost => stop(virtualHost) }
+
+      state = STOPPED
+    }
+  } ->: q
+
+  /**
+   * Reports a copy of the registered virtual hosts through `cb`
+   * (via the `callback` helper, handed to the dispatch queue `q`).
+   * The list is a fresh ArrayList built from `virtualHosts.values`.
+   *
+   * @param cb receives the copied list
+   */
+  def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
+    val snapshot = new ArrayList[VirtualHost](virtualHosts.values)
+    snapshot
+  } ->: q
+
+  /**
+   * Reports a copy of the tracked transport servers through `cb`
+   * (via the `callback` helper, handed to the dispatch queue `q`).
+   * The list is a fresh ArrayList built from `transportServers`.
+   *
+   * @param cb receives the copied list
+   */
+  def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
+    val snapshot = new ArrayList[TransportServer](transportServers)
+    snapshot
+  } ->: q
+
+
+
+
+  /**
+   * Starts the broker. Only a broker in the CONFIGURATION state can be started; in any other
+   * state a warning is logged and nothing else happens.
+   *
+   * Start-up applies defaults (data directory, default virtual host), moves to STARTING,
+   * starts every virtual host and then every transport server, and finally moves to RUNNING.
+   * The work is wrapped in a `^ { ... }` task handed to the dispatch queue `q`.
+   */
+  def start = ^ {
+    if (state != CONFIGURATION) {
+      warn("Can only start a broker that is in the " + CONFIGURATION + " state.  Broker was " + state)
+    } else {
+      // Configuration is final from here on, so missing settings get their defaults.
+      if (dataDirectory == null) {
+        dataDirectory = new File(IOHelper.getDefaultDataDirectory)
+      }
+
+      if (defaultVirtualHost == null) {
+        val fallback = new VirtualHost()
+        defaultVirtualHost = fallback
+        fallback.broker = Broker.this
+        fallback.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
+        virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, fallback)
+      }
+
+      state = STARTING
+
+      virtualHosts.values.foreach { virtualHost => virtualHost.start }
+      transportServers.foreach { server => start(server) }
+
+      state = RUNNING
+    }
+  } ->: q
+
+  /**
+   * Stops a transport server, rethrowing any Exception it raises wrapped in a RuntimeException.
+   *
+   * @param server the transport server to stop
+   */
+  private def stopTransportServerWrapException(server: TransportServer): Unit = {
+    try {
+      server.stop
+    } catch {
+      case cause: Exception => throw new RuntimeException(cause)
+    }
+  }
+
+
+  /**
+   * Stops a broker service. An Exception raised while stopping is not propagated: it is
+   * logged as a warning, and again at debug level together with the exception itself.
+   *
+   * @param server the service to stop
+   */
+  private def stop(server: Service): Unit = {
+    try {
+      server.stop
+    } catch {
+      case failure: Exception =>
+        warn("Could not stop " + server + ": " + failure)
+        debug("Could not stop " + server + " due to: ", failure)
+    }
+  }
+}
+
+
+/**
+ * Listener interface for queue creation and destruction events.
+ */
+trait QueueLifecyleListener {
+
+  /**
+   * A queue has been created.
+   *
+   * @param queue the queue that was created
+   */
+  def onCreate(queue: Queue): Unit
+
+  /**
+   * A queue has been destroyed.
+   *
+   * @param queue the queue that was destroyed
+   */
+  def onDestroy(queue: Queue): Unit
+
+}
+
+
+
+
+/**
+ * Constants shared by all [[Queue]] instances.
+ */
+object Queue {
+  /**
+   * Outbound size limit: 5 MiB. The original expression read `1024*1204*5`, which looks like
+   * a transposition of 1024 (it evaluates to 6164480 instead of 5242880).
+   */
+  val maxOutboundSize = 1024*1024*5
+}
+
+/**
+ * A queue for a single destination. Deliveries are held in `delivery_buffer` and handed out
+ * one at a time to the consumers waiting in `readyConsumers`; a consumer goes back on the
+ * ready list once the delivery it was given has been disposed (see `ConsumerState`).
+ *
+ * @param destination the destination this queue serves
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryTarget with DeliveryProducer {
+
+
+
+  // Dispatch queue labelled after the destination; it is released when this object is disposed.
+  override val queue:DispatchQueue = createQueue("queue:"+destination);
+  queue.setTargetQueue(getRandomThreadQueue)
+  setDisposer(^{
+    queue.release
+  })
+
+
+  // Buffer holding the deliveries that wait for a ready consumer.
+  val delivery_buffer  = new DeliveryBuffer
+
+  /**
+   * Book-keeping for one bound consumer session.
+   *
+   * @param consumer the session deliveries are sent to
+   */
+  class ConsumerState(val consumer:DeliveryTargetSession) {
+    // Cleared by unbind so that a late completion does not put this consumer back on the ready list.
+    var bound=true
+
+    /**
+     * Sends `value` to the consumer as a new MessageDelivery built from it. The new delivery's
+     * disposer hands a task to `queue` that calls `completed(value)`.
+     */
+    def deliver(value:MessageDelivery):Unit = {
+      val delivery = MessageDelivery(value)
+      delivery.setDisposer(^{
+        ^{ completed(value) } ->:queue
+      })
+      consumer.deliver(delivery);
+      // NOTE(review): presumably drops this method's own reference so the disposer can fire
+      // once the consumer releases its reference - confirm against MessageDelivery.
+      delivery.release
+    }
+
+    /**
+     * Called after a delivery handed to this consumer has been disposed: re-queues the
+     * consumer as ready (if still bound) and acks the delivery on the buffer.
+     */
+    def completed(delivery:MessageDelivery) = {
+      // Go back on the ready list only if we are still bound.
+      if( bound ) {
+        readyConsumers.addLast(this)
+      }
+      delivery_buffer.ack(delivery)
+    }
+  }
+
+  // Every bound consumer, keyed by its delivery target.
+  var allConsumers = Map[DeliveryTarget,ConsumerState]()
+  // Consumers currently able to take a delivery, served in FIFO order.
+  val readyConsumers = new LinkedList[ConsumerState]()
+
+  /** Same as `bind`. */
+  def connected(consumers:List[DeliveryTarget]) = bind(consumers)
+
+  /**
+   * Opens a session on each consumer, records it, marks it ready and then runs the buffer's
+   * event handler so that already buffered deliveries can be dispatched.
+   * NOTE(review): `retaining(consumers)` presumably retains the consumers for the task handed
+   * to `queue` - confirm against its definition.
+   */
+  def bind(consumers:List[DeliveryTarget]) = retaining(consumers) {
+      for ( consumer <- consumers ) {
+        val cs = new ConsumerState(consumer.open_session(queue))
+        allConsumers += consumer->cs
+        readyConsumers.addLast(cs)
+      }
+      delivery_buffer.eventHandler.run
+    } ->: queue
+
+  /**
+   * Unbinds each known consumer: marks its state unbound, closes its session and removes it
+   * from both `allConsumers` and `readyConsumers`. Unknown consumers are ignored.
+   */
+  def unbind(consumers:List[DeliveryTarget]) = releasing(consumers) {
+      for ( consumer <- consumers ) {
+        allConsumers.get(consumer) match {
+          case Some(cs)=>
+            cs.bound = false
+            cs.consumer.close
+            allConsumers -= consumer
+            readyConsumers.remove(cs)
+          case None=>
+        }
+      }
+    } ->: queue
+
+  /** Not supported: always throws a RuntimeException. */
+  def disconnected() = throw new RuntimeException("unsupported")
+
+  /**
+   * Retargets this queue's dispatch queue to the target queue of `value`, unless both already
+   * share the same target queue instance. The change is reported with `println`.
+   */
+  def collocate(value:DispatchQueue):Unit = {
+    if( value.getTargetQueue ne queue.getTargetQueue ) {
+      println(queue.getLabel+" co-locating with: "+value.getLabel);
+      this.queue.setTargetQueue(value.getTargetQueue)
+    }
+  }
+
+
+  // Dispatch loop: while there is both a ready consumer and a buffered delivery, pair them up.
+  delivery_buffer.eventHandler = ^{
+    while( !readyConsumers.isEmpty && !delivery_buffer.isEmpty ) {
+      val cs = readyConsumers.removeFirst
+      val delivery = delivery_buffer.receive
+      cs.deliver(delivery)
+    }
+  }
+
+
+  // Producer-facing side of the buffer; every producer session is created from it.
+  val deliveryQueue = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
+
+  /**
+   * Opens a session through which a producer can send deliveries into this queue.
+   * `retain` is called when the session is created and `release` when it is closed.
+   * NOTE(review): whether these resolve to the Queue's or the session's reference count
+   * depends on DeliveryTargetSession - confirm.
+   */
+  def open_session(producer_queue:DispatchQueue) = new DeliveryTargetSession {
+    val session = deliveryQueue.session(producer_queue)
+    val consumer = Queue.this
+    retain
+
+    def deliver(delivery:MessageDelivery) = session.send(delivery)
+    def close = {
+      session.close
+      release
+    }
+  }
+
+  /** Accepts every delivery. */
+  def matches(message:MessageDelivery) = { true }
+
+//  def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
+//    val consumer = StompQueue.this
+//    val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
+//    retain
+//
+//    def deliver(delivery:Delivery) = using(delivery) {
+//      deliveryQueue.send(delivery)
+//    } ->: queue
+//
+//    def close = {
+//      release
+//    }
+//  }
+
+
+}
+
+/**
+ * Placeholder for the previous Java queue implementation, which is kept below as
+ * commented-out code. The live members are stubs: nothing matches and nothing is delivered.
+ *
+ * @param destination the destination this queue is bound to
+ */
+class XQueue(val destination:Destination) {
+
+// TODO:
+//    private VirtualHost virtualHost;
+//
+//    Queue() {
+//        this.queue = queue;
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see
+//     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+//     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+//     */
+//    public void deliver(MessageDelivery message, ISourceController<?> source) {
+//        queue.add(message, source);
+//    }
+//
+//    public final void addSubscription(final Subscription<MessageDelivery> sub) {
+//        queue.addSubscription(sub);
+//    }
+//
+//    public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+//        return queue.removeSubscription(sub);
+//    }
+//
+//    public void start() throws Exception {
+//        queue.start();
+//    }
+//
+//    public void stop() throws Exception {
+//        if (queue != null) {
+//            queue.stop();
+//        }
+//    }
+//
+//    public void shutdown(Runnable onShutdown) throws Exception {
+//        if (queue != null) {
+//            queue.shutdown(onShutdown);
+//        }
+//    }
+//
+//    public boolean hasSelector() {
+//        return false;
+//    }
+//
+//    public boolean matches(MessageDelivery message) {
+//        return true;
+//    }
+//
+//    public VirtualHost getBroker() {
+//        return virtualHost;
+//    }
+//
+//    public void setVirtualHost(VirtualHost virtualHost) {
+//        this.virtualHost = virtualHost;
+//    }
+//
+//    public void setDestination(Destination destination) {
+//        this.destination = destination;
+//    }
+//
+//    public final Destination getDestination() {
+//        return destination;
+//    }
+//
+//    public boolean isDurable() {
+//        return true;
+//    }
+//
+//    public static class QueueSubscription implements BrokerSubscription {
+//        Subscription<MessageDelivery> subscription;
+//        final Queue queue;
+//
+//        public QueueSubscription(Queue queue) {
+//            this.queue = queue;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+//         * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+//         */
+//        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
+//            this.subscription = subscription;
+//            queue.addSubscription(subscription);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+//         * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+//         */
+//        public void disconnect(ConsumerContext context) {
+//            queue.removeSubscription(subscription);
+//        }
+//
+//        /* (non-Javadoc)
+//         * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+//         */
+//        public Destination getDestination() {
+//            return queue.getDestination();
+//        }
+//    }
+
+  /** Stub (TODO): currently no delivery matches. */
+  def matches(message: MessageDelivery): Boolean = false
+
+  /** Stub (TODO): currently discards the delivery. */
+  def deliver(message: MessageDelivery): Unit = {
+    // TODO:
+  }
+
+  /** The destination passed to the constructor. */
+  def getDestination(): Destination = destination
+
+  /** Nothing to shut down yet. */
+  def shutdown: Unit = {}
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.filter.{FilterException, BooleanExpression}
+import path.{PathFilter}
+import _root_.scala.collection.JavaConversions._
+
+/**
+ * A subscription on a destination that consumers can be connected to and disconnected from.
+ */
+trait BrokerSubscription {
+
+  /** Connects `consumer` to this subscription. */
+  def connect(consumer: ConsumerContext): Unit
+
+  /** Disconnects `consumer` from this subscription. */
+  def disconnect(consumer: ConsumerContext): Unit
+
+  /** The destination this subscription is for. */
+  def getDestination(): Destination
+
+}
+
+
+/**
+ * A subscription that fans `connect` and `disconnect` out to a fixed list of child
+ * subscriptions, in list order.
+ *
+ * @param destination   the destination reported by `getDestination`
+ * @param subscriptions the child subscriptions every call is forwarded to
+ */
+class CompositeSubscription(val destination:Destination, val subscriptions:List[BrokerSubscription] ) extends BrokerSubscription {
+
+  /** Connects `consumer` to every child subscription. */
+  def connect(consumer:ConsumerContext) = subscriptions.foreach { sub =>
+    sub.connect(consumer)
+  }
+
+  /** Disconnects `consumer` from every child subscription. */
+  def disconnect(consumer:ConsumerContext) = subscriptions.foreach { sub =>
+    sub.disconnect(consumer)
+  }
+
+  def getDestination() = destination
+
+}
+
+/** Companion object that serves as the log for the [[WildcardQueueSubscription]] class. */
+object WildcardQueueSubscription extends Log
+/**
+ * Subscription on a wildcard destination. While connected it listens for queue creation on
+ * the virtual host and, for every new queue whose name matches the wildcard filter, creates
+ * and connects a child subscription for the consumer.
+ *
+ * @param host        the virtual host whose queues are watched
+ * @param destination the wildcard destination; its name is parsed into the path filter
+ * @param consumer    the only consumer this subscription may be connected to
+ */
+class WildcardQueueSubscription(val host:VirtualHost, val destination:Destination, val consumer:ConsumerContext) extends BrokerSubscription with QueueLifecyleListener with Logging {
+
+  protected def log = WildcardQueueSubscription
+
+  var filter = PathFilter.parseFilter(destination.getName())
+  val childSubs = new ArrayList[BrokerSubscription]()
+
+
+  ///////////////////////////////////////////////////////////////////
+  // BrokerSubscription interface implementation
+  ///////////////////////////////////////////////////////////////////
+
+  /**
+   * Registers this subscription as a lifecycle listener on the host. Subscribing to queues
+   * that already exist is still a TODO (see the commented-out code).
+   */
+  def connect(cc:ConsumerContext) = {
+    assert(cc == consumer)
+// TODO:
+//        val domain = host.router.getDomain(Broker.QUEUE_DOMAIN);
+//        val matches = domain.route(destination.getName(), null);
+//        for (target <- matches) {
+//            val queue = target.asInstanceOf[Queue]
+//            var childSub = host.createSubscription(consumer, queue.destination);
+//            childSubs.add(childSub);
+//            childSub.connect(consumer);
+//        }
+    host.addDestinationLifecyleListener(this)
+  }
+
+  /** Stops listening for new queues, then disconnects and forgets every child subscription. */
+  def disconnect(cc:ConsumerContext) = {
+    assert(cc == consumer)
+    host.removeDestinationLifecyleListener(this)
+    childSubs.foreach { childSub =>
+      childSub.disconnect(cc)
+    }
+    childSubs.clear()
+  }
+
+  def getDestination() : Destination = destination
+
+  ///////////////////////////////////////////////////////////////////
+  // QueueLifecyleListener interface implementation
+  ///////////////////////////////////////////////////////////////////
+
+  /**
+   * Creates and connects a child subscription when the new queue's name matches the filter.
+   * An Exception raised while doing so is logged and otherwise ignored.
+   */
+  def onCreate(queue:Queue) = {
+    val queueDestination = queue.destination
+    if( filter.matches(queueDestination.getName()) ) {
+      try {
+        val childSub = host.createSubscription(consumer, queueDestination)
+        childSubs.add(childSub)
+        childSub.connect(consumer)
+      } catch {
+        case e:Exception=>
+          warn("Could not create dynamic subscription to "+queueDestination+": "+e)
+          debug("Could not create dynamic subscription to "+queueDestination+": ", e)
+      }
+    }
+  }
+
+  /** Nothing is done when a queue is destroyed. */
+  def onDestroy(queue:Queue ) = {
+  }
+
+}
+
+/**
+ * Placeholder for the previous Java topic subscription, which is kept below as commented-out
+ * code. The live members are stubs: nothing matches, nothing is delivered and there is no
+ * destination.
+ */
+class TopicSubscription { // extends BrokerSubscription with DeliveryTarget {
+  /** Stub: no delivery matches. */
+  def matches(message: MessageDelivery): Boolean = false
+  /** Stub: the delivery is discarded. */
+  def deliver(message: MessageDelivery): Unit = {}
+  /** Stub: does nothing. */
+  def connect(consumer: ConsumerContext): Unit = {}
+  /** Stub: does nothing. */
+  def disconnect(consumer: ConsumerContext): Unit = {}
+  /** Stub: always `null`. */
+  def getDestination(): Destination = null
+
+//	static final boolean USE_PERSISTENT_QUEUES = true;
+//
+//    protected final BooleanExpression selector;
+//    protected final Destination destination;
+//    protected Subscription<MessageDelivery> connectedSub;
+//    private final VirtualHost host;
+//
+//    //TODO: replace this with a base interface for queue which also support non persistent use case.
+//	private IFlowQueue<MessageDelivery> queue;
+//
+//    TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+//        this.host = host;
+//        this.selector = selector;
+//        this.destination = destination;
+//    }
+//
+//    @Override
+//    public String toString() {
+//        return IntrospectionSupport.toString(this);
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see
+//     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+//     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+//     */
+//    public final void deliver(MessageDelivery message, ISourceController<?> source) {
+//        if (matches(message)) {
+//            queue.add(message, source);
+//        }
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
+//     */
+//    public boolean hasSelector() {
+//        return selector != null;
+//    }
+//
+//    public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
+//        if (this.connectedSub == null) {
+//        	if( subscription.isPersistent() ) {
+//        		queue = createPersistentQueue(subscription);
+//        	} else {
+//        		queue = createNonPersistentQueue(subscription);
+//        	}
+//    		queue.start();
+//
+//        	this.connectedSub = subscription;
+//        	this.queue.addSubscription(connectedSub);
+//    		this.host.getRouter().bind(destination, this);
+//        } else if (connectedSub != subscription) {
+//            throw new UserAlreadyConnectedException();
+//        }
+//    }
+//
+//    private IFlowQueue<MessageDelivery> createNonPersistentQueue(final ConsumerContext subscription) {
+//		Flow flow = new Flow(subscription.getResourceName(), false);
+//		String name = subscription.getResourceName();
+//		IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
+//		ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
+//		queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
+//            public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
+//                subscription.add(elem, controller);
+//            }
+//        });
+//		return queue;
+//	}
+//
+//	private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
+//        return queue;
+//	}
+//
+//    @SuppressWarnings("unchecked")
+//	private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
+//    	ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
+//		host.getQueueStore().deleteQueue(pq.getDescriptor());
+//	}
+//
+//	public synchronized void disconnect(final ConsumerContext subscription) {
+//        if (connectedSub != null && connectedSub == subscription) {
+//    		this.host.getRouter().unbind(destination, this);
+//    		this.queue.removeSubscription(connectedSub);
+//    		this.connectedSub = null;
+//
+//    		queue.stop();
+//        	if( USE_PERSISTENT_QUEUES ) {
+//        		destroyPersistentQueue(queue);
+//        	}
+//    		queue=null;
+//        }
+//    }
+//
+//
+//
+//	public boolean matches(MessageDelivery message) {
+//        if (selector == null) {
+//            return true;
+//        }
+//
+//        MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
+//        selectorContext.setDestination(destination);
+//        try {
+//            return (selector.matches(selectorContext));
+//        } catch (FilterException e) {
+//            e.printStackTrace();
+//            return false;
+//        }
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+//     */
+//    public Destination getDestination() {
+//        return destination;
+//    }
+
+
+}
+
+/**
+ * Durable subscription on a destination with an optional selector. Delivery, connect and
+ * disconnect are still TODO stubs (the previous Java logic is kept as commented-out code);
+ * only selector matching is implemented.
+ *
+ * @param host        the virtual host the subscription belongs to
+ * @param destination the destination subscribed to
+ * @param selector    filter applied by `matches`; `null` means every delivery matches
+ */
+class DurableSubscription(val host:VirtualHost, val destination:Destination, val selector:BooleanExpression) { // extends BrokerSubscription with DeliveryTarget {
+
+//    private final IQueue<Long, MessageDelivery> queue;
+//    private Subscription<MessageDelivery> connectedSub;
+  var started = false
+//  TODO:
+//    this.host.router.bind(destination, this);
+
+  /** The destination passed to the constructor. */
+  def getDestination() = destination
+
+  /** Stub (TODO): currently discards the delivery. */
+  def deliver(message:MessageDelivery ) = {
+//        TODO:
+//        queue.add(message, source);
+  }
+
+  /** Stub (TODO): currently does nothing. */
+  def connect(subscription:ConsumerContext) = {
+//        TODO:
+//        if (this.connectedSub == null) {
+//            this.connectedSub = subscription;
+//            queue.addSubscription(connectedSub);
+//        } else if (connectedSub != subscription) {
+//            throw new UserAlreadyConnectedException();
+//        }
+  }
+
+  /** Stub (TODO): currently does nothing. */
+  def disconnect(subscription:ConsumerContext) = {
+//        TODO:
+//        if (connectedSub != null && connectedSub == subscription) {
+//            queue.removeSubscription(connectedSub);
+//            connectedSub = null;
+//        }
+  }
+
+  /**
+   * Tells whether `message` passes the selector. Without a selector everything matches.
+   * Otherwise the message's evaluation context is pointed at this subscription's destination
+   * and evaluated; a FilterException is printed and treated as "no match".
+   */
+  def matches(message:MessageDelivery) = {
+    if (selector == null) {
+      true
+    } else {
+      val selectorContext = message.message.messageEvaluationContext
+      selectorContext.setDestination(destination)
+      try {
+        selector.matches(selectorContext)
+      } catch {
+        case e:FilterException=>
+          e.printStackTrace()
+          false
+      }
+    }
+  }
+
+}