You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/17 02:12:50 UTC

svn commit: r964988 [2/2] - in /activemq/sandbox/activemq-apollo-actor: apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apac...

Modified: activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Sat Jul 17 00:12:48 2010
@@ -34,6 +34,7 @@ import org.apache.activemq.apollo.filter
 import org.apache.activemq.apollo.transport._
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.dto.{BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -41,11 +42,14 @@ import org.apache.activemq.apollo.util._
 object StompConstants {
 
   val PROTOCOL = "stomp"
+  val DURABLE_PREFIX = ascii("durable:")
+  val DURABLE_QUEUE_KIND = ascii("stomp:sub")
 
   val options = new ParserOptions
-  options.queuePrefix = new AsciiBuffer("/queue/")
-  options.topicPrefix = new AsciiBuffer("/topic/")
-  options.defaultDomain = Domain.QUEUE_DOMAIN
+  options.queuePrefix = ascii("/queue/")
+  options.topicPrefix = ascii("/topic/")
+
+  options.defaultDomain = Router.QUEUE_DOMAIN
 
   implicit def toDestination(value:AsciiBuffer):Destination = {
     val d = DestinationParser.parse(value, options)
@@ -125,7 +129,7 @@ class StompProtocolHandler extends Proto
   
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
-  class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression)) extends BaseRetained with DeliveryConsumer {
+  class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO) extends BaseRetained with DeliveryConsumer {
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
 
     dispatchQueue.retain
@@ -230,7 +234,13 @@ class StompProtocolHandler extends Proto
       producerRoutes = Map()
       consumers.foreach {
         case (_,consumer)=>
-        host.router.unbind(consumer.destination, consumer::Nil)
+          if( consumer.binding==null ) {
+            host.router.unbind(consumer.destination, consumer)
+          } else {
+            host.router.get_queue(consumer.binding) { queue=>
+              queue.foreach( _.unbind(consumer::Nil) )
+            }
+          }
       }
       consumers = Map()
       trace("stomp protocol resources released")
@@ -377,11 +387,20 @@ class StompProtocolHandler extends Proto
   def on_stomp_subscribe(headers:HeaderMap) = {
     get(headers, Headers.Subscribe.DESTINATION) match {
       case Some(dest)=>
-        val destiantion:Destination = dest
+
+        val destination:Destination = dest
 
         var id:AsciiBuffer = get(headers, Headers.Subscribe.ID) match {
           case None => dest
-          case Some(x)=> x
+          case Some(x:AsciiBuffer)=> x
+        }
+
+        val topic = destination.getDomain == Router.TOPIC_DOMAIN
+
+        var durable_name = if( topic && id.startsWith(DURABLE_PREFIX) ) {
+          id
+        } else {
+          null
         }
 
         val ack:AsciiBuffer = get(headers, Headers.Subscribe.ACK_MODE) match {
@@ -407,12 +426,53 @@ class StompProtocolHandler extends Proto
 
         consumers.get(id) match {
           case None=>
-            info("subscribing to: %s", destiantion)
-            val consumer = new StompConsumer(destiantion, ack, selector);
-            host.router.bind(destiantion, consumer :: Nil)
-            consumer.release
+            info("subscribing to: %s", destination)
+
+            val binding: BindingDTO = if( topic && durable_name==null ) {
+              null
+            } else {
+              // Controls how the created queue gets bound
+              // to the destination name space (this is used to
+              // recover the queue on restart and rebind it the
+              // same way again)
+              if (topic) {
+                val rc = new DurableSubscriptionBindingDTO
+                rc.destination = destination.getName.toString
+                // TODO:
+                // rc.client_id =
+                rc.subscription_id = durable_name
+                rc.filter = if (selector == null) null else selector._1
+                rc
+              } else {
+                val rc = new PointToPointBindingDTO
+                rc.destination = destination.getName.toString
+                rc
+              }
+            }
+
+            val consumer = new StompConsumer(destination, ack, selector, binding);
             consumers += (id -> consumer)
 
+            if( binding==null ) {
+
+              // consumer is bound as a topic
+              host.router.bind(destination, consumer)
+              consumer.release
+
+            } else {
+
+              // create a queue and bind the consumer to it.
+              host.router.create_queue(binding) { x=>
+                x match {
+                  case Some(queue:Queue) =>
+                    queue.bind(consumer::Nil)
+                    consumer.release
+                }
+              }
+            }
+
+
+
           case Some(_)=>
             die("A subscription with identified with '"+id+"' allready exists")
         }

Modified: activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Sat Jul 17 00:12:48 2010
@@ -89,7 +89,7 @@ class StompRemoteConsumer extends Remote
     outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
     outboundSink.refiller = ^{}
 
-    val stompDestination = if (destination.getDomain() == Domain.QUEUE_DOMAIN) {
+    val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
       ascii("/queue/" + destination.getName().toString());
     } else {
       ascii("/topic/" + destination.getName().toString());
@@ -199,7 +199,7 @@ class StompRemoteProducer extends Remote
     outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
     outboundSink.refiller = ^ { drain }
 
-    if (destination.getDomain() == Domain.QUEUE_DOMAIN) {
+    if (destination.getDomain() == Router.QUEUE_DOMAIN) {
       stompDestination = ascii("/queue/" + destination.getName().toString());
     } else {
       stompDestination = ascii("/topic/" + destination.getName().toString());

Modified: activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java Sat Jul 17 00:12:48 2010
@@ -17,17 +17,14 @@
 package org.apache.activemq.apollo.store;
 
 import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
 
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class QueueRecord {
-
     public long key = -1;
-    public AsciiBuffer name;
-    public AsciiBuffer queueType;
-
-//    public AsciiBuffer parent;
-
+    public AsciiBuffer binding_kind;
+    public Buffer binding_data;
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Sat Jul 17 00:12:48 2010
@@ -77,7 +77,7 @@ trait Store extends ServiceTrait {
   /**
    * Loads the queue information for a given queue key.
    */
-  def getQueueStatus(queueKey:Long)(callback:(Option[QueueStatus])=>Unit )
+  def getQueue(queueKey:Long)(callback:(Option[QueueRecord])=>Unit )
 
   /**
    * Gets a listing of all queue entry sequences previously added

Modified: activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Sat Jul 17 00:12:48 2010
@@ -23,7 +23,7 @@ import java.util.concurrent.{TimeUnit, C
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
 import collection.mutable.ListBuffer
 import java.util.concurrent.atomic.{AtomicLong, AtomicInteger, AtomicBoolean}
-import org.apache.activemq.apollo.store.{Store, QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+import org.apache.activemq.apollo.store.{Store, QueueEntryRecord, QueueRecord, MessageRecord}
 import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter}
 
 /**
@@ -88,7 +88,8 @@ abstract class StoreBenchmarkSupport ext
   def addQueue(name:String):Long = {
     var queueA = new QueueRecord
     queueA.key = queue_key_counter.incrementAndGet
-    queueA.name = ascii(name)
+    queueA.binding_kind = ascii("test")
+    queueA.binding_data = ascii(name)
     val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
     expect(true)(rc)
     queueA.key

Modified: activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Sat Jul 17 00:12:48 2010
@@ -86,7 +86,8 @@ abstract class StoreFunSuiteSupport exte
   def addQueue(name:String):Long = {
     var queueA = new QueueRecord
     queueA.key = queue_key_counter.incrementAndGet
-    queueA.name = ascii(name)
+    queueA.binding_kind = ascii("test")
+    queueA.binding_data = ascii(name)
     val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
     expect(true)(rc)
     queueA.key
@@ -154,12 +155,9 @@ abstract class StoreFunSuiteSupport exte
     val A = addQueue("my queue name")
     populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[QueueStatus] = CB( cb=> store.getQueueStatus(A)(cb) )
+    val rc:Option[QueueRecord] = CB( cb=> store.getQueue(A)(cb) )
     expect(ascii("my queue name")) {
-      rc.get.record.name
-    }
-    expect(3) {
-      rc.get.count
+      rc.get.binding_data.ascii
     }
   }
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java Sat Jul 17 00:12:48 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 import java.util.Collection;

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java Sat Jul 17 00:12:48 2010
@@ -15,12 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 
-import org.apache.activemq.apollo.broker.Destination;
-import org.apache.activemq.apollo.filter.FilterException;
 import org.fusesource.hawtbuf.AsciiBuffer;
 
 
@@ -34,14 +32,6 @@ public abstract class PathFilter {
     public static final AsciiBuffer ANY_DESCENDENT = new AsciiBuffer(">");
     public static final AsciiBuffer ANY_CHILD = new AsciiBuffer("*");
     
-	public boolean matches(Destination destination) throws FilterException {
-		return matches(destination.getName());
-	}
-	
-	public Object evaluate(Destination destination) throws FilterException {
-		return matches(destination) ? Boolean.TRUE : Boolean.FALSE;
-	}
-	
     public abstract boolean matches(AsciiBuffer path);
 
     public static PathFilter parseFilter(AsciiBuffer path) {

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java Sat Jul 17 00:12:48 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -54,11 +54,11 @@ public class PathMap<Value> {
      * @return a List of matching values or an empty list if there are no
      *         matching values.
      */
-    public synchronized Set<Value> get(AsciiBuffer key) {
+    public Set<Value> get(AsciiBuffer key) {
         return findWildcardMatches(key);
     }
 
-    public synchronized void put(AsciiBuffer key, Value value) {
+    public void put(AsciiBuffer key, Value value) {
         ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
         root.add(paths, 0, value);
     }
@@ -66,7 +66,7 @@ public class PathMap<Value> {
     /**
      * Removes the value from the associated path
      */
-    public synchronized void remove(AsciiBuffer key, Value value) {
+    public void remove(AsciiBuffer key, Value value) {
         ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
         root.remove(paths, 0, value);
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java Sat Jul 17 00:12:48 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import org.fusesource.hawtbuf.AsciiBuffer;
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java Sat Jul 17 00:12:48 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 import java.util.Collection;

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java Sat Jul 17 00:12:48 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 import java.util.Collection;

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java Sat Jul 17 00:12:48 2010
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java Sat Jul 17 00:12:48 2010
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java Sat Jul 17 00:12:48 2010
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import org.fusesource.hawtbuf.AsciiBuffer;
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java Sat Jul 17 00:12:48 2010
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java Sat Jul 17 00:12:48 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.Set;
 

Copied: activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java Sat Jul 17 00:12:48 2010
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.broker.path;
+package org.apache.activemq.apollo.util.path;
 
 import java.util.ArrayList;
 import java.util.Arrays;

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala Sat Jul 17 00:12:48 2010
@@ -18,7 +18,7 @@ package org.apache.activemq.apollo.web
 
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.broker.jaxb.PropertiesReader
-import org.apache.activemq.apollo.dto.{XmlEncoderDecoder, ConnectorDTO, VirtualHostDTO, BrokerDTO}
+import org.apache.activemq.apollo.dto.{XmlCodec, ConnectorDTO, VirtualHostDTO, BrokerDTO}
 import java.util.regex.Pattern
 import javax.xml.stream.{XMLOutputFactory, XMLInputFactory}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
@@ -26,7 +26,7 @@ import java.util.concurrent.{TimeUnit, E
 import org.fusesource.hawtbuf.{ByteArrayInputStream, ByteArrayOutputStream}
 import javax.xml.bind.{Marshaller, JAXBContext}
 import java.io.{OutputStreamWriter, File}
-import XmlEncoderDecoder._
+import XmlCodec._
 import org.apache.activemq.apollo.util._
 
 object ConfigStore {

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala Sat Jul 17 00:12:48 2010
@@ -24,7 +24,7 @@ import Response._
 import org.apache.activemq.apollo.web.ConfigStore
 import java.net.URI
 import java.io.ByteArrayInputStream
-import org.apache.activemq.apollo.dto.{XmlEncoderDecoder, BrokerDTO}
+import org.apache.activemq.apollo.dto.{XmlCodec, BrokerDTO}
 
 /**
  * A broker resource is used to represent the configuration of a broker.
@@ -57,7 +57,7 @@ case class ConfigurationResource(parent:
 
   @POST @Path("{rev}")
   def post(@PathParam("rev") rev:Int, @FormParam("config") config:String) = {
-    val dto = XmlEncoderDecoder.unmarshalBrokerDTO(new ByteArrayInputStream(config.getBytes("UTF-8")))
+    val dto = XmlCodec.unmarshalBrokerDTO(new ByteArrayInputStream(config.getBytes("UTF-8")))
     put(rev, dto)
     seeOther(path("../"+dto.rev)).build
   }

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala Sat Jul 17 00:12:48 2010
@@ -20,7 +20,6 @@ import java.lang.String
 import com.sun.jersey.api.NotFoundException
 import javax.ws.rs._
 import core.{UriInfo, Response, Context}
-import org.fusesource.scalate.util.Logging
 import reflect.{BeanProperty}
 import com.sun.jersey.api.view.ImplicitProduces
 import org.fusesource.hawtdispatch.Future
@@ -34,6 +33,7 @@ import org.fusesource.scalate.RenderCont
 import java.util.concurrent.TimeUnit
 import org.apache.activemq.apollo.dto._
 import java.util.{Arrays, Collections}
+import org.apache.activemq.apollo.util.Logging
 
 /**
  * Defines the default representations to be used on resources

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Sat Jul 17 00:12:48 2010
@@ -113,8 +113,8 @@ case class RuntimeResource(parent:Broker
       result.state_since = virtualHost.serviceState.since
       result.config = virtualHost.config
 
-      virtualHost.router.destinations.valuesIterator.foreach { node=>
-        result.destinations.add(new LongIdLabeledDTO(node.id, node.destination.getName.toString))
+      virtualHost.router.routing_nodes.foreach { node=>
+        result.destinations.add(new LongIdLabeledDTO(node.id, node.name.toString))
       }
 
       if( virtualHost.store != null ) {
@@ -140,77 +140,70 @@ case class RuntimeResource(parent:Broker
   @GET @Path("virtual-hosts/{id}/destinations/{dest}")
   def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
-      cb(virtualHost.router.destinations.valuesIterator.find { _.id == dest } map { node=>
+      cb(virtualHost.router.routing_nodes.find { _.id == dest } map { node=>
         val result = new DestinationStatusDTO
         result.id = node.id
-        result.name = node.destination.getName.toString
-        result.domain = node.destination.getDomain.toString
-
-        node match {
-          case qdn:virtualHost.router.QueueDestinationNode =>
-            // todo give queues some descriptive name of what they are being used for.
-            result.queues.add(new LongIdLabeledDTO(qdn.queue.id, qdn.queue.id.toString))
-          case _ =>
+        result.name = node.name.toString
+        node.queues.foreach { q=>
+          result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
         }
         result
       })
     }
   }
 
-  @GET @Path("virtual-hosts/{id}/queues/{queue}")
-  def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
+  @GET @Path("virtual-hosts/{id}/destinations/{dest}/queues/{queue}")
+  def queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
       import JavaConversions._
-      virtualHost.queues.valuesIterator.find { _.id == qid } match {
-        case Some(q:Queue)=>
-          q.dispatchQueue {
-
-            val result = new QueueStatusDTO
-            result.id = q.id
-            result.capacity_used = q.capacity_used
-            result.capacity = q.capacity
-
-            result.enqueue_item_counter = q.enqueue_item_counter
-            result.dequeue_item_counter = q.dequeue_item_counter
-            result.enqueue_size_counter = q.enqueue_size_counter
-            result.dequeue_size_counter = q.dequeue_size_counter
-            result.nack_item_counter = q.nack_item_counter
-            result.nack_size_counter = q.nack_size_counter
-
-            result.queue_size = q.queue_size
-            result.queue_items = q.queue_items
-
-            result.loading_size = q.loading_size
-            result.flushing_size = q.flushing_size
-            result.flushed_items = q.flushed_items
-
-            if( entries ) {
-              var cur = q.head_entry
-              while( cur!=null ) {
-
-                val e = new EntryStatusDTO
-                e.seq = cur.seq
-                e.count = cur.count
-                e.size = cur.size
-                e.consumer_count = cur.parked.size
-                e.prefetch_count = cur.prefetched
-                e.state = cur.label
-
-                result.entries.add(e)
-
-                cur = if( cur == q.tail_entry ) {
-                  null
-                } else {
-                  cur.nextOrTail
-                }
+      val rc = virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=>
+        node.queues.find  { _.id == qid } map { q=>
+
+          val result = new QueueStatusDTO
+          result.id = q.id
+          result.label = q.binding.label
+          result.capacity_used = q.capacity_used
+          result.capacity = q.capacity
+
+          result.enqueue_item_counter = q.enqueue_item_counter
+          result.dequeue_item_counter = q.dequeue_item_counter
+          result.enqueue_size_counter = q.enqueue_size_counter
+          result.dequeue_size_counter = q.dequeue_size_counter
+          result.nack_item_counter = q.nack_item_counter
+          result.nack_size_counter = q.nack_size_counter
+
+          result.queue_size = q.queue_size
+          result.queue_items = q.queue_items
+
+          result.loading_size = q.loading_size
+          result.flushing_size = q.flushing_size
+          result.flushed_items = q.flushed_items
+
+          if( entries ) {
+            var cur = q.head_entry
+            while( cur!=null ) {
+
+              val e = new EntryStatusDTO
+              e.seq = cur.seq
+              e.count = cur.count
+              e.size = cur.size
+              e.consumer_count = cur.parked.size
+              e.prefetch_count = cur.prefetched
+              e.state = cur.label
+
+              result.entries.add(e)
+
+              cur = if( cur == q.tail_entry ) {
+                null
+              } else {
+                cur.nextOrTail
               }
             }
-
-            cb(Some(result))
           }
-        case None=>
-          cb(None)
+          result
+        }
       }
+      cb(rc)
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/BrokerDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/BrokerDTO.scaml?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/BrokerDTO.scaml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/BrokerDTO.scaml Sat Jul 17 00:12:48 2010
@@ -16,7 +16,7 @@
 - val helper = new org.apache.activemq.apollo.web.resources.ViewHelper
 - import helper._
 - import org.fusesource.hawtbuf._
-- import org.apache.activemq.apollo.dto.XmlEncoderDecoder._
+- import org.apache.activemq.apollo.dto.XmlCodec._
 
 %form(method="post" action={it.rev+1})
   %div

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml Sat Jul 17 00:12:48 2010
@@ -19,13 +19,11 @@
 
 %h1 Destination: #{name}
 
-%p domain: #{domain}
-
 %h2 Queues
 %ul
   - for( x <- queues )
     %li
-      %a(href={ path("../../queues/"+x.id) }) #{x.label}
+      %a(href={ path("queues/"+x.id) }) #{x.label}
 
 %h2 Producers
 %ul

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml Sat Jul 17 00:12:48 2010
@@ -17,13 +17,13 @@
 - val helper = new org.apache.activemq.apollo.web.resources.ViewHelper
 - import helper._
 
-%h1 Queue: #{id}
+%h1 Queue: #{label}
 
 %h2 Current Size
 
 %p queue size: #{queue_items} messages
 %p queue size: #{memory(queue_size)}
-%p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) } (#{memory(capacity_used)}/#{memory(capacity)})
+%p memory used: #{ "%,.2f".format(capacity_used.toFloat*100.0/capacity) }% (#{memory(capacity_used)}/#{memory(capacity)})
 
 %h2 Enqueue/Deqeueue Counters