You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/17 02:12:50 UTC

svn commit: r964988 [1/2] - in /activemq/sandbox/activemq-apollo-actor: apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apac...

Author: chirino
Date: Sat Jul 17 00:12:48 2010
New Revision: 964988

URL: http://svn.apache.org/viewvc?rev=964988&view=rev
Log:
Laying foundation for supporting more flexible message routing.
 - Wildcard bits are in now
 - Queues are now created using an extensible 'binding' object which controls how the queue gets connected to destinations.
   - Allows us to use queues to implement durable subs
   - binding controls the queue filter
   - and which destinations it binds to (one queue can bind to multiple destinations)

Added:
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
    activemq/sandbox/activemq-apollo-actor/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java
      - copied, changed from r964573, activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java
Removed:
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java
Modified:
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto
    activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto
    activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
    activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
    activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
    activemq/sandbox/activemq-apollo-actor/apollo-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/ConfigStore.scala
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/BrokerDTO.scaml
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml

Added: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=964988&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (added)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Sat Jul 17 00:12:48 2010
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.PointToPointBindingFactory
+org.apache.activemq.apollo.broker.DurableSubBindingFactory
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=964988&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Sat Jul 17 00:12:48 2010
@@ -0,0 +1,233 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.apache.activemq.apollo.util.ClassFinder
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.apache.activemq.apollo.dto.{JsonCodec, DurableSubscriptionBindingDTO, PointToPointBindingDTO, BindingDTO}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.filter.{ConstantExpression, BooleanExpression}
+
+/**
+ * <p>Creates bindings by asking each discovered Provider in turn until one returns non-null.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object BindingFactory {
+  // A Provider returns null from create(...) when it does not handle the given kind or DTO.
+  trait Provider {
+    def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding
+    def create(binding_dto:BindingDTO):Binding
+  }
+  // Builds the provider instances via ClassFinder from the binding-factory.index resource.
+  def discover = {
+    val finder = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/binding-factory.index")
+    finder.new_instances
+  }
+
+  var providers = discover
+  // Returns the first non-null binding a provider creates; throws IllegalArgumentException if none does.
+  def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding = {
+    providers.foreach { provider=>
+      val rc = provider.create(binding_kind, binding_data)
+      if( rc!=null ) {
+        return rc // non-local return: exits create(...), not just the closure
+      }
+    }
+    throw new IllegalArgumentException("Invalid binding type: "+binding_kind);
+  }
+  def create(binding_dto:BindingDTO):Binding = { // same lookup, driven by a DTO instead of kind/data
+    providers.foreach { provider=>
+      val rc = provider.create(binding_dto)
+      if( rc!=null ) {
+        return rc // non-local return: exits create(...), not just the closure
+      }
+    }
+    throw new IllegalArgumentException("Invalid binding type: "+binding_dto);
+  }
+}
+
+/**
+ * <p>Describes how a queue is attached to routing nodes and how that binding is identified and encoded.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Binding {
+
+  /**
+   * A user friendly description of the binding.
+   */
+  def label:String
+
+  /**
+   * Wires a queue into a virtual host based on the binding information contained
+   * in the buffer.
+   */
+  def bind(node:RoutingNode, queue:Queue)
+  // Counterpart of bind: detaches the queue from the routing node again.
+  def unbind(node:RoutingNode, queue:Queue)
+  // Identifier of the binding type; Queue copies it into the QueueRecord it stores.
+  def binding_kind:AsciiBuffer
+  // Encoded binding details; Queue copies it into the QueueRecord it stores.
+  def binding_data:Buffer
+  // DTO representation of this binding.
+  def binding_dto:BindingDTO
+  // Queue.matches evaluates this against each delivery's message; accepts everything unless overridden.
+  def message_filter:BooleanExpression = ConstantExpression.TRUE
+  // Name of the destination this binding refers to.
+  def destination:AsciiBuffer
+}
+
+object PointToPointBinding {
+  val POINT_TO_POINT_KIND = new AsciiBuffer("p2p") // binding_kind value identifying point-to-point bindings
+  val DESTINATION_PATH = new AsciiBuffer("default"); // NOTE(review): not referenced elsewhere in Binding.scala - confirm it is used
+}
+
+import PointToPointBinding._
+
+class PointToPointBindingFactory extends BindingFactory.Provider {
+  // Handles only the "p2p" kind: binding_data is read as the ASCII destination name; returns null otherwise.
+  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+    if( binding_kind == POINT_TO_POINT_KIND ) {
+      val dto = new PointToPointBindingDTO
+      dto.destination = binding_data.ascii.toString
+      new PointToPointBinding(binding_data, dto)
+    } else {
+      null
+    }
+  }
+  // Handles only PointToPointBindingDTO: the destination name becomes the binding data; returns null otherwise.
+  def create(binding_dto:BindingDTO) = {
+    if( binding_dto.isInstanceOf[PointToPointBindingDTO] ) {
+      val p2p_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
+      val data = new AsciiBuffer(p2p_dto.destination).buffer
+      new PointToPointBinding(data, p2p_dto)
+    } else {
+      null
+    }
+  }
+}
+
+/**
+ * <p>Point-to-point binding: attaches the queue as a broadcast consumer of a routing node,
+ * but only when node.unified is true.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class PointToPointBinding(val binding_data:Buffer, val binding_dto:PointToPointBindingDTO) extends Binding {
+
+  def binding_kind = POINT_TO_POINT_KIND
+  // Removes the queue from the node's broadcast consumers; a no-op for nodes that are not unified.
+  def unbind(node: RoutingNode, queue: Queue) = {
+    if( node.unified ) {
+      node.remove_broadcast_consumer(queue)
+    }
+  }
+  // Adds the queue to the node's broadcast consumers; a no-op for nodes that are not unified.
+  def bind(node: RoutingNode, queue: Queue) = {
+    if( node.unified ) {
+      node.add_broadcast_consumer(queue)
+    }
+  }
+
+  def label = binding_dto.destination
+  // Equality is decided by binding_data alone; the kind mixed into the hash is constant for this class.
+  override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
+
+  override def equals(o:Any):Boolean = o match {
+    case x: PointToPointBinding => x.binding_data == binding_data
+    case _ => false
+  }
+  // Allocates a new AsciiBuffer from the DTO's destination string on every call.
+  def destination = new AsciiBuffer(binding_dto.destination)
+}
+
+
+object DurableSubBinding {
+  val DURABLE_SUB_KIND = new AsciiBuffer("ds") // binding_kind value identifying durable subscription bindings
+}
+
+import DurableSubBinding._
+
+class DurableSubBindingFactory extends BindingFactory.Provider {
+  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = { // "ds" kind: binding_data is decoded as a JSON DTO
+    if( binding_kind == DURABLE_SUB_KIND ) {
+      new DurableSubBinding(binding_data, JsonCodec.decode(binding_data, classOf[DurableSubscriptionBindingDTO]))
+    } else {
+      null
+    }
+  }
+  def create(binding_dto:BindingDTO) = { // durable-sub DTO: JSON-encode it to produce binding_data
+    if( binding_dto.isInstanceOf[DurableSubscriptionBindingDTO] ) {
+      new DurableSubBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[DurableSubscriptionBindingDTO])
+    } else {
+      null
+    }
+  }
+  // Both overloads return null for input they do not handle, so BindingFactory can try the next provider.
+}
+
+/**
+ * <p>Durable subscription binding: attaches the queue to a routing node as a broadcast consumer,
+ * filtering messages with the selector carried in the DTO (if any).</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DurableSubBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionBindingDTO) extends Binding {
+
+  def binding_kind = DURABLE_SUB_KIND
+
+  // unbind must undo bind: it removes the queue that bind registered as a broadcast consumer.
+  def unbind(node: RoutingNode, queue: Queue) = {
+    node.remove_broadcast_consumer(queue)
+  }
+  // Registers the queue as a broadcast consumer of the node (same direction as PointToPointBinding.bind).
+  def bind(node: RoutingNode, queue: Queue) = {
+    node.add_broadcast_consumer(queue)
+  }
+  // Builds the description from the subscription id plus the optional filter and client id.
+  def label = {
+    var rc = "sub: '"+binding_dto.subscription_id+"'"
+    if( binding_dto.filter!=null ) {
+      rc += " filtering '"+binding_dto.filter+"'"
+    }
+    if( binding_dto.client_id!=null ) {
+      rc += " for client '"+binding_dto.client_id+"'"
+    }
+    rc
+  }
+  // Equality is decided by binding_data alone; the kind mixed into the hash is constant for this class.
+  override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
+
+  override def equals(o:Any):Boolean = o match {
+    case x: DurableSubBinding => x.binding_data == binding_data
+    case _ => false
+  }
+  // No filter configured means accept everything; otherwise the selector is parsed on each call.
+  override def message_filter = {
+    if ( binding_dto.filter==null ) {
+      ConstantExpression.TRUE
+    } else {
+      SelectorParser.parse(binding_dto.filter)
+    }
+  }
+
+  def destination = new AsciiBuffer(binding_dto.destination)
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Sat Jul 17 00:12:48 2010
@@ -53,13 +53,13 @@ object DestinationParser {
           }
         } else {
           value.getDomain match {
-            case Domain.QUEUE_DOMAIN =>
+            case Router.QUEUE_DOMAIN =>
               baos.write(options.queuePrefix)
-            case Domain.TOPIC_DOMAIN =>
+            case Router.TOPIC_DOMAIN =>
               baos.write(options.topicPrefix)
-            case Domain.TEMP_QUEUE_DOMAIN =>
+            case Router.TEMP_QUEUE_DOMAIN =>
               baos.write(options.tempQueuePrefix)
-            case Domain.TEMP_TOPIC_DOMAIN =>
+            case Router.TEMP_TOPIC_DOMAIN =>
               baos.write(options.tempTopicPrefix)
           }
           baos.write(value.getName)
@@ -97,16 +97,16 @@ object DestinationParser {
     } else {
       if (options.queuePrefix != null && value.startsWith(options.queuePrefix)) {
         var name = value.slice(options.queuePrefix.length, value.length).ascii();
-        return new SingleDestination(Domain.QUEUE_DOMAIN, name);
+        return new SingleDestination(Router.QUEUE_DOMAIN, name);
       } else if (options.topicPrefix != null && value.startsWith(options.topicPrefix)) {
         var name = value.slice(options.topicPrefix.length, value.length).ascii();
-        return new SingleDestination(Domain.TOPIC_DOMAIN, name);
+        return new SingleDestination(Router.TOPIC_DOMAIN, name);
       } else if (options.tempQueuePrefix != null && value.startsWith(options.tempQueuePrefix)) {
         var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
-        return new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name);
+        return new SingleDestination(Router.TEMP_QUEUE_DOMAIN, name);
       } else if (options.tempTopicPrefix != null && value.startsWith(options.tempTopicPrefix)) {
         var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
-        return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
+        return new SingleDestination(Router.TEMP_TOPIC_DOMAIN, name);
       } else {
         if (options.defaultDomain == null) {
           return null;

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jul 17 00:12:48 2010
@@ -30,28 +30,6 @@ import org.apache.activemq.apollo.store.
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
 
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait QueueLifecyleListener {
-
-  /**
-   * A destination has bean created
-   *
-   * @param queue
-   */
-  def onCreate(queue: Queue);
-
-  /**
-   * A destination has bean destroyed
-   *
-   * @param queue
-   */
-  def onDestroy(queue: Queue);
-
-}
-
-
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
 }
@@ -60,16 +38,18 @@ object Queue extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val host: VirtualHost, val destination: Destination, val id: Long) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
+class Queue(val host: VirtualHost, var id:Long, val binding:Binding) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
   override protected def log = Queue
 
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
   var fast_subscriptions = List[Subscription]()
 
-  override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
+  val filter = binding.message_filter
+
+  override val dispatchQueue: DispatchQueue = createQueue(binding.label);
   dispatchQueue.setTargetQueue(getRandomThreadQueue)
   dispatchQueue {
-    debug("created queue for: " + destination)
+    debug("created queue for: " + binding.label)
   }
   setDisposer(^ {
     ack_source.release
@@ -177,25 +157,46 @@ class Queue(val host: VirtualHost, val d
     }
 
     if( tune_persistent ) {
-      host.store.listQueueEntryRanges(id, tune_flush_range_size) { ranges=>
-        dispatchQueue {
-          if( !ranges.isEmpty ) {
-
-            ranges.foreach { range =>
-              val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range)
-              entries.addLast(entry)
-
-              message_seq_counter = range.lastQueueSeq + 1
-              enqueue_item_counter += range.count
-              enqueue_size_counter += range.size
-            }
 
-            debug("restored: "+enqueue_item_counter)
-          }
+      if( id == -1 ) {
+        id = host.queue_id_counter.incrementAndGet
+
+        val record = new QueueRecord
+        record.key = id
+        record.binding_data = binding.binding_data
+        record.binding_kind = binding.binding_kind
+
+        host.store.addQueue(record) { rc =>
           completed
         }
+
+      } else {
+
+        host.store.listQueueEntryRanges(id, tune_flush_range_size) { ranges=>
+          dispatchQueue {
+            if( !ranges.isEmpty ) {
+
+              ranges.foreach { range =>
+                val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range)
+                entries.addLast(entry)
+
+                message_seq_counter = range.lastQueueSeq + 1
+                enqueue_item_counter += range.count
+                enqueue_size_counter += range.size
+              }
+
+              debug("restored: "+enqueue_item_counter)
+            }
+            completed
+          }
+        }
+        
       }
+
     } else {
+      if( id == -1 ) {
+        id = host.queue_id_counter.incrementAndGet
+      }
       completed
     }
   }
@@ -450,7 +451,7 @@ class Queue(val host: VirtualHost, val d
   //
   /////////////////////////////////////////////////////////////////////
 
-  def matches(message: Delivery) = {true}
+  def matches(delivery: Delivery) = filter.matches(delivery.message)
 
   def connect(p: DeliveryProducer) = new DeliverySession {
     retain
@@ -502,7 +503,7 @@ class Queue(val host: VirtualHost, val d
   //
   /////////////////////////////////////////////////////////////////////
 
-  def connected(values: List[DeliveryConsumer]) = bind(values)
+  def connected() = {}
 
   def bind(values: List[DeliveryConsumer]) = retaining(values) {
     for (consumer <- values) {
@@ -583,7 +584,6 @@ class Queue(val host: VirtualHost, val d
       this.dispatchQueue.setTargetQueue(value.getTargetQueue)
     }
   }
-  
 }
 
 object QueueEntry extends Sizer[QueueEntry] {
@@ -1285,7 +1285,6 @@ class QueueEntry(val queue:Queue, val se
 
   }
 
-
 }
 
 /**

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sat Jul 17 00:12:48 2010
@@ -21,49 +21,25 @@ import _root_.org.fusesource.hawtbuf._
 import _root_.org.fusesource.hawtdispatch._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
-import path.PathMap
 import collection.JavaConversions
-import org.apache.activemq.apollo.util.LongCounter
-import collection.mutable.HashMap
 import org.apache.activemq.apollo.util._
+import collection.mutable.{ListBuffer, HashMap}
+import org.apache.activemq.apollo.store.QueueRecord
+import org.apache.activemq.apollo.dto.{PointToPointBindingDTO, BindingDTO}
+import path.{PathFilter, PathMap}
+import scala.collection.immutable.List
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object Domain {
+object Router extends Log {
   val TOPIC_DOMAIN = new AsciiBuffer("topic");
   val QUEUE_DOMAIN = new AsciiBuffer("queue");
   val TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
   val TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
-}
-
-import Domain._
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class Domain {
-
-  val targets = new PathMap[DeliveryConsumer]();
-
-  def bind(name:AsciiBuffer, queue:DeliveryConsumer) = {
-    targets.put(name, queue);
-  }
-
-  def unbind(name:AsciiBuffer, queue:DeliveryConsumer) = {
-    targets.remove(name, queue);
-  }
-
-//
-//  synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
-//    return targets.get(name);
-//  }
-
-}
 
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object Router extends Log {
+  val QUEUE_KIND = new AsciiBuffer("queue");
+  val DEFAULT_QUEUE_PATH = new AsciiBuffer("default");
 }
 
 /**
@@ -81,133 +57,263 @@ object Router extends Log {
  */
 class Router(val host:VirtualHost) extends DispatchLogging {
 
+  override protected def log = Router
+
+  import Router._
+
   val destination_id_counter = new LongCounter
 
-  override protected def log = Router
   protected def dispatchQueue:DispatchQueue = host.dispatchQueue
 
-  trait DestinationNode {
-    val destination:Destination
-    val id = destination_id_counter.incrementAndGet
-    var targets = List[DeliveryConsumer]()
-    var routes = List[DeliveryProducerRoute]()
+  var queues = HashMap[Binding, Queue]()
+
+  // Only stores simple paths, used for wild card lookups.
+  var destinations = new PathMap[RoutingNode]()
+  // Can store consumers on wild card paths
+  val broadcast_consumers = new PathMap[DeliveryConsumer]()
+  // Can store bindings on wild card paths
+  val bindings = new PathMap[Queue]()
 
-    def on_bind(x:List[DeliveryConsumer]):Unit
-    def on_unbind(x:List[DeliveryConsumer]):Boolean
-    def on_connect(route:DeliveryProducerRoute):Unit
-    def on_disconnect(route:DeliveryProducerRoute):Boolean = {
-      routes = routes.filterNot({r=> route==r})
-      route.disconnected()
-      routes == Nil && targets == Nil
+  private def is_topic(destination:Destination) = {
+    destination.getDomain match {
+      case TOPIC_DOMAIN => true
+      case TEMP_TOPIC_DOMAIN => true
+      case _ => false
     }
   }
 
-  class TopicDestinationNode(val destination:Destination) extends DestinationNode {
-    def on_bind(x:List[DeliveryConsumer]) =  {
-      targets = x ::: targets
-      routes.foreach({r=>
-        r.bind(x)
-      })
-    }
+  def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(PathFilter.ANY_DESCENDENT))
+  
+  def create_destination_or(destination:AsciiBuffer)(func:(RoutingNode)=>Unit):RoutingNode = {
 
-    def on_unbind(x:List[DeliveryConsumer]):Boolean = {
-      targets = targets.filterNot({t=>x.contains(t)})
-      routes.foreach({r=>
-        r.unbind(x)
-      })
-      routes == Nil && targets == Nil
-    }
+    // We can't create a wild card destination.. only wild card subscriptions.
+    assert( !PathFilter.containsWildCards(destination) )
 
-    def on_connect(route:DeliveryProducerRoute) = {
-      routes = route :: routes
-      route.connected(targets)
-    }
-  }
+    var rc = destinations.chooseValue( destination )
+    if( rc == null ) {
 
-  class QueueDestinationNode(val destination:Destination) extends DestinationNode {
-    var queue:Queue = null
+      // A new destination is being created...
+      rc = new RoutingNode(this, destination )
+      destinations.put(destination, rc)
 
-    // once the queue is created.. connect it up with the producers and targets.
-    host.getQueue(destination) { q =>
-      dispatchQueue {
-        queue = q;
-        queue.bind(targets)
-        routes.foreach({route=>
-          route.connected(queue :: Nil)
-        })
+      // bind any matching wild card subs
+      import JavaConversions._
+      broadcast_consumers.get( destination ).foreach { c=>
+        rc.add_broadcast_consumer(c)
       }
-    }
-
-    def on_bind(x:List[DeliveryConsumer]) =  {
-      targets = x ::: targets
-      if( queue!=null ) {
-        queue.bind(x)
+      bindings.get( destination ).foreach { queue=>
+        rc.add_queue(queue)
       }
+
+    } else {
+      func(rc)
     }
+    rc
+  }
 
-    def on_unbind(x:List[DeliveryConsumer]):Boolean = {
-      targets = targets.filterNot({t=>x.contains(t)})
-      if( queue!=null ) {
-        queue.unbind(x)
+  def get_destination_matches(destination:AsciiBuffer) = {
+    import JavaConversions._
+    asIterable(destinations.get( destination ))
+  }
+
+  def _create_queue(id:Long, binding:Binding):Queue = {
+    val queue = new Queue(host, id, binding)
+    queue.start
+    queues.put(binding, queue)
+
+    // Not all queues are bound to destinations.
+    val name = binding.destination
+    if( name!=null ) {
+      bindings.put(name, queue)
+      // make sure the destination is created if this is not a wild card sub
+      if( !PathFilter.containsWildCards(name) ) {
+        create_destination_or(name) { node=>
+          node.add_queue(queue)
+        }
+      } else {
+        get_destination_matches(name).foreach( node=>
+          node.add_queue(queue)
+        )
       }
-      routes == Nil && targets == Nil
+
     }
+    queue
+  }
 
-    def on_connect(route:DeliveryProducerRoute) = {
-      routes = route :: routes
-      if( queue!=null ) {
-        route.connected(queue :: Nil)
-      }
+  def create_queue(record:QueueRecord) = {
+    _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data))
+  }
+
+  /**
+   * Returns the queue for the binding, creating it first if it did not already exist.
+   */
+  def _create_queue(dto: BindingDTO): Some[Queue] = {
+    val binding = BindingFactory.create(dto)
+    val queue = queues.get(binding) match {
+      case Some(queue) => Some(queue)
+      case None => Some(_create_queue(-1, binding))
     }
+    queue
   }
 
-  var destinations = new HashMap[Destination, DestinationNode]()
+  def create_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
+    cb(_create_queue(dto))
+  } >>: dispatchQueue
 
-  private def get(destination:Destination):DestinationNode = {
-    destinations.getOrElseUpdate(destination,
-      if( isTopic(destination) ) {
-        new TopicDestinationNode(destination)
-      } else {
-        new QueueDestinationNode(destination)
-      }
+  /**
+   * Stops the queue if it exists; the callback is passed true in either case.
+   */
+  def destroy_queue(dto:BindingDTO)(cb: (Boolean)=>Unit) = ^{
+    val binding = BindingFactory.create(dto)
+    val queue = queues.get(binding) match {
+      case Some(queue) =>
+        val name = binding.destination
+        if( name!=null ) {
+          get_destination_matches(name).foreach( node=>
+            node.remove_queue(queue)
+          )
+        }
+        queue.stop
+        true
+      case None =>
+        true
+    }
+    cb(queue)
+  } >>: dispatchQueue
+
+  /**
+   * Gets an existing queue.
+   */
+  def get_queue(dto:BindingDTO)(cb: (Option[Queue])=>Unit) = ^{
+    val binding = BindingFactory.create(dto)
+    cb(queues.get(binding))
+  } >>: dispatchQueue
+
+  def bind(destination:Destination, consumer:DeliveryConsumer) = retaining(consumer) {
+
+    assert( is_topic(destination) )
+
+    val name = destination.getName
+
+    // make sure the destination is created if this is not a wild card sub
+    if( !PathFilter.containsWildCards(name) ) {
+      val node = create_destination_or(name) { node=> }
+    }
+
+    get_destination_matches(name).foreach( node=>
+      node.add_broadcast_consumer(consumer)
     )
-  }
+    broadcast_consumers.put(name, consumer)
+
+  } >>: dispatchQueue
+
+  def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
+    assert( is_topic(destination) )
+    val name = destination.getName
+    broadcast_consumers.remove(name, consumer)
+    get_destination_matches(name).foreach{ node=>
+      node.remove_broadcast_consumer(consumer)
+    }
+  } >>: dispatchQueue
 
-  def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {
-      get(destination).on_bind(targets)
-    } >>: dispatchQueue
-
-  def unbind(destination:Destination, targets:List[DeliveryConsumer]) = releasing(targets) {
-      if( get(destination).on_unbind(targets) ) {
-        destinations.remove(destination)
-      }
-    } >>: dispatchQueue
 
   def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+
     val route = new DeliveryProducerRoute(this, destination, producer) {
       override def on_connected = {
         completed(this);
       }
     }
-    ^ {
-      get(destination).on_connect(route)
-    } >>: dispatchQueue
-  }
 
-  def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
-  def isQueue(destination:Destination) = !isTopic(destination)
+    dispatchQueue {
+
+      val topic = is_topic(destination)
+
+      // Looking up the queue will cause it to get created if it does not exist.
+      val queue = if( !topic ) {
+        val dto = new PointToPointBindingDTO
+        dto.destination = destination.getName.toString
+        _create_queue(dto)
+      } else {
+        None
+      }
+
+      val node = create_destination_or(destination.getName) { node=> }
+      if( node.unified || topic ) {
+        node.add_broadcast_producer( route )
+      } else {
+        route.bind( queue.toList )
+      }
+
+      route.connected()
+    }
+  }
 
   def disconnect(route:DeliveryProducerRoute) = releasing(route) {
-      get(route.destination).on_disconnect(route)
-    } >>: dispatchQueue
 
+    val topic = is_topic(route.destination)
+    val node = create_destination_or(route.destination.getName) { node=> }
+    if( node.unified || topic ) {
+      node.remove_broadcast_producer(route)
+    }
+    route.disconnected()
+
+  } >>: dispatchQueue
+
+}
 
-   def each(proc:(Destination, DestinationNode)=>Unit) = dispatchQueue {
-     import JavaConversions._
-     for( (destination, node) <- destinations ) {
-        proc(destination, node)
-     }
-   } 
+
+/**
+ * Tracks state associated with a destination name.
+ */
+class RoutingNode(val router:Router, val name:AsciiBuffer) {
+
+  val id = router.destination_id_counter.incrementAndGet
+
+  var broadcast_producers = ListBuffer[DeliveryProducerRoute]()
+  var broadcast_consumers = ListBuffer[DeliveryConsumer]()
+  var queues = ListBuffer[Queue]()
+
+  // TODO: extract the node's config from the host config object
+  def unified = false
+
+  def add_broadcast_consumer (consumer:DeliveryConsumer) = {
+    broadcast_consumers += consumer
+
+    val list = consumer :: Nil
+    broadcast_producers.foreach({ r=>
+      r.bind(list)
+    })
+  }
+
+  def remove_broadcast_consumer (consumer:DeliveryConsumer) = {
+    broadcast_consumers = broadcast_consumers.filterNot( _ == consumer )
+
+    val list = consumer :: Nil
+    broadcast_producers.foreach({ r=>
+      r.unbind(list)
+    })
+  }
+
+  def add_broadcast_producer (producer:DeliveryProducerRoute) = {
+    broadcast_producers += producer
+    producer.bind(broadcast_consumers.toList)
+  }
+
+  def remove_broadcast_producer (producer:DeliveryProducerRoute) = {
+    broadcast_producers = broadcast_producers.filterNot( _ == producer )
+    producer.unbind(broadcast_consumers.toList)
+  }
+
+  def add_queue (queue:Queue) = {
+    queue.binding.bind(this, queue)
+    queues += queue
+  }
+
+  def remove_queue (queue:Queue) = {
+    queues = queues.filterNot( _ == queue )
+    queue.binding.unbind(this, queue)
+  }
 
 }
 
@@ -216,13 +322,13 @@ class Router(val host:VirtualHost) exten
  */
 trait Route extends Retained {
 
-  def destination:Destination
   def dispatchQueue:DispatchQueue
   val metric = new AtomicLong();
 
-  def connected(targets:List[DeliveryConsumer]):Unit
   def bind(targets:List[DeliveryConsumer]):Unit
   def unbind(targets:List[DeliveryConsumer]):Unit
+  
+  def connected():Unit
   def disconnected():Unit
 
 }
@@ -230,7 +336,7 @@ trait Route extends Retained {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+case class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
 
   override protected def log = Router
   override def dispatchQueue = producer.dispatchQueue
@@ -243,8 +349,7 @@ class DeliveryProducerRoute(val router:R
 
   var targets = List[DeliverySession]()
 
-  def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
-    internal_bind(targets)
+  def connected() = ^{
     on_connected
   } >>: dispatchQueue
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Sat Jul 17 00:12:48 2010
@@ -20,15 +20,14 @@ import _root_.java.util.{ArrayList, Hash
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
 import _root_.scala.collection.JavaConversions._
-import path.PathFilter
-import org.fusesource.hawtbuf.AsciiBuffer
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import org.apache.activemq.apollo.dto.{VirtualHostDTO}
 import java.util.concurrent.TimeUnit
-import org.apache.activemq.apollo.store.{Store, StoreFactory, QueueRecord}
+import org.apache.activemq.apollo.store.{Store, StoreFactory}
 import org.apache.activemq.apollo.util._
 import ReporterLevel._
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -80,8 +79,6 @@ class VirtualHost(val broker: Broker, va
   override val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
 
   var config:VirtualHostDTO = _
-  val queues = new HashMap[AsciiBuffer, Queue]()
-  val durableSubs = new HashMap[String, DurableSubscription]()
   val router = new Router(this)
 
   var names:List[String] = Nil;
@@ -159,15 +156,11 @@ class VirtualHost(val broker: Broker, va
               // Use a global queue to so we concurrently restore
               // the queues.
               globalQueue {
-                store.getQueueStatus(queueKey) { x =>
+                store.getQueue(queueKey) { x =>
                   x match {
-                    case Some(info)=>
-
+                    case Some(record)=>
                     dispatchQueue ^{
-                      val dest = DestinationParser.parse(info.record.name, destination_parser_options)
-                      val queue = new Queue(this, dest, queueKey)
-                      queue.start
-                      queues.put(dest.getName, queue)
+                      router.create_queue(record)
                       task.run
                     }
                     case _ =>
@@ -221,159 +214,53 @@ class VirtualHost(val broker: Broker, va
     }
   }
 
-  def getQueue(destination:Destination)(cb: (Queue)=>Unit ) = ^{
-    if( !serviceState.isStarted ) {
-      error("getQueue can only be called while the service is running.")
-      cb(null)
-    } else {
-      var queue = queues.get(destination.getName);
-      if( queue==null && config.auto_create_queues ) {
-        addQueue(destination)(cb)
-      } else  {
-        cb(queue)
-      }
-    }
-  } |>>: dispatchQueue
-
-
 
   // Try to periodically re-balance connections so that consumers/producers
   // are grouped onto the same thread.
   def schedualConnectionRegroup:Unit = {
     def connectionRegroup = {
-      router.each { (destination, node)=>
-        node match {
-          case x:router.TopicDestinationNode=>
-
-            // 1->1 is the easy case...
-            if( node.targets.size==1 && node.routes.size==1 ) {
-              // move the producer to the consumer thread.
-              node.routes.head.producer.collocate( node.targets.head.dispatchQueue )
-            } else {
-              // we need to get fancy perhaps look at rates
-              // to figure out how to be group the connections.
-            }
-
-          case x:router.QueueDestinationNode=>
-
-            if( node.targets.size==1 ) {
-              // move the queue to the consumer
-              x.queue.collocate( node.targets.head.dispatchQueue )
-            } else {
-              // we need to get fancy perhaps look at rates
-              // to figure out how to be group the connections.
-            }
-
-            if( node.routes.size==1 ) {
-              // move the producer to the queue.
-              node.routes.head.producer.collocate( x.queue.dispatchQueue )
-            } else {
-              // we need to get fancy perhaps look at rates
-              // to figure out how to be group the connections.
-            }
-        }
-      }
-      schedualConnectionRegroup
-    }
-    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )
-  }
-
-
-  def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
-    val name = DestinationParser.toBuffer(dest, destination_parser_options)
-    if( store!=null ) {
-      val record = new QueueRecord
-      record.name = name
-      record.key = queue_id_counter.incrementAndGet
-
-      store.addQueue(record) { rc =>
-        rc match {
-          case true =>
-            dispatchQueue {
-              val queue = new Queue(this, dest, record.key)
-              queue.start()
-              queues.put(dest.getName, queue)
-              cb(queue)
-            }
-          case false => // store could not create
-            cb(null)
-        }
-      }
-    } else {
-      val queue = new Queue(this, dest, queue_id_counter.incrementAndGet)
-      queue.start()
-      queues.put(dest.getName, queue)
-      cb(queue)
-    }
 
-  } |>>: dispatchQueue
-
-  def createSubscription(consumer:ConsumerContext):BrokerSubscription = {
-      createSubscription(consumer, consumer.getDestination());
-  }
-
-  def createSubscription(consumer:ConsumerContext, destination:Destination):BrokerSubscription = {
-
-      // First handle composite destinations..
-      var destinations = destination.getDestinations();
-      if (destinations != null) {
-          var subs :List[BrokerSubscription] = Nil
-          for (childDest <- destinations) {
-              subs ::= createSubscription(consumer, childDest);
-          }
-          return new CompositeSubscription(destination, subs);
-      }
-
-      // If it's a Topic...
-//      if ( destination.getDomain == TOPIC_DOMAIN || destination.getDomain == TEMP_TOPIC_DOMAIN ) {
+      // this should really be much more fancy.  It should look at the messaging
+      // rates between producers and consumers, look for natural data flow partitions
+      // and then try to equally divide the load over the available processing
+      // threads/cores.
+//      router.destinations.valuesIterator.foreach { node =>
+        // todo
+//        if( node.get_queue==null ) {
+//          // Looks like a topic destination...
+//
+//          // 1->1 is the easy case...
+//          if( node.direct_consumers.size==1 && node.producers.size==1 ) {
+//            // move the producer to the consumer thread.
+//            node.producers.head.producer.collocate( node.direct_consumers.head.dispatchQueue )
+//          } else {
+//            // we need to get fancy perhaps look at rates
+//            // to figure out how to best group the connections.
+//          }
+//        } else {
+//          // Looks like a queue destination...
 //
-//          // It might be a durable subscription on the topic
-//          if (consumer.isDurable()) {
-//              var dsub = durableSubs.get(consumer.getSubscriptionName());
-//              if (dsub == null) {
-////                    TODO:
-////                    IQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
-////                    queue.start();
-////                    dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
-////                    durableSubs.put(consumer.getSubscriptionName(), dsub);
-//              }
-//              return dsub;
+//          if( node.direct_consumers.size==1 ) {
+//            // move the queue to the consumer
+//            node.get_queue.collocate( node.direct_consumers.head.dispatchQueue )
+//          } else {
+//            // we need to get fancy perhaps look at rates
+//            // to figure out how to best group the connections.
 //          }
 //
-//          // return a standard subscription
-////            TODO:
-////            return new TopicSubscription(this, destination, consumer.getSelectorExpression());
-//          return null;
+//          if( node.producers.size==1 ) {
+//            // move the producer to the queue.
+//            node.producers.head.producer.collocate( node.get_queue.dispatchQueue )
+//          } else {
+//            // we need to get fancy perhaps look at rates
+//            // to figure out how to best group the connections.
+//          }
+//
+//        }
 //      }
-
-      // It looks like a wild card subscription on a queue..
-      if (PathFilter.containsWildCards(destination.getName())) {
-          return new WildcardQueueSubscription(this, destination, consumer);
-      }
-
-      // It has to be a Queue subscription then..
-      var queue = queues.get(destination.getName());
-      if (queue == null) {
-          if (consumer.autoCreateDestination()) {
-//            TODO
-//              queue = createQueue(destination);
-          } else {
-              throw new IllegalStateException("The queue does not exist: " + destination.getName());
-          }
-      }
-//        TODO:
-//        return new Queue.QueueSubscription(queue);
-      return null;
-  }
-
-
-  val queueLifecyleListeners = new ArrayList[QueueLifecyleListener]();
-
-  def addDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= {
-      queueLifecyleListeners.add(listener);
+      schedualConnectionRegroup
+    }
+    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )
   }
 
-  def removeDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= {
-      queueLifecyleListeners.add(listener);
-  }
 }

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala Sat Jul 17 00:12:48 2010
@@ -22,7 +22,7 @@ import java.net.{URL, URI}
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.dto._
 import java.lang.String
-import XmlEncoderDecoder._
+import XmlCodec._
 import org.apache.activemq.apollo.util._
 
 class XmlBrokerFactory extends BrokerFactory.Provider {

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Sat Jul 17 00:12:48 2010
@@ -17,11 +17,11 @@
 package org.apache.activemq.apollo.broker.protocol
 
 import java.io.{IOException}
-import org.apache.activemq.apollo.broker.{Message, BrokerConnection}
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
 import org.apache.activemq.apollo.util.ClassFinder
 import org.apache.activemq.apollo.store.MessageRecord
 import org.apache.activemq.apollo.transport._
+import org.apache.activemq.apollo.broker.{Delivery, Message, BrokerConnection}
 
 /**
  * <p>

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Sat Jul 17 00:12:48 2010
@@ -224,7 +224,7 @@ abstract class BrokerPerfSupport extends
     var dests = new Array[Destination](destCount)
 
     for (i <- 0 until destCount) {
-      val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+      val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
       val name = new AsciiBuffer("dest" + (i + 1))
       var bean = new SingleDestination(domain, name)
       dests(i) = bean

Modified: activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/proto/data.proto Sat Jul 17 00:12:48 2010
@@ -32,3 +32,9 @@ message PBQueueEntryRecord {
   optional int32 size = 3;
   optional int32 redeliveries = 4;
 }
+
+message PBQueueRecord {
+  required int64 key=1;
+  optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes binding_data = 3;
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala Sat Jul 17 00:12:48 2010
@@ -94,6 +94,24 @@ class CassandraClient() {
     pb.freeze.toUnframedByteArray
   }
 
+  implicit def decodeQueueRecord(v: Array[Byte]): QueueRecord = {
+    import PBQueueRecord._
+    val pb = PBQueueRecord.FACTORY.parseUnframed(v)
+    val rc = new QueueRecord
+    rc.key = pb.getKey
+    rc.binding_kind = pb.getBindingKind
+    rc.binding_data = pb.getBindingData
+    rc
+  }
+
+  implicit def encodeQueueRecord(v: QueueRecord): Array[Byte] = {
+    val pb = new PBQueueRecord.Bean
+    pb.setKey(v.key)
+    pb.setBindingKind(v.binding_kind)
+    pb.setBindingData(v.binding_data)
+    pb.freeze.toUnframedByteArray
+  }
+
   def purge() = {
     withSession {
       session =>
@@ -109,7 +127,7 @@ class CassandraClient() {
   def addQueue(record: QueueRecord) = {
     withSession {
       session =>
-        session.insert(schema.queue_name \ (record.key, record.name))
+        session.insert(schema.queue_name \ (record.key, record))
     }
   }
 
@@ -133,25 +151,14 @@ class CassandraClient() {
     }
   }
 
-  def getQueueStatus(id: Long): Option[QueueStatus] = {
+  def getQueue(id: Long): Option[QueueRecord] = {
     withSession {
       session =>
         session.get(schema.queue_name \ id) match {
           case Some(x) =>
+            val record:QueueRecord = x.value
+            Some(record)
 
-            val rc = new QueueStatus
-            rc.record = new QueueRecord
-            rc.record.key = id
-            rc.record.name = new AsciiBuffer(x.value)
-
-            rc.count = session.count( schema.entries \ id )
-            
-            // TODO
-            //          rc.count =
-            //          rc.first =
-            //          rc.last =
-
-            Some(rc)
           case None =>
             None
         }

Modified: activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala Sat Jul 17 00:12:48 2010
@@ -185,9 +185,9 @@ class CassandraStore extends Store with 
     }
   }
 
-  def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
+  def getQueue(id: Long)(callback: (Option[QueueRecord]) => Unit) = {
     blocking {
-      callback( client.getQueueStatus(id) )
+      callback( client.getQueue(id) )
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/pom.xml Sat Jul 17 00:12:48 2010
@@ -38,6 +38,18 @@
       <artifactId>jackson-core-asl</artifactId>
       <version>${jackson-version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${jackson-version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf</artifactId>
+      <version>${hawtbuf-version}</version>
+      <optional>true</optional>
+    </dependency>
 
     <!-- Testing Dependencies -->    
     <dependency>

Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java Sat Jul 17 00:12:48 2010
@@ -14,20 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.store;
+package org.apache.activemq.apollo.dto;
 
-import org.fusesource.hawtbuf.AsciiBuffer;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
 
+import javax.xml.bind.annotation.*;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueRecord {
-
-    public long key = -1;
-    public AsciiBuffer name;
-    public AsciiBuffer queueType;
-
-//    public AsciiBuffer parent;
-
+@XmlType(name = "binding")
+@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class BindingDTO {
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Sat Jul 17 00:12:48 2010
@@ -18,34 +18,34 @@ package org.apache.activemq.apollo.dto;
 
 import org.codehaus.jackson.annotate.JsonProperty;
 
-import java.util.ArrayList;
-
 import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name = "virtual-host")
+@XmlRootElement(name = "destination")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostDTO extends ServiceDTO<String> {
-
-    @XmlElement(name="host-name", required=true)
-    public ArrayList<String> host_names = new ArrayList<String>();
+public class DestinationDTO {
 
-    @XmlElementRef
-    public StoreDTO store;
+    /**
+     * The name or wildcard name of the destination
+     */
+    public String name;
 
     /**
-     * Should queues be auto created when they are first accessed
-     * by clients?
+     * The kind of destination, "queue" or "topic"
      */
-    @JsonProperty("auto_create_queues")
-    @XmlAttribute(name="auto-create-queues")
-    public boolean auto_create_queues = true;
+    public String kind;
 
     /**
-     * Should queues be purged on startup?
+     * If set to true, then for routing purposes there is no difference between
+     * sending to a queue or topic of the same name.  The first time
+     * a queue subscription is created, it will act as if a durable
+     * subscription was created on the topic.
      */
-    @XmlAttribute(name="purge-on-startup")
-    public boolean purge_on_startup = false;
+    public boolean unified = false;
+
+    
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java Sat Jul 17 00:12:48 2010
@@ -39,12 +39,6 @@ public class DestinationStatusDTO extend
     public String name;
 
     /**
-     * The routing domain
-     */
-    @XmlAttribute
-    public String domain;
-
-    /**
      * Ids of all connections that are producing to the destination
      */
     @XmlElement(name="producer")

Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionBindingDTO.java Sat Jul 17 00:12:48 2010
@@ -14,20 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.store;
-
-import org.fusesource.hawtbuf.AsciiBuffer;
+package org.apache.activemq.apollo.dto;
 
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueRecord {
+@XmlRootElement(name = "durable-subscription-binding")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class DurableSubscriptionBindingDTO extends BindingDTO {
+
+    public String destination;
 
-    public long key = -1;
-    public AsciiBuffer name;
-    public AsciiBuffer queueType;
+    public String filter;
 
-//    public AsciiBuffer parent;
+    @XmlAttribute(name="client-id")
+    public String client_id;
 
+    @XmlAttribute(name="subscription-id")
+    public String subscription_id;
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/JsonCodec.java Sat Jul 17 00:12:48 2010
@@ -16,36 +16,34 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import java.util.ArrayList;
-
-import javax.xml.bind.annotation.*;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.stream.XMLStreamException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
 
 /**
+ *
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name = "virtual-host")
-@XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostDTO extends ServiceDTO<String> {
-
-    @XmlElement(name="host-name", required=true)
-    public ArrayList<String> host_names = new ArrayList<String>();
-
-    @XmlElementRef
-    public StoreDTO store;
-
-    /**
-     * Should queues be auto created when they are first accessed
-     * by clients?
-     */
-    @JsonProperty("auto_create_queues")
-    @XmlAttribute(name="auto-create-queues")
-    public boolean auto_create_queues = true;
-
-    /**
-     * Should queues be purged on startup?
-     */
-    @XmlAttribute(name="purge-on-startup")
-    public boolean purge_on_startup = false;
+public class JsonCodec {
+    private static ObjectMapper mapper = new ObjectMapper();
+
+    static public <T> T decode(Buffer buffer, Class<T> type) throws IOException {
+        return mapper.readValue(buffer.in(), type);
+    }
+
+    static public Buffer encode(Object value) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        mapper.writeValue(baos, value);
+        return baos.toBuffer();
+    }
+
 }

Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/PointToPointBindingDTO.java Sat Jul 17 00:12:48 2010
@@ -14,20 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.store;
+package org.apache.activemq.apollo.dto;
 
-import org.fusesource.hawtbuf.AsciiBuffer;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
 
+import javax.xml.bind.annotation.*;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueRecord {
-
-    public long key = -1;
-    public AsciiBuffer name;
-    public AsciiBuffer queueType;
-
-//    public AsciiBuffer parent;
+@XmlRootElement(name = "queue-binding")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class PointToPointBindingDTO extends BindingDTO {
+
+    /**
+     * A label that describes the binding
+     */
+    @XmlAttribute
+    public String destination;
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Sat Jul 17 00:12:48 2010
@@ -38,6 +38,9 @@ public class QueueStatusDTO extends Long
 	@XmlAttribute
 	public long id;
 
+    @XmlAttribute
+    public String label;
+
     @XmlAttribute(name="enqueue-item-counter")
     public long enqueue_item_counter;
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Sat Jul 17 00:12:48 2010
@@ -48,4 +48,12 @@ public class VirtualHostDTO extends Serv
      */
     @XmlAttribute(name="purge-on-startup")
     public boolean purge_on_startup = false;
+
+    /**
+     * Holds the configuration for the destinations.
+     */
+    @XmlElement(name="destination")
+    public ArrayList<DestinationDTO> destinations = new ArrayList<DestinationDTO>();
+
+
 }

Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlEncoderDecoder.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/XmlCodec.java Sat Jul 17 00:12:48 2010
@@ -33,7 +33,7 @@ import java.util.regex.Pattern;
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class XmlEncoderDecoder {
+public class XmlCodec {
 
     /**
      * Changes ${property} with values from a properties object

Copied: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java (from r964573, activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java?p2=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java&p1=activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java&r1=964573&r2=964988&rev=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlEncoderDecoderTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java Sat Jul 17 00:12:48 2010
@@ -27,7 +27,7 @@ import static junit.framework.Assert.*;
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 
-public class XmlEncoderDecoderTest {
+public class XmlCodecTest {
 
     private InputStream resource(String path) {
         return getClass().getResourceAsStream(path);
@@ -35,7 +35,7 @@ public class XmlEncoderDecoderTest {
 
     @Test
     public void unmarshalling() throws Exception {
-        BrokerDTO dto = XmlEncoderDecoder.unmarshalBrokerDTO(resource("simple.xml"));
+        BrokerDTO dto = XmlCodec.unmarshalBrokerDTO(resource("simple.xml"));
         assertNotNull(dto);
         assertEquals("default", dto.id);
         assertEquals(true, dto.enabled);
@@ -66,7 +66,7 @@ public class XmlEncoderDecoderTest {
         broker.connectors.add(connector);
         broker.basedir = "./activemq-data/default";
 
-        XmlEncoderDecoder.marshalBrokerDTO(broker, System.out, true);
+        XmlCodec.marshalBrokerDTO(broker, System.out, true);
 
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/proto/data.proto Sat Jul 17 00:12:48 2010
@@ -69,9 +69,10 @@ message AddMessage {
 
 message AddQueue {
   required int64 key=1;
-  optional bytes name = 2 [java_override_type = "AsciiBuffer"];
-  optional bytes queueType = 3 [java_override_type = "AsciiBuffer"];
+  optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes binding_data = 3;
 }
+
 message RemoveQueue {
   required int64 key=1;
 }
@@ -149,9 +150,7 @@ message DatabaseRootRecord {
 
 message QueueRootRecord {
   optional AddQueue info=1;
-  optional int64 size=2;
-  optional int64 count=3;
-  optional fixed32 entryIndexPage=4;
-  optional fixed32 trackingIndexPage=5;
+  optional fixed32 entryIndexPage=2;
+  optional fixed32 trackingIndexPage=3;
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala Sat Jul 17 00:12:48 2010
@@ -224,8 +224,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   def addQueue(record: QueueRecord, callback:Runnable) = {
     val update = new AddQueue.Bean()
     update.setKey(record.key)
-    update.setName(record.name)
-    update.setQueueType(record.queueType)
+    update.setBindingKind(record.binding_kind)
+    update.setBindingData(record.binding_data)
     _store(update, callback)
   }
 
@@ -280,26 +280,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     rc
   }
 
-  def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
+  def getQueue(queueKey: Long): Option[QueueRecord] = {
     withTx { tx =>
         val helper = new TxHelper(tx)
         import helper._
 
         val queueRecord = queueIndex.get(queueKey)
         if (queueRecord != null) {
-          val rc = new QueueStatus
-          rc.record = new QueueRecord
-          rc.record.key = queueKey
-          rc.record.name = queueRecord.getInfo.getName
-          rc.record.queueType = queueRecord.getInfo.getQueueType
-          rc.count = queueRecord.getCount.toInt
-          rc.size = queueRecord.getSize
-
-          // TODO
-          // rc.first =
-          // rc.last =
-
-          Some(rc)
+          val record = new QueueRecord
+          record.key = queueKey
+          record.binding_kind = queueRecord.getInfo.getBindingKind
+          record.binding_data = queueRecord.getInfo.getBindingData
+          Some(record)
         } else {
           None
         }
@@ -815,12 +807,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
           if (existing == null) {
             val previous = entryIndex.put(queueSeq, x.freeze)
             if (previous == null) {
-
-              val queueRecordUpdate = queueRecord.copy
-              queueRecordUpdate.setCount(queueRecord.getCount + 1)
-              queueRecordUpdate.setSize(queueRecord.getSize + x.getSize)
-              queueIndex.put(queueKey, queueRecordUpdate.freeze)
-
               addAndGet(messageRefsIndex, new jl.Long(messageKey), 1)
             } else {
               // TODO perhaps treat this like an update?

Modified: activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala?rev=964988&r1=964987&r2=964988&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala Sat Jul 17 00:12:48 2010
@@ -160,9 +160,9 @@ class HawtDBStore extends Store with Bas
     }
   }
 
-  def getQueueStatus(queueKey: Long)(callback: (Option[QueueStatus]) => Unit) = {
+  def getQueue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
     executor_pool {
-      callback( client.getQueueStatus(queueKey) )
+      callback( client.getQueue(queueKey) )
     }
   }