You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/04 15:42:53 UTC

svn commit: r1240510 [2/3] - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/ apollo-dto/src/main/java/org/apache/activemq/apollo/d...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Feb  4 14:42:52 2012
@@ -33,7 +33,7 @@ import security.SecuredResource
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, val path:Path) extends DomainDestination with SecuredResource {
+class Topic(val router:LocalRouter, val address:DestinationAddress, var config_updater: ()=>TopicDTO) extends DomainDestination with SecuredResource {
 
   val topic_metrics = new DestMetricsDTO
 
@@ -138,7 +138,7 @@ class Topic(val router:LocalRouter, val 
       val copy = value.copy();
       copy.uow = value.uow
       copy.ack = value.ack
-      copy.sender = destination_dto
+      copy.sender = address
       downstream.offer(copy)
     }
   }
@@ -170,7 +170,7 @@ class Topic(val router:LocalRouter, val 
 
   import OptionSupport._
 
-  override def toString = destination_dto.toString
+  override def toString = address.toString
 
   def virtual_host: VirtualHost = router.virtual_host
   def now = virtual_host.broker.now
@@ -307,7 +307,7 @@ class Topic(val router:LocalRouter, val 
         if( auto_delete_after!=0 ) {
           dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
             if( previously_idle_at == idled_at ) {
-              router.local_topic_domain.remove_destination(path, this)
+              router.local_topic_domain.remove_destination(address.path, this)
               DestinationMetricsSupport.add_destination_metrics(router.virtual_host.dead_topic_metrics, topic_metrics)
             }
           }
@@ -318,19 +318,18 @@ class Topic(val router:LocalRouter, val 
     }
   }
 
-  def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
+  def bind(address: BindAddress, consumer:DeliveryConsumer) = {
 
-    val target = destination match {
-      case null=>
+    val target = address.domain match {
+      case "queue"=>
         // this is the mirrored queue case..
         consumer
-      case destination:TopicDestinationDTO=>
-        var target = consumer
+      case "topic"=>
         slow_consumer_policy match {
           case "queue" =>
 
             // create a temp queue so that it can spool
-            val queue = router._create_queue(new TempQueueBinding(consumer, path, destination_dto))
+            val queue = router._create_queue(new TempQueueBinding(consumer, address))
             queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
             queue.bind(List(consumer))
             consumer_queues += consumer->queue
@@ -418,7 +417,7 @@ class Topic(val router:LocalRouter, val 
     check_idle
   }
 
-  def bind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue)  = {
+  def bind_durable_subscription(address: SubscriptionAddress, queue:Queue)  = {
     if( !durable_subscriptions.contains(queue) ) {
       durable_subscriptions += queue
       val list = List(queue)
@@ -427,14 +426,14 @@ class Topic(val router:LocalRouter, val 
       })
       consumer_queues.foreach{case (consumer, q)=>
         if( q==queue ) {
-          bind(destination, consumer)
+          bind(address, consumer)
         }
       }
     }
     check_idle
   }
 
-  def unbind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue)  = {
+  def unbind_durable_subscription(queue:Queue)  = {
     if( durable_subscriptions.contains(queue) ) {
       durable_subscriptions -= queue
       val list = List(queue)
@@ -450,7 +449,7 @@ class Topic(val router:LocalRouter, val 
     check_idle
   }
 
-  def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+  def connect (address:ConnectAddress, producer:BindableDeliveryProducer) = {
     val link = new LinkDTO()
     producer.connection match {
       case Some(connection) =>

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala Sat Feb  4 14:42:52 2012
@@ -220,7 +220,7 @@ class DiskBenchmark extends Action {
         out.println(report)
       }
     } catch {
-      case x:Helper.Failure=> error(x.getMessage)
+      case x:Helper.Failure=> sys.error(x.getMessage)
       case e: Throwable =>
         if (verbose) {
           out.println("ERROR:")

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Sat Feb  4 14:42:52 2012
@@ -37,41 +37,20 @@ import java.util.List;
 @XmlAccessorType(XmlAccessType.FIELD)
 abstract public class DestinationDTO {
 
-    @XmlElement(name = "path")
-    public List<String> path = new ArrayList<String>();
-
-    public boolean temp;
+    @XmlAttribute(name = "name")
+    public String name;
 
     public DestinationDTO() {
     }
-
-    public DestinationDTO(List<String> path) {
-        this.path = path;
+    public DestinationDTO(String name) {
+        this.name = name;
     }
 
-    public DestinationDTO(String path[]) {
-        this(Arrays.asList(path));
-    }
-
-    public String name(String separator) {
-        StringBuilder sb = new StringBuilder();
-        for (String p : path) {
-            if (sb.length() != 0) {
-                sb.append(separator);
-            }
-            sb.append(p);
-        }
-        return sb.toString();
-    }
-
-    public boolean temp() {
-        return temp;
-    }
-
-    public DestinationDTO temp(boolean temp) {
-        this.temp = temp;
-        return this;
-    }
+    /**
+     * @deprecated 
+     */
+    @XmlElement(name = "path")
+    public List<String> path = new ArrayList<String>();
 
     @Override
     public boolean equals(Object o) {
@@ -80,23 +59,21 @@ abstract public class DestinationDTO {
 
         DestinationDTO that = (DestinationDTO) o;
 
-        if (temp != that.temp) return false;
-        if (!path.equals(that.path)) return false;
+        if (!name.equals(that.name)) return false;
 
         return true;
     }
 
     @Override
     public int hashCode() {
-        int result = path.hashCode();
-        result = 31 * result + (temp ? 1 : 0);
+        int result = name.hashCode();
         return result;
     }
 
     @Override
     public String toString() {
         return "DestinationDTO{" +
-                "path=" + path +
+                "name=" + name +
                 '}';
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Sat Feb  4 14:42:52 2012
@@ -44,12 +44,12 @@ public class DurableSubscriptionDestinat
     }
 
     public DurableSubscriptionDestinationDTO(String subscription_id) {
-        super(new String[]{subscription_id});
+        super(subscription_id);
     }
 
     @JsonIgnore
     public String subscription_id() {
-        return path.get(0);
+        return name;
     }
 
     /**
@@ -91,10 +91,6 @@ public class DurableSubscriptionDestinat
 
     @Override
     public String toString() {
-        return "DurableSubscriptionDestinationDTO{" +
-                "id='" + subscription_id() + '\'' +
-                ", selector='" + selector + '\'' +
-                ", topics=" + topics +
-                '}';
+        return "dsub:"+name;
     }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java Sat Feb  4 14:42:52 2012
@@ -32,10 +32,7 @@ public class QueueDestinationDTO extends
     public QueueDestinationDTO() {
     }
 
-    public QueueDestinationDTO(List<String> name) {
-        super(name);
-    }
-    public QueueDestinationDTO(String[] name) {
+    public QueueDestinationDTO(String name) {
         super(name);
     }
 
@@ -58,9 +55,7 @@ public class QueueDestinationDTO extends
 
     @Override
     public String toString() {
-        return "QueueDestinationDTO{" +
-                "path=" + path +
-                '}';
+        return "queue:"+name;
     }
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java Sat Feb  4 14:42:52 2012
@@ -32,17 +32,12 @@ public class TopicDestinationDTO extends
     public TopicDestinationDTO() {
     }
 
-    public TopicDestinationDTO(List<String> name) {
-        super(name);
-    }
-    public TopicDestinationDTO(String name[]) {
+    public TopicDestinationDTO(String name) {
         super(name);
     }
 
     @Override
     public String toString() {
-        return "TopicDestinationDTO{" +
-                "path=" + path +
-                '}';
+        return "topic:"+name;
     }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Sat Feb  4 14:42:52 2012
@@ -72,14 +72,14 @@ class DestinationAdvisoryRouterListener(
     override def dispatch_queue = router.virtual_host.dispatch_queue
   }
 
-  var producerRoutes = new LRUCache[List[DestinationDTO], ProducerRoute](10) {
-    override def onCacheEviction(eldest: Entry[List[DestinationDTO], ProducerRoute]) = {
+  var producerRoutes = new LRUCache[List[ConnectAddress], ProducerRoute](10) {
+    override def onCacheEviction(eldest: Entry[List[ConnectAddress], ProducerRoute]) = {
       router.disconnect(eldest.getKey.toArray, eldest.getValue)
     }
   }
 
   def on_create(dest: DomainDestination, security: SecurityContext) = {
-    val ow_destination = to_activemq_destination(Array(dest.destination_dto))
+    val ow_destination = to_activemq_destination(Array(dest.address))
     if (ow_destination!=null && !AdvisorySupport.isAdvisoryTopic(ow_destination)) {
       destination_advisories.getOrElseUpdate(ow_destination, {
         var info = new DestinationInfo(null, DestinationInfo.ADD_OPERATION_TYPE, ow_destination)
@@ -92,7 +92,7 @@ class DestinationAdvisoryRouterListener(
   }
 
   def on_destroy(dest: DomainDestination, security: SecurityContext) = {
-    val destination = to_activemq_destination(Array(dest.destination_dto))
+    val destination = to_activemq_destination(Array(dest.address))
     if (destination!=null && !AdvisorySupport.isAdvisoryTopic(destination)) {
       for (info <- destination_advisories.remove(destination)) {
         var info = new DestinationInfo(null, DestinationInfo.REMOVE_OPERATION_TYPE, destination)
@@ -103,7 +103,7 @@ class DestinationAdvisoryRouterListener(
   }
 
   def on_bind(dest: DomainDestination, consumer: DeliveryConsumer, security: SecurityContext) = {
-    val destination = to_activemq_destination(Array(dest.destination_dto))
+    val destination = to_activemq_destination(Array(dest.address))
     if (destination!=null && AdvisorySupport.isDestinationAdvisoryTopic(destination) && !destination_advisories.isEmpty) {
       // replay the destination advisories..
       val producer = new ProducerRoute {
@@ -175,7 +175,7 @@ class DestinationAdvisoryRouterListener(
 
   def send(delivery:Delivery): Unit = {
     val message = delivery.message.asInstanceOf[OpenwireMessage].message
-    val dest: Array[DestinationDTO] = to_destination_dto(message.getDestination,null)
+    val dest = to_destination_dto(message.getDestination,null)
     val key = dest.toList
 
     val route = producerRoutes.get(key) match {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala Sat Feb  4 14:42:52 2012
@@ -17,9 +17,10 @@
 package org.apache.activemq.apollo.openwire
 
 import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
-import org.apache.activemq.apollo.broker.DestinationParser
 import org.apache.activemq.apollo.openwire.command._
 import java.lang.String
+import org.apache.activemq.apollo.broker.{DestinationAddress, SimpleAddress, DestinationParser}
+import org.apache.activemq.apollo.util.path.{Path, LiteralPart}
 
 /**
  * <p>
@@ -34,67 +35,43 @@ object DestinationConverter {
   OPENWIRE_PARSER.topic_prefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX
   OPENWIRE_PARSER.temp_queue_prefix = ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX
   OPENWIRE_PARSER.temp_topic_prefix = ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX
-  OPENWIRE_PARSER.dsub_prefix = null
   OPENWIRE_PARSER.path_separator = "."
   OPENWIRE_PARSER.any_child_wildcard = "*"
   OPENWIRE_PARSER.any_descendant_wildcard = ">"
-  OPENWIRE_PARSER.sanitize_destinations = true
+  OPENWIRE_PARSER.part_pattern = null
+  OPENWIRE_PARSER.regex_wildcard_start = null
+  OPENWIRE_PARSER.regex_wildcard_end = null
 
-  def to_destination_dto(dest: ActiveMQDestination, handler:OpenwireProtocolHandler): Array[DestinationDTO] = {
+  def to_destination_dto(dest: ActiveMQDestination, handler:OpenwireProtocolHandler): Array[SimpleAddress] = {
     def fallback(value:String) = {
       OPENWIRE_PARSER.decode_single_destination(dest.getQualifiedPrefix+value, null)
     }
-    val rc = OPENWIRE_PARSER.decode_multi_destination(dest.getPhysicalName.toString, fallback)
-    rc.foreach { dest =>
-      if( dest.temp() ) {
-        import collection.JavaConversions._
+    OPENWIRE_PARSER.decode_multi_destination(dest.getPhysicalName.toString, fallback).map { dest =>
+      if( dest.domain.startsWith("temp-") ) {
         // Put it back together...
-        val name = dest.path.map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(OPENWIRE_PARSER.path_separator)
-
+        val name = OPENWIRE_PARSER.encode_path(dest.path)
         val (connectionid, rest) = name.splitAt(name.lastIndexOf(':'))
-        val real_path = ("temp" :: handler.broker.id :: OPENWIRE_PARSER.sanitize_destination_part(connectionid) :: OPENWIRE_PARSER.sanitize_destination_part(rest.substring(1)) :: Nil).toArray
-        dest.path = java.util.Arrays.asList(real_path:_*)
+        val real_path = "temp" :: handler.broker.id :: connectionid :: rest.substring(1) :: Nil
+        SimpleAddress(dest.domain.stripPrefix("temp-"), OPENWIRE_PARSER.decode_path(real_path)) 
+      } else {
+        dest
       }
     }
-    rc
   }
 
-  def to_activemq_destination(dests:Array[DestinationDTO]):ActiveMQDestination = {
-    import collection.JavaConversions._
-    var wrapper: (String)=> ActiveMQDestination = null
-    
-    val rc = OPENWIRE_PARSER.encode_destination(dests.flatMap{ dest=>
-      val temp = dest.path.headOption == Some("temp")
-      dest match {
-        case dest:QueueDestinationDTO =>
-          if( temp ) {
-            if(wrapper==null) 
-              wrapper = (x)=>new ActiveMQTempQueue(x)
-            var path: Array[String] = Array(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":"))
-            Some(new QueueDestinationDTO(path).temp(true))
-          } else {
-            if(wrapper==null) 
-              wrapper = (x)=>new ActiveMQQueue(x)
-            Some(dest)
-          }
-        case dest:TopicDestinationDTO =>
-          if( temp ) {
-            if(wrapper==null) 
-              wrapper = (x)=>new ActiveMQTempTopic(x)
-            var path: Array[String] = Array(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":"))
-            Some(new TopicDestinationDTO(path).temp(true))
+  def to_activemq_destination(addresses:Array[_ <: DestinationAddress]):ActiveMQDestination = {
+    ActiveMQDestination.createDestination(OPENWIRE_PARSER.encode_destination(addresses.map{ address=>
+      address.path.parts match {
+        // Remap temp destinations that look like openwire temp destinations.
+        case LiteralPart("temp") :: LiteralPart(broker) :: LiteralPart(session) :: LiteralPart(tempid) :: Nil =>
+          if( session.startsWith("ID:") ) {
+            SimpleAddress("temp-"+address.domain, Path(session+":"+tempid))
           } else {
-            if(wrapper==null) 
-              wrapper = (x)=>new ActiveMQTopic(x)
-            Some(dest)
+            address
           }
-        case _ => None 
+        case _ => address
       }
-    })
+    }))
 
-    if ( wrapper==null )
-      null
-    else
-      wrapper(rc)
   }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sat Feb  4 14:42:52 2012
@@ -36,6 +36,7 @@ import command._
 import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
 import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
 import org.apache.activemq.apollo.broker._
+import path.Path
 import protocol._
 import security.SecurityContext
 import DestinationConverter._
@@ -89,8 +90,8 @@ class OpenwireProtocolHandler extends Pr
 
   def broker = connection.connector.broker
 
-  var producerRoutes = new LRUCache[List[DestinationDTO], DeliveryProducerRoute](10) {
-    override def onCacheEviction(eldest: Entry[List[DestinationDTO], DeliveryProducerRoute]) = {
+  var producerRoutes = new LRUCache[List[ConnectAddress], DeliveryProducerRoute](10) {
+    override def onCacheEviction(eldest: Entry[List[ConnectAddress], DeliveryProducerRoute]) = {
       host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
     }
   }
@@ -442,7 +443,7 @@ class OpenwireProtocolHandler extends Pr
 
       security_context.user = Option(info.getUserName).map(_.toString).getOrElse(null)
       security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
-      security_context.session_id = Some(OPENWIRE_PARSER.sanitize_destination_part(info.getConnectionId.toString))
+      security_context.session_id = Some(info.getConnectionId.toString)
 
       if( host.authenticator!=null &&  host.authorizer!=null ) {
         suspend_read("authenticating and authorizing connect")
@@ -802,7 +803,7 @@ class OpenwireProtocolHandler extends Pr
     override def toString = "openwire consumer id:"+info.getConsumerId+", remote address: "+security_context.remote_address
 
     var selector_expression:BooleanExpression = _
-    var destination:Array[DestinationDTO] = _
+    var addresses:Array[_ <: BindAddress] = _
 
     val consumer_sink = sink_manager.open()
     val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
@@ -842,7 +843,7 @@ class OpenwireProtocolHandler extends Pr
     def attach = {
 
       if( info.getDestination == null ) fail("destination was not set")
-      destination = to_destination_dto(info.getDestination, OpenwireProtocolHandler.this)
+      addresses = to_destination_dto(info.getDestination, OpenwireProtocolHandler.this)
 
       // if they are temp dests.. attach our owner id so that we don't
       // get rejected.
@@ -872,21 +873,16 @@ class OpenwireProtocolHandler extends Pr
           subscription_id += parent.parent.info.getClientId + ":"
         }
         subscription_id += info.getSubscriptionName
-
-        val rc = new DurableSubscriptionDestinationDTO(subscription_id)
-        rc.selector = Option(info.getSelector).map(_.toString).getOrElse(null)
-
-        destination.foreach { _ match {
-          case x:TopicDestinationDTO=>
-            rc.topics.add(new TopicDestinationDTO(x.path))
+        val selector = Option(info.getSelector).map(_.toString).getOrElse(null)
+        addresses.foreach { _ match {
+          case SimpleAddress("topic", _) =>
           case _ => die("A durable subscription can only be used on a topic destination")
-          }
-        }
-        destination = Array(rc)
+        }}
+        addresses = Array(SubscriptionAddress(Path(subscription_id), selector, addresses))
       }
 
       host.dispatch_queue {
-        val rc = host.router.bind(destination, this, security_context)
+        val rc = host.router.bind(addresses, this, security_context)
         this.release
         dispatchQueue {
           rc match {
@@ -900,7 +896,7 @@ class OpenwireProtocolHandler extends Pr
     }
 
     def dettach = {
-      host.router.unbind(destination, this, false , security_context)
+      host.router.unbind(addresses, this, false , security_context)
       parent.consumers.remove(info.getConsumerId)
       all_consumers.remove(info.getConsumerId)
     }
@@ -973,7 +969,7 @@ class OpenwireProtocolHandler extends Pr
         })
         if( info.getDestination.isTemporary ) {
           dispatch_queue {
-            val rc = host.router.delete(destination, security_context)
+            val rc = host.router.delete(addresses, security_context)
             dispatchQueue {
               rc match {
                 case Some(error) => async_die(error)
@@ -1106,7 +1102,7 @@ class OpenwireProtocolHandler extends Pr
           }
 
           if( !found ) {
-            trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
+            trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, addresses.mkString(",")))
           } else {
             consumer_acks = not_acked
             acked.foreach{case (id, delivery)=>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java Sat Feb  4 14:42:52 2012
@@ -55,8 +55,7 @@ abstract public class ActiveMQDestinatio
 
   // static helper methods for working with destinations
   // -------------------------------------------------------------------------
-  public static ActiveMQDestination createDestination(String name, byte defaultType) {
-
+  public static ActiveMQDestination createDestination(String name) {
       if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
           return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
       } else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
@@ -65,19 +64,8 @@ abstract public class ActiveMQDestinatio
           return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
       } else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
           return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
-      }
-
-      switch (defaultType) {
-      case QUEUE_TYPE:
-          return new ActiveMQQueue(name);
-      case TOPIC_TYPE:
-          return new ActiveMQTopic(name);
-      case TEMP_QUEUE_TYPE:
-          return new ActiveMQTempQueue(name);
-      case TEMP_TOPIC_TYPE:
-          return new ActiveMQTempTopic(name);
-      default:
-          throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
+      } else {
+          return null;
       }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java Sat Feb  4 14:42:52 2012
@@ -134,7 +134,7 @@ public final class TypeConversionSupport
         });
         CONVERSION_MAP.put(new ConversionKey(String.class, ActiveMQDestination.class), new Converter() {
             public Object convert(Object value) {
-                return ActiveMQDestination.createDestination((String)value, ActiveMQDestination.QUEUE_TYPE);
+                return ActiveMQDestination.createDestination((String)value);
             }
         });
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sat Feb  4 14:42:52 2012
@@ -32,12 +32,12 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
-import path.PathParser
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
 import org.apache.activemq.apollo.dto._
 import org.fusesource.hawtdispatch.transport.{SecureTransport, HeartBeatMonitor, SslTransport}
+import path.{LiteralPart, Path, PathParser}
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -137,7 +137,7 @@ class StompProtocolHandler extends Proto
   class StompConsumer (
 
     val subscription_id:Option[AsciiBuffer],
-    val destination:Array[DestinationDTO],
+    val addresses:Array[_ <: BindAddress],
     ack_mode:AsciiBuffer,
     val selector:(String, BooleanExpression),
     override val browser:Boolean,
@@ -305,7 +305,7 @@ class StompProtocolHandler extends Proto
         }
 
         if( !found ) {
-          trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
+          trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, addresses.mkString(",")))
         } else {
           consumer_acks = not_acked
           acked.foreach{case (id, delivery)=>
@@ -542,8 +542,8 @@ class StompProtocolHandler extends Proto
   var closed = false
   var consumers = Map[AsciiBuffer, StompConsumer]()
 
-  var producerRoutes = new LRUCache[List[DestinationDTO], DeliveryProducerRoute](10) {
-    override def onCacheEviction(eldest: Entry[List[DestinationDTO], DeliveryProducerRoute]) = {
+  var producerRoutes = new LRUCache[List[ConnectAddress], DeliveryProducerRoute](10) {
+    override def onCacheEviction(eldest: Entry[List[ConnectAddress], DeliveryProducerRoute]) = {
       host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
     }
   }
@@ -565,27 +565,22 @@ class StompProtocolHandler extends Proto
   var protocol_filters = List[ProtocolFilter]()
 
   var destination_parser = Stomp.destination_parser
-  var temp_destination_map = HashMap[DestinationDTO, DestinationDTO]()
+  var temp_destination_map = HashMap[SimpleAddress, SimpleAddress]()
 
   var codec:StompCodec = _
 
   def session_id = security_context.session_id
 
-  implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
+  def decode_addresses(value:AsciiBuffer):Array[SimpleAddress] = {
     val rc = destination_parser.decode_multi_destination(value.toString)
     if( rc==null ) {
       throw new ProtocolException("Invalid stomp destination name: "+value);
     }
     rc.map { dest =>
-      if( dest.temp() ) {
+      if( dest.domain.startsWith("temp-") ) {
         temp_destination_map.getOrElseUpdate(dest, {
-          import scala.collection.JavaConversions._
-          val real_path= ("temp" :: broker.id :: session_id.get :: dest.path.toList).toArray
-          dest match {
-            case dest:QueueDestinationDTO => new QueueDestinationDTO( real_path ).temp(true)
-            case dest:TopicDestinationDTO => new TopicDestinationDTO( real_path ).temp(true)
-            case _ => throw new ProtocolException("Invalid stomp destination");
-          }
+          val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+          SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
         })
       } else {
         dest
@@ -713,7 +708,7 @@ class StompProtocolHandler extends Proto
       producerRoutes.clear
       consumers.foreach {
         case (_,consumer)=>
-          host.router.unbind(consumer.destination, consumer, false , security_context)
+          host.router.unbind(consumer.addresses, consumer, false , security_context)
       }
       consumers = Map()
       security_context.logout( e => {
@@ -920,7 +915,7 @@ class StompProtocolHandler extends Proto
           async_die(headers, "")
         } else {
           this.host=host
-          security_context.session_id = Some("%s-%x".format(destination_parser.sanitize_destination_part(this.host.config.id), this.host.session_counter.incrementAndGet))
+          security_context.session_id = Some("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
           connection_log = host.connection_log
           if( host.authenticator!=null &&  host.authorizer!=null ) {
             suspend_read("authenticating and authorizing connect")
@@ -984,8 +979,8 @@ class StompProtocolHandler extends Proto
 
   def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
 
-    val destination: Array[DestinationDTO] = get(frame.headers, DESTINATION).get
-    val key = destination.toList
+    val addresses = decode_addresses(get(frame.headers, DESTINATION).get)
+    val key = addresses.toList
     producerRoutes.get(key) match {
       case null =>
         // create the producer route...
@@ -1003,7 +998,7 @@ class StompProtocolHandler extends Proto
         // don't process frames until producer is connected...
         connection.transport.suspendRead
         host.dispatch_queue {
-          val rc = host.router.connect(destination, route, security_context)
+          val rc = host.router.connect(addresses, route, security_context)
           dispatchQueue {
             rc match {
               case Some(failure) =>
@@ -1012,7 +1007,7 @@ class StompProtocolHandler extends Proto
                 if (!connection.stopped) {
                   resume_read
                   producerRoutes.put(key, route)
-                  send_via_route(destination, route, frame, uow)
+                  send_via_route(addresses, route, frame, uow)
                 }
             }
           }
@@ -1020,56 +1015,57 @@ class StompProtocolHandler extends Proto
 
       case route =>
         // we can re-use the existing producer route
-        send_via_route(destination, route, frame, uow)
+        send_via_route(addresses, route, frame, uow)
 
     }
   }
 
   var message_id_counter = 0;
 
-  def encode_destination(value: Array[DestinationDTO]): String = {
-    if (value == null) {
-      null
-    } else {
-      val rc = new StringBuilder
-      value.foreach { dest =>
-        if (rc.length != 0 ) {
-          assert( destination_parser.destination_separator!=null )
-          rc.append(destination_parser.destination_separator)
-        }
-        import collection.JavaConversions._
-        dest match {
-          case d:QueueDestinationDTO =>
-            rc.append(destination_parser.queue_prefix)
-            rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
-          case d:DurableSubscriptionDestinationDTO =>
-            rc.append(destination_parser.dsub_prefix)
-            rc.append(destination_parser.unsanitize_destination_part(d.subscription_id))
-          case d:TopicDestinationDTO =>
-            rc.append(destination_parser.topic_prefix)
-            rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
-          case _ =>
-            throw new Exception("Uknown destination type: "+dest.getClass);
-        }
-      }
-      rc.toString
-    }
+  def encode_address(value: Array[_ <: DestinationAddress]): String = {
+    destination_parser.encode_destination(value)
+//    if (value == null) {
+//      null
+//    } else {
+//      val rc = new StringBuilder
+//      value.foreach { dest =>
+//        if (rc.length != 0 ) {
+//          assert( destination_parser.destination_separator!=null )
+//          rc.append(destination_parser.destination_separator)
+//        }
+//        import collection.JavaConversions._
+//        dest match {
+//          case d:QueueDestinationDTO =>
+//            rc.append(destination_parser.queue_prefix)
+//            rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
+//          case d:DurableSubscriptionDestinationDTO =>
+//            rc.append(destination_parser.dsub_prefix)
+//            rc.append(destination_parser.unsanitize_destination_part(d.subscription_id))
+//          case d:TopicDestinationDTO =>
+//            rc.append(destination_parser.topic_prefix)
+//            rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
+//          case _ =>
+//            throw new Exception("Uknown destination type: "+dest.getClass);
+//        }
+//      }
+//      rc.toString
+//    }
   }
 
-  def updated_headers(destination: Array[DestinationDTO], headers:HeaderMap) = {
+  def updated_headers(addresses: Array[SimpleAddress], headers:HeaderMap) = {
     var rc:HeaderMap=Nil
 
     // Do we need to re-write the destination names?
-    if( destination.find(_.temp()).isDefined ) {
-      rc ::= (DESTINATION -> encode_header(encode_destination(destination)))
+    if( addresses.find(_.id.startsWith("temp.")).isDefined ) {
+      rc ::= (DESTINATION -> encode_header(encode_address(addresses)))
     }
     get(headers, REPLY_TO).foreach { value=>
       // we may need to translate local temp destination names to broker destination names
       if( value.indexOf(TEMP_QUEUE)>=0 || value.indexOf(TEMP_TOPIC)>=0 ) {
         try {
-          val dests: Array[DestinationDTO] = value
-          if (dests.find(_.temp()).isDefined) {
-            rc ::= (REPLY_TO -> encode_header(encode_destination(dests)))
+          val dests = decode_addresses(value)
+          if (dests.find(_.id.startsWith("temp.")).isDefined) {
+            rc ::= (REPLY_TO -> encode_header(encode_address(dests)))
           }
         } catch {
           case _=> // the translation is a best effort thing.
@@ -1113,7 +1109,7 @@ class StompProtocolHandler extends Proto
     rc
   }
 
-  def send_via_route(destination: Array[DestinationDTO], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
+  def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
     var storeBatch:StoreUOW=null
     // User might be asking for ack that we have processed the message..
     val receipt = frame.header(RECEIPT_REQUESTED)
@@ -1121,7 +1117,7 @@ class StompProtocolHandler extends Proto
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
-      var message = updated_headers(destination, frame.headers) match {
+      var message = updated_headers(addresses, frame.headers) match {
         case Nil=>
           StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
         case updated_headers =>
@@ -1161,7 +1157,7 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_subscribe(headers:HeaderMap):Unit = {
     val dest = get(headers, DESTINATION).getOrElse(die("destination not set."))
-    var destination:Array[DestinationDTO] = dest
+    var addresses:Array[_ <: BindAddress] = decode_addresses(dest)
 
     val subscription_id = get(headers, ID)
     var id:AsciiBuffer = subscription_id.getOrElse {
@@ -1184,11 +1180,10 @@ class StompProtocolHandler extends Proto
     val from_seq_opt = get(headers, FROM_SEQ)
     
     
-    def is_multi_destination = if( destination.length > 1 ) {
+    def is_multi_destination = if( addresses.length > 1 ) {
       true
     } else {
-      val path = destination_parser.decode_path(destination(0).path)
-      PathParser.containsWildCards(path)
+      PathParser.containsWildCards(addresses(0).path)
     }
     if( from_seq_opt.isDefined && is_multi_destination ) {
       die("The from-seq header is only supported when you subscribe to one destination");
@@ -1227,30 +1222,26 @@ class StompProtocolHandler extends Proto
     }
 
     if( persistent ) {
-      
-      val dsubs = ListBuffer[DurableSubscriptionDestinationDTO]()
-      val topics = ListBuffer[TopicDestinationDTO]()
-      destination.foreach { _ match {
-        case x:DurableSubscriptionDestinationDTO=> dsubs += x
-        case x:TopicDestinationDTO=> topics += x
-        case _ => die("A persistent subscription can only be used on a topic destination")
-      } }
-
-      if( !topics.isEmpty ) {
-        val dsub = new DurableSubscriptionDestinationDTO(destination_parser.sanitize_destination_part(decode_header(id)))
-        dsub.selector = if (selector == null) null else selector._1
-        topics.foreach( dsub.topics.add(_) )
-        dsubs += dsub
+      val dsubs = ListBuffer[BindAddress]()
+      val topics = ListBuffer[BindAddress]()
+      addresses.foreach { address =>
+        address.domain match {
+          case "dsub" => dsubs += address
+          case "topic" => topics += address
+          case _ => die("A persistent subscription can only be used on a topic destination")
+        }
       }
-      destination = dsubs.toArray
+      val s = if (selector == null) null else selector._1
+      dsubs += SubscriptionAddress(destination_parser.decode_path(decode_header(id)), s, topics.toArray)
+      addresses = dsubs.toArray
     }
 
     val from_seq = from_seq_opt.map(_.toString.toLong).getOrElse(0L)
-    val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser, exclusive, credit_window, include_seq, from_seq, browser_end);
+    val consumer = new StompConsumer(subscription_id, addresses, ack_mode, selector, browser, exclusive, credit_window, include_seq, from_seq, browser_end);
     consumers += (id -> consumer)
 
     host.dispatch_queue {
-      val rc = host.router.bind(destination, consumer, security_context)
+      val rc = host.router.bind(addresses, consumer, security_context)
       consumer.release
       dispatchQueue {
         rc match {
@@ -1290,7 +1281,7 @@ class StompProtocolHandler extends Proto
         // consumer gets disposed after all producer stop sending to it...
         consumer.setDisposer(^{ send_receipt(headers) })
         consumers -= id
-        host.router.unbind(consumer.destination, consumer, persistent, security_context)
+        host.router.unbind(consumer.addresses, consumer, persistent, security_context)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala Sat Feb  4 14:42:52 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo.util.path
 
 import java.util.regex.Pattern
+import java.lang.String
 
 /**
   * Holds the delimiters used to parse paths.
@@ -37,11 +38,12 @@ object RootPart extends Part {
 object AnyChildPart extends Part
 object AnyDescendantPart extends Part
 
-case class RegexChildPart(regex:Pattern, original:String) extends Part
+case class RegexChildPart(regex:Pattern) extends Part
 
 case class LiteralPart(value: String) extends Part {
   override def matches(p: Part) = p match {
     case LiteralPart(v) => v == value
     case _ => true
   }
+  override def toString = value
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala Sat Feb  4 14:42:52 2012
@@ -144,7 +144,7 @@ class PathMapNode[Value](val parent: Pat
           node.appendMatchingWildcards(answer, path, i)
           node = new AnyChildPathNode[Value](node)
           i += 1;
-        case RegexChildPart(r, _) =>
+        case RegexChildPart(r) =>
           node.appendMatchingWildcards(answer, path, i)
           node = new RegexChildPathNode[Value](node, r)
           i += 1;
@@ -208,7 +208,7 @@ class PathMapNode[Value](val parent: Pat
           node.appendMatchingWildcards(answer, path, i)
           i += 1
           node = new AnyChildPathNode[Value](node)
-        case RegexChildPart(r, _) =>
+        case RegexChildPart(r) =>
           node.appendMatchingWildcards(answer, path, i)
           i += 1
           node = new RegexChildPathNode[Value](node, r)

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala Sat Feb  4 14:42:52 2012
@@ -22,6 +22,7 @@ import collection.JavaConversions._
 import org.apache.activemq.apollo.util.path.PathParser.PartFilter
 import collection.mutable.ListBuffer
 import org.fusesource.hawtbuf.{Buffer, DataByteArrayOutputStream, AsciiBuffer}
+import java.lang.String
 
 /**
   * Holds the delimiters used to parse paths.
@@ -127,28 +128,24 @@ class PathParser {
     this
   }
 
-  def sanitize_destination_part(value:String, wildcards:Boolean=false) = {
-    val rc = new StringBuffer(value.length())
-    var pos = new Buffer(value.getBytes("UTF-8"))
-    while( pos.length > 0 ) {
-      val c = pos.get(0).toChar
-      val cs = c.toString
-      if((wildcards && (
-              cs == any_descendant_wildcard ||
-              cs == any_child_wildcard ||
-              cs == regex_wildcard_start ||
-              cs == regex_wildcard_end
-          ))|| part_pattern.matcher(cs).matches() ) {
+  def url_encode(value:String, allowed:Pattern) = {
+    // UTF-8 Encode..
+    var ascii = new Buffer(value.getBytes("UTF-8")).ascii().toString
+    val matcher = allowed.matcher(ascii);
+    var pos = 0;
+    val rc = new StringBuffer(ascii.length())
+    while( pos < ascii.length() ) {
+      val c = ascii.charAt(pos)
+      if( matcher.find(pos) ) {
         rc.append(c)
       } else {
-        rc.append("%%%02x".format(pos.get(0)))
+        rc.append("%%%02x".format(c))
       }
-      pos.moveHead(1)
     }
     rc.toString
   }
 
-  def unsanitize_destination_part(value:String):String = {
+  def url_decode(value:String):String = {
     val rc = new DataByteArrayOutputStream
     var pos = value
     while( pos.length() > 0 ) {
@@ -163,29 +160,23 @@ class PathParser {
     }
     rc.toBuffer.utf8().toString
   }
-  
-  
-  def decode_path(subject: java.util.Collection[String]): Path = decode_path(subject.toIterable)
 
-  def decode_path(subject: Iterable[String]): Path = {
-    return new Path(subject.toList.map(decode_part(_)))
-  }
 
-  def parts(subject: String, sanitize:Boolean=false): Array[String] = {
+  def parts(subject: String): Array[String] = {
     val rc = if(path_separator!=null) {
       subject.split(Pattern.quote(path_separator))
     } else {
       Array(subject)
     }
-    if (sanitize) {
-      rc.map(sanitize_destination_part(_, true))
-    } else {
-      rc
-    }
+    rc
+  }
+
+  def decode_path(parts: Iterable[String]): Path = {
+    return new Path(parts.toList.map(decode_part(_)))
   }
 
-  def decode_path(subject: String, sanitize:Boolean=false): Path = {
-    return decode_path(parts(subject, sanitize))
+  def decode_path(subject: String): Path = {
+    return decode_path(parts(subject))
   }
 
   def regex_map[T](text:String, pattern: Pattern)(func: Either[CharSequence, Matcher] => T) = {
@@ -201,42 +192,57 @@ class PathParser {
     rc.toList
   }
 
+  lazy val wildcard_part_pattern = if (regex_wildcard_start!=null && regex_wildcard_end!=null) {
+    var p = Pattern.quote(regex_wildcard_start)+"(.*?)"+Pattern.quote(regex_wildcard_end)
+    if(any_child_wildcard!=null) {
+      p += "|" + Pattern.quote(any_child_wildcard)
+    }
+    p.r.pattern
+  } else {
+    null
+  }
+  
   private def decode_part(value: String): Part = {
     if (any_child_wildcard!=null && value == any_child_wildcard) {
-      return AnyChildPart
+      AnyChildPart
     } else if (any_descendant_wildcard!=null && value == any_descendant_wildcard) {
-      return AnyDescendantPart
-    } else {
-      if (part_pattern == null || part_pattern.matcher(value.toString).matches) {
-        return LiteralPart(value)
-      } else {
-
-        val pattern = (
-            (Pattern.quote(regex_wildcard_start)+"(.*?)"+Pattern.quote(regex_wildcard_end)) +
-            "|" +
-            Pattern.quote(any_child_wildcard)
-          ).r.pattern
-
-        val regex = regex_map(value, pattern) { _ match {
-          case Left(x) =>
-            if (x=="") {
-              ""
-            } else {
-              if( part_pattern.matcher(x).matches ) {
-                Pattern.quote(x.toString)
-              } else {
+      AnyDescendantPart
+    } else if (wildcard_part_pattern!=null && wildcard_part_pattern.matcher(value).matches() ) {      
+      val regex = regex_map(value, wildcard_part_pattern) { _ match {
+        // It's a literal part.
+        case Left(x) =>
+          if (x=="") {
+            ""
+          } else {
+            if( part_pattern!=null ) {
+              if (!part_pattern.matcher(x).matches) {
                 throw new PathParser.PathException(String.format("Invalid destination: '%s', it does not match regex: %s", value, part_pattern))
+              } else {
+                Pattern.quote(url_decode(x.toString))
               }
-            }
-          case Right(wildcard) =>
-            if ( wildcard.group() == any_child_wildcard ) {
-              ".*?"
             } else {
-              wildcard.group(1)
+              Pattern.quote(x.toString)
             }
-        } }.mkString("")
-
-        return RegexChildPart(("^"+regex+"$").r.pattern, value)
+          }
+        // It was a regex part..
+        case Right(wildcard) =>
+          if ( wildcard.group() == any_child_wildcard ) {
+            ".*?"
+          } else {
+            wildcard.group(1)
+          }
+      } }.mkString("")
+      var regex_string: String = "^" + regex + "$"
+      RegexChildPart(regex_string.r.pattern)      
+    } else {
+      if (part_pattern != null ) {
+        if ( !part_pattern.matcher(value.toString).matches) {
+          throw new PathParser.PathException(String.format("Invalid destination: '%s', it does not match regex: %s", value, part_pattern))
+        } else {
+          LiteralPart(url_decode(value))
+        }
+      } else {
+        LiteralPart(value)
       }
     }
   }
@@ -245,34 +251,30 @@ class PathParser {
     * Converts the path back to the string representation.
     * @return
     */
-  def encode_path(path: Path, unsanitize_destinations:Boolean=false): String = encode_path_iter(path_parts(path))
+  def encode_path(path: Path): String = encode_path_iter(path_parts(path))
 
-  def path_parts(path: Path, unsanitize_destinations:Boolean=false):Array[String] = {
+  def path_parts(path: Path):Array[String] = {
     (path.parts.map( _ match {
       case RootPart => ""
       case AnyChildPart => any_child_wildcard
       case AnyDescendantPart => any_descendant_wildcard
-      case RegexChildPart(_, original) => original
+      case RegexChildPart(regex) => regex_wildcard_start + regex.pattern() + regex_wildcard_end
       case LiteralPart(value) =>
-        if(unsanitize_destinations) {
-          unsanitize_destination_part(value)
+        if( part_pattern !=null ) {
+          url_encode(value, part_pattern)
         } else {
           value
         }
     })).toArray
   }
 
-  def encode_path_iter(parts: Iterable[String], unsanitize_destinations:Boolean=false): String = {
-    var buffer: StringBuffer = new StringBuffer
+  def encode_path_iter(parts: Iterable[String]): String = {
+    val buffer: StringBuffer = new StringBuffer
     for (p <- parts) {
       if ( buffer.length() != 0) {
         buffer.append(path_separator)
       }
-      if(unsanitize_destinations) {
-        buffer.append(unsanitize_destination_part(p))
-      } else {
-        buffer.append(p)
-      }
+      buffer.append(p)
     }
     return buffer.toString
   }
@@ -285,7 +287,7 @@ class PathParser {
           last = new LitteralPathFilter(last, p)
         case AnyChildPart =>
           last = new PathParser.AnyChildPathFilter(last)
-        case RegexChildPart(r, _) =>
+        case RegexChildPart(r) =>
           last = new PathParser.RegexChildPathFilter(r, last)
         case AnyDescendantPart =>
           last = new PathParser.AnyDecendentPathFilter(last)