You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/04 15:42:53 UTC
svn commit: r1240510 [2/3] - in /activemq/activemq-apollo/trunk: ./
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/
apollo-dto/src/main/java/org/apache/activemq/apollo/d...
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Feb 4 14:42:52 2012
@@ -33,7 +33,7 @@ import security.SecuredResource
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, val path:Path) extends DomainDestination with SecuredResource {
+class Topic(val router:LocalRouter, val address:DestinationAddress, var config_updater: ()=>TopicDTO) extends DomainDestination with SecuredResource {
val topic_metrics = new DestMetricsDTO
@@ -138,7 +138,7 @@ class Topic(val router:LocalRouter, val
val copy = value.copy();
copy.uow = value.uow
copy.ack = value.ack
- copy.sender = destination_dto
+ copy.sender = address
downstream.offer(copy)
}
}
@@ -170,7 +170,7 @@ class Topic(val router:LocalRouter, val
import OptionSupport._
- override def toString = destination_dto.toString
+ override def toString = address.toString
def virtual_host: VirtualHost = router.virtual_host
def now = virtual_host.broker.now
@@ -307,7 +307,7 @@ class Topic(val router:LocalRouter, val
if( auto_delete_after!=0 ) {
dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
if( previously_idle_at == idled_at ) {
- router.local_topic_domain.remove_destination(path, this)
+ router.local_topic_domain.remove_destination(address.path, this)
DestinationMetricsSupport.add_destination_metrics(router.virtual_host.dead_topic_metrics, topic_metrics)
}
}
@@ -318,19 +318,18 @@ class Topic(val router:LocalRouter, val
}
}
- def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
+ def bind(address: BindAddress, consumer:DeliveryConsumer) = {
- val target = destination match {
- case null=>
+ val target = address.domain match {
+ case "queue"=>
// this is the mirrored queue case..
consumer
- case destination:TopicDestinationDTO=>
- var target = consumer
+ case "topic"=>
slow_consumer_policy match {
case "queue" =>
// create a temp queue so that it can spool
- val queue = router._create_queue(new TempQueueBinding(consumer, path, destination_dto))
+ val queue = router._create_queue(new TempQueueBinding(consumer, address))
queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
queue.bind(List(consumer))
consumer_queues += consumer->queue
@@ -418,7 +417,7 @@ class Topic(val router:LocalRouter, val
check_idle
}
- def bind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue) = {
+ def bind_durable_subscription(address: SubscriptionAddress, queue:Queue) = {
if( !durable_subscriptions.contains(queue) ) {
durable_subscriptions += queue
val list = List(queue)
@@ -427,14 +426,14 @@ class Topic(val router:LocalRouter, val
})
consumer_queues.foreach{case (consumer, q)=>
if( q==queue ) {
- bind(destination, consumer)
+ bind(address, consumer)
}
}
}
check_idle
}
- def unbind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue) = {
+ def unbind_durable_subscription(queue:Queue) = {
if( durable_subscriptions.contains(queue) ) {
durable_subscriptions -= queue
val list = List(queue)
@@ -450,7 +449,7 @@ class Topic(val router:LocalRouter, val
check_idle
}
- def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+ def connect (address:ConnectAddress, producer:BindableDeliveryProducer) = {
val link = new LinkDTO()
producer.connection match {
case Some(connection) =>
Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/DiskBenchmark.scala Sat Feb 4 14:42:52 2012
@@ -220,7 +220,7 @@ class DiskBenchmark extends Action {
out.println(report)
}
} catch {
- case x:Helper.Failure=> error(x.getMessage)
+ case x:Helper.Failure=> sys.error(x.getMessage)
case e: Throwable =>
if (verbose) {
out.println("ERROR:")
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Sat Feb 4 14:42:52 2012
@@ -37,41 +37,20 @@ import java.util.List;
@XmlAccessorType(XmlAccessType.FIELD)
abstract public class DestinationDTO {
- @XmlElement(name = "path")
- public List<String> path = new ArrayList<String>();
-
- public boolean temp;
+ @XmlAttribute(name = "name")
+ public String name;
public DestinationDTO() {
}
-
- public DestinationDTO(List<String> path) {
- this.path = path;
+ public DestinationDTO(String name) {
+ this.name = name;
}
- public DestinationDTO(String path[]) {
- this(Arrays.asList(path));
- }
-
- public String name(String separator) {
- StringBuilder sb = new StringBuilder();
- for (String p : path) {
- if (sb.length() != 0) {
- sb.append(separator);
- }
- sb.append(p);
- }
- return sb.toString();
- }
-
- public boolean temp() {
- return temp;
- }
-
- public DestinationDTO temp(boolean temp) {
- this.temp = temp;
- return this;
- }
+ /**
+ * @deprecated
+ */
+ @XmlElement(name = "path")
+ public List<String> path = new ArrayList<String>();
@Override
public boolean equals(Object o) {
@@ -80,23 +59,21 @@ abstract public class DestinationDTO {
DestinationDTO that = (DestinationDTO) o;
- if (temp != that.temp) return false;
- if (!path.equals(that.path)) return false;
+ if (!name.equals(that.name)) return false;
return true;
}
@Override
public int hashCode() {
- int result = path.hashCode();
- result = 31 * result + (temp ? 1 : 0);
+ int result = name.hashCode();
return result;
}
@Override
public String toString() {
return "DestinationDTO{" +
- "path=" + path +
+ "name=" + name +
'}';
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Sat Feb 4 14:42:52 2012
@@ -44,12 +44,12 @@ public class DurableSubscriptionDestinat
}
public DurableSubscriptionDestinationDTO(String subscription_id) {
- super(new String[]{subscription_id});
+ super(subscription_id);
}
@JsonIgnore
public String subscription_id() {
- return path.get(0);
+ return name;
}
/**
@@ -91,10 +91,6 @@ public class DurableSubscriptionDestinat
@Override
public String toString() {
- return "DurableSubscriptionDestinationDTO{" +
- "id='" + subscription_id() + '\'' +
- ", selector='" + selector + '\'' +
- ", topics=" + topics +
- '}';
+ return "dsub:"+name;
}
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java Sat Feb 4 14:42:52 2012
@@ -32,10 +32,7 @@ public class QueueDestinationDTO extends
public QueueDestinationDTO() {
}
- public QueueDestinationDTO(List<String> name) {
- super(name);
- }
- public QueueDestinationDTO(String[] name) {
+ public QueueDestinationDTO(String name) {
super(name);
}
@@ -58,9 +55,7 @@ public class QueueDestinationDTO extends
@Override
public String toString() {
- return "QueueDestinationDTO{" +
- "path=" + path +
- '}';
+ return "queue:"+name;
}
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java Sat Feb 4 14:42:52 2012
@@ -32,17 +32,12 @@ public class TopicDestinationDTO extends
public TopicDestinationDTO() {
}
- public TopicDestinationDTO(List<String> name) {
- super(name);
- }
- public TopicDestinationDTO(String name[]) {
+ public TopicDestinationDTO(String name) {
super(name);
}
@Override
public String toString() {
- return "TopicDestinationDTO{" +
- "path=" + path +
- '}';
+ return "topic:"+name;
}
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Sat Feb 4 14:42:52 2012
@@ -72,14 +72,14 @@ class DestinationAdvisoryRouterListener(
override def dispatch_queue = router.virtual_host.dispatch_queue
}
- var producerRoutes = new LRUCache[List[DestinationDTO], ProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[List[DestinationDTO], ProducerRoute]) = {
+ var producerRoutes = new LRUCache[List[ConnectAddress], ProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[List[ConnectAddress], ProducerRoute]) = {
router.disconnect(eldest.getKey.toArray, eldest.getValue)
}
}
def on_create(dest: DomainDestination, security: SecurityContext) = {
- val ow_destination = to_activemq_destination(Array(dest.destination_dto))
+ val ow_destination = to_activemq_destination(Array(dest.address))
if (ow_destination!=null && !AdvisorySupport.isAdvisoryTopic(ow_destination)) {
destination_advisories.getOrElseUpdate(ow_destination, {
var info = new DestinationInfo(null, DestinationInfo.ADD_OPERATION_TYPE, ow_destination)
@@ -92,7 +92,7 @@ class DestinationAdvisoryRouterListener(
}
def on_destroy(dest: DomainDestination, security: SecurityContext) = {
- val destination = to_activemq_destination(Array(dest.destination_dto))
+ val destination = to_activemq_destination(Array(dest.address))
if (destination!=null && !AdvisorySupport.isAdvisoryTopic(destination)) {
for (info <- destination_advisories.remove(destination)) {
var info = new DestinationInfo(null, DestinationInfo.REMOVE_OPERATION_TYPE, destination)
@@ -103,7 +103,7 @@ class DestinationAdvisoryRouterListener(
}
def on_bind(dest: DomainDestination, consumer: DeliveryConsumer, security: SecurityContext) = {
- val destination = to_activemq_destination(Array(dest.destination_dto))
+ val destination = to_activemq_destination(Array(dest.address))
if (destination!=null && AdvisorySupport.isDestinationAdvisoryTopic(destination) && !destination_advisories.isEmpty) {
// replay the destination advisories..
val producer = new ProducerRoute {
@@ -175,7 +175,7 @@ class DestinationAdvisoryRouterListener(
def send(delivery:Delivery): Unit = {
val message = delivery.message.asInstanceOf[OpenwireMessage].message
- val dest: Array[DestinationDTO] = to_destination_dto(message.getDestination,null)
+ val dest = to_destination_dto(message.getDestination,null)
val key = dest.toList
val route = producerRoutes.get(key) match {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala Sat Feb 4 14:42:52 2012
@@ -17,9 +17,10 @@
package org.apache.activemq.apollo.openwire
import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
-import org.apache.activemq.apollo.broker.DestinationParser
import org.apache.activemq.apollo.openwire.command._
import java.lang.String
+import org.apache.activemq.apollo.broker.{DestinationAddress, SimpleAddress, DestinationParser}
+import org.apache.activemq.apollo.util.path.{Path, LiteralPart}
/**
* <p>
@@ -34,67 +35,43 @@ object DestinationConverter {
OPENWIRE_PARSER.topic_prefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX
OPENWIRE_PARSER.temp_queue_prefix = ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX
OPENWIRE_PARSER.temp_topic_prefix = ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX
- OPENWIRE_PARSER.dsub_prefix = null
OPENWIRE_PARSER.path_separator = "."
OPENWIRE_PARSER.any_child_wildcard = "*"
OPENWIRE_PARSER.any_descendant_wildcard = ">"
- OPENWIRE_PARSER.sanitize_destinations = true
+ OPENWIRE_PARSER.part_pattern = null
+ OPENWIRE_PARSER.regex_wildcard_start = null
+ OPENWIRE_PARSER.regex_wildcard_end = null
- def to_destination_dto(dest: ActiveMQDestination, handler:OpenwireProtocolHandler): Array[DestinationDTO] = {
+ def to_destination_dto(dest: ActiveMQDestination, handler:OpenwireProtocolHandler): Array[SimpleAddress] = {
def fallback(value:String) = {
OPENWIRE_PARSER.decode_single_destination(dest.getQualifiedPrefix+value, null)
}
- val rc = OPENWIRE_PARSER.decode_multi_destination(dest.getPhysicalName.toString, fallback)
- rc.foreach { dest =>
- if( dest.temp() ) {
- import collection.JavaConversions._
+ OPENWIRE_PARSER.decode_multi_destination(dest.getPhysicalName.toString, fallback).map { dest =>
+ if( dest.domain.startsWith("temp-") ) {
// Put it back together...
- val name = dest.path.map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(OPENWIRE_PARSER.path_separator)
-
+ val name = OPENWIRE_PARSER.encode_path(dest.path)
val (connectionid, rest) = name.splitAt(name.lastIndexOf(':'))
- val real_path = ("temp" :: handler.broker.id :: OPENWIRE_PARSER.sanitize_destination_part(connectionid) :: OPENWIRE_PARSER.sanitize_destination_part(rest.substring(1)) :: Nil).toArray
- dest.path = java.util.Arrays.asList(real_path:_*)
+ val real_path = "temp" :: handler.broker.id :: connectionid :: rest.substring(1) :: Nil
+ SimpleAddress(dest.domain.stripPrefix("temp-"), OPENWIRE_PARSER.decode_path(real_path))
+ } else {
+ dest
}
}
- rc
}
- def to_activemq_destination(dests:Array[DestinationDTO]):ActiveMQDestination = {
- import collection.JavaConversions._
- var wrapper: (String)=> ActiveMQDestination = null
-
- val rc = OPENWIRE_PARSER.encode_destination(dests.flatMap{ dest=>
- val temp = dest.path.headOption == Some("temp")
- dest match {
- case dest:QueueDestinationDTO =>
- if( temp ) {
- if(wrapper==null)
- wrapper = (x)=>new ActiveMQTempQueue(x)
- var path: Array[String] = Array(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":"))
- Some(new QueueDestinationDTO(path).temp(true))
- } else {
- if(wrapper==null)
- wrapper = (x)=>new ActiveMQQueue(x)
- Some(dest)
- }
- case dest:TopicDestinationDTO =>
- if( temp ) {
- if(wrapper==null)
- wrapper = (x)=>new ActiveMQTempTopic(x)
- var path: Array[String] = Array(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":"))
- Some(new TopicDestinationDTO(path).temp(true))
+ def to_activemq_destination(addresses:Array[_ <: DestinationAddress]):ActiveMQDestination = {
+ ActiveMQDestination.createDestination(OPENWIRE_PARSER.encode_destination(addresses.map{ address=>
+ address.path.parts match {
+ // Remap temp destinations that look like openwire temp destinations.
+ case LiteralPart("temp") :: LiteralPart(broker) :: LiteralPart(session) :: LiteralPart(tempid) :: Nil =>
+ if( session.startsWith("ID:") ) {
+ SimpleAddress("temp-"+address.domain, Path(session+":"+tempid))
} else {
- if(wrapper==null)
- wrapper = (x)=>new ActiveMQTopic(x)
- Some(dest)
+ address
}
- case _ => None
+ case _ => address
}
- })
+ }))
- if ( wrapper==null )
- null
- else
- wrapper(rc)
}
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sat Feb 4 14:42:52 2012
@@ -36,6 +36,7 @@ import command._
import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
import org.apache.activemq.apollo.broker._
+import path.Path
import protocol._
import security.SecurityContext
import DestinationConverter._
@@ -89,8 +90,8 @@ class OpenwireProtocolHandler extends Pr
def broker = connection.connector.broker
- var producerRoutes = new LRUCache[List[DestinationDTO], DeliveryProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[List[DestinationDTO], DeliveryProducerRoute]) = {
+ var producerRoutes = new LRUCache[List[ConnectAddress], DeliveryProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[List[ConnectAddress], DeliveryProducerRoute]) = {
host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
}
}
@@ -442,7 +443,7 @@ class OpenwireProtocolHandler extends Pr
security_context.user = Option(info.getUserName).map(_.toString).getOrElse(null)
security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
- security_context.session_id = Some(OPENWIRE_PARSER.sanitize_destination_part(info.getConnectionId.toString))
+ security_context.session_id = Some(info.getConnectionId.toString)
if( host.authenticator!=null && host.authorizer!=null ) {
suspend_read("authenticating and authorizing connect")
@@ -802,7 +803,7 @@ class OpenwireProtocolHandler extends Pr
override def toString = "openwire consumer id:"+info.getConsumerId+", remote address: "+security_context.remote_address
var selector_expression:BooleanExpression = _
- var destination:Array[DestinationDTO] = _
+ var addresses:Array[_ <: BindAddress] = _
val consumer_sink = sink_manager.open()
val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
@@ -842,7 +843,7 @@ class OpenwireProtocolHandler extends Pr
def attach = {
if( info.getDestination == null ) fail("destination was not set")
- destination = to_destination_dto(info.getDestination, OpenwireProtocolHandler.this)
+ addresses = to_destination_dto(info.getDestination, OpenwireProtocolHandler.this)
// if they are temp dests.. attach our owner id so that we don't
// get rejected.
@@ -872,21 +873,16 @@ class OpenwireProtocolHandler extends Pr
subscription_id += parent.parent.info.getClientId + ":"
}
subscription_id += info.getSubscriptionName
-
- val rc = new DurableSubscriptionDestinationDTO(subscription_id)
- rc.selector = Option(info.getSelector).map(_.toString).getOrElse(null)
-
- destination.foreach { _ match {
- case x:TopicDestinationDTO=>
- rc.topics.add(new TopicDestinationDTO(x.path))
+ val selector = Option(info.getSelector).map(_.toString).getOrElse(null)
+ addresses.foreach { _ match {
+ case SimpleAddress("topic", _) =>
case _ => die("A durable subscription can only be used on a topic destination")
- }
- }
- destination = Array(rc)
+ }}
+ addresses = Array(SubscriptionAddress(Path(subscription_id), selector, addresses))
}
host.dispatch_queue {
- val rc = host.router.bind(destination, this, security_context)
+ val rc = host.router.bind(addresses, this, security_context)
this.release
dispatchQueue {
rc match {
@@ -900,7 +896,7 @@ class OpenwireProtocolHandler extends Pr
}
def dettach = {
- host.router.unbind(destination, this, false , security_context)
+ host.router.unbind(addresses, this, false , security_context)
parent.consumers.remove(info.getConsumerId)
all_consumers.remove(info.getConsumerId)
}
@@ -973,7 +969,7 @@ class OpenwireProtocolHandler extends Pr
})
if( info.getDestination.isTemporary ) {
dispatch_queue {
- val rc = host.router.delete(destination, security_context)
+ val rc = host.router.delete(addresses, security_context)
dispatchQueue {
rc match {
case Some(error) => async_die(error)
@@ -1106,7 +1102,7 @@ class OpenwireProtocolHandler extends Pr
}
if( !found ) {
- trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
+ trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, addresses.mkString(",")))
} else {
consumer_acks = not_acked
acked.foreach{case (id, delivery)=>
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java Sat Feb 4 14:42:52 2012
@@ -55,8 +55,7 @@ abstract public class ActiveMQDestinatio
// static helper methods for working with destinations
// -------------------------------------------------------------------------
- public static ActiveMQDestination createDestination(String name, byte defaultType) {
-
+ public static ActiveMQDestination createDestination(String name) {
if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
} else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
@@ -65,19 +64,8 @@ abstract public class ActiveMQDestinatio
return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
} else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
- }
-
- switch (defaultType) {
- case QUEUE_TYPE:
- return new ActiveMQQueue(name);
- case TOPIC_TYPE:
- return new ActiveMQTopic(name);
- case TEMP_QUEUE_TYPE:
- return new ActiveMQTempQueue(name);
- case TEMP_TOPIC_TYPE:
- return new ActiveMQTempTopic(name);
- default:
- throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
+ } else {
+ return null;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/TypeConversionSupport.java Sat Feb 4 14:42:52 2012
@@ -134,7 +134,7 @@ public final class TypeConversionSupport
});
CONVERSION_MAP.put(new ConversionKey(String.class, ActiveMQDestination.class), new Converter() {
public Object convert(Object value) {
- return ActiveMQDestination.createDestination((String)value, ActiveMQDestination.QUEUE_TYPE);
+ return ActiveMQDestination.createDestination((String)value);
}
});
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sat Feb 4 14:42:52 2012
@@ -32,12 +32,12 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
-import path.PathParser
import java.security.cert.X509Certificate
import collection.mutable.{ListBuffer, HashMap}
import java.io.IOException
import org.apache.activemq.apollo.dto._
import org.fusesource.hawtdispatch.transport.{SecureTransport, HeartBeatMonitor, SslTransport}
+import path.{LiteralPart, Path, PathParser}
case class RichBuffer(self:Buffer) extends Proxy {
@@ -137,7 +137,7 @@ class StompProtocolHandler extends Proto
class StompConsumer (
val subscription_id:Option[AsciiBuffer],
- val destination:Array[DestinationDTO],
+ val addresses:Array[_ <: BindAddress],
ack_mode:AsciiBuffer,
val selector:(String, BooleanExpression),
override val browser:Boolean,
@@ -305,7 +305,7 @@ class StompProtocolHandler extends Proto
}
if( !found ) {
- trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
+ trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, addresses.mkString(",")))
} else {
consumer_acks = not_acked
acked.foreach{case (id, delivery)=>
@@ -542,8 +542,8 @@ class StompProtocolHandler extends Proto
var closed = false
var consumers = Map[AsciiBuffer, StompConsumer]()
- var producerRoutes = new LRUCache[List[DestinationDTO], DeliveryProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[List[DestinationDTO], DeliveryProducerRoute]) = {
+ var producerRoutes = new LRUCache[List[ConnectAddress], DeliveryProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[List[ConnectAddress], DeliveryProducerRoute]) = {
host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
}
}
@@ -565,27 +565,22 @@ class StompProtocolHandler extends Proto
var protocol_filters = List[ProtocolFilter]()
var destination_parser = Stomp.destination_parser
- var temp_destination_map = HashMap[DestinationDTO, DestinationDTO]()
+ var temp_destination_map = HashMap[SimpleAddress, SimpleAddress]()
var codec:StompCodec = _
def session_id = security_context.session_id
- implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
+ def decode_addresses(value:AsciiBuffer):Array[SimpleAddress] = {
val rc = destination_parser.decode_multi_destination(value.toString)
if( rc==null ) {
throw new ProtocolException("Invalid stomp destination name: "+value);
}
rc.map { dest =>
- if( dest.temp() ) {
+ if( dest.domain.startsWith("temp-") ) {
temp_destination_map.getOrElseUpdate(dest, {
- import scala.collection.JavaConversions._
- val real_path= ("temp" :: broker.id :: session_id.get :: dest.path.toList).toArray
- dest match {
- case dest:QueueDestinationDTO => new QueueDestinationDTO( real_path ).temp(true)
- case dest:TopicDestinationDTO => new TopicDestinationDTO( real_path ).temp(true)
- case _ => throw new ProtocolException("Invalid stomp destination");
- }
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+ SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
})
} else {
dest
@@ -713,7 +708,7 @@ class StompProtocolHandler extends Proto
producerRoutes.clear
consumers.foreach {
case (_,consumer)=>
- host.router.unbind(consumer.destination, consumer, false , security_context)
+ host.router.unbind(consumer.addresses, consumer, false , security_context)
}
consumers = Map()
security_context.logout( e => {
@@ -920,7 +915,7 @@ class StompProtocolHandler extends Proto
async_die(headers, "")
} else {
this.host=host
- security_context.session_id = Some("%s-%x".format(destination_parser.sanitize_destination_part(this.host.config.id), this.host.session_counter.incrementAndGet))
+ security_context.session_id = Some("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
connection_log = host.connection_log
if( host.authenticator!=null && host.authorizer!=null ) {
suspend_read("authenticating and authorizing connect")
@@ -984,8 +979,8 @@ class StompProtocolHandler extends Proto
def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
- val destination: Array[DestinationDTO] = get(frame.headers, DESTINATION).get
- val key = destination.toList
+ val addresses = decode_addresses(get(frame.headers, DESTINATION).get)
+ val key = addresses.toList
producerRoutes.get(key) match {
case null =>
// create the producer route...
@@ -1003,7 +998,7 @@ class StompProtocolHandler extends Proto
// don't process frames until producer is connected...
connection.transport.suspendRead
host.dispatch_queue {
- val rc = host.router.connect(destination, route, security_context)
+ val rc = host.router.connect(addresses, route, security_context)
dispatchQueue {
rc match {
case Some(failure) =>
@@ -1012,7 +1007,7 @@ class StompProtocolHandler extends Proto
if (!connection.stopped) {
resume_read
producerRoutes.put(key, route)
- send_via_route(destination, route, frame, uow)
+ send_via_route(addresses, route, frame, uow)
}
}
}
@@ -1020,56 +1015,57 @@ class StompProtocolHandler extends Proto
case route =>
// we can re-use the existing producer route
- send_via_route(destination, route, frame, uow)
+ send_via_route(addresses, route, frame, uow)
}
}
var message_id_counter = 0;
- def encode_destination(value: Array[DestinationDTO]): String = {
- if (value == null) {
- null
- } else {
- val rc = new StringBuilder
- value.foreach { dest =>
- if (rc.length != 0 ) {
- assert( destination_parser.destination_separator!=null )
- rc.append(destination_parser.destination_separator)
- }
- import collection.JavaConversions._
- dest match {
- case d:QueueDestinationDTO =>
- rc.append(destination_parser.queue_prefix)
- rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
- case d:DurableSubscriptionDestinationDTO =>
- rc.append(destination_parser.dsub_prefix)
- rc.append(destination_parser.unsanitize_destination_part(d.subscription_id))
- case d:TopicDestinationDTO =>
- rc.append(destination_parser.topic_prefix)
- rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
- case _ =>
- throw new Exception("Uknown destination type: "+dest.getClass);
- }
- }
- rc.toString
- }
+ def encode_address(value: Array[_ <: DestinationAddress]): String = {
+ destination_parser.encode_destination(value)
+// if (value == null) {
+// null
+// } else {
+// val rc = new StringBuilder
+// value.foreach { dest =>
+// if (rc.length != 0 ) {
+// assert( destination_parser.destination_separator!=null )
+// rc.append(destination_parser.destination_separator)
+// }
+// import collection.JavaConversions._
+// dest match {
+// case d:QueueDestinationDTO =>
+// rc.append(destination_parser.queue_prefix)
+// rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
+// case d:DurableSubscriptionDestinationDTO =>
+// rc.append(destination_parser.dsub_prefix)
+// rc.append(destination_parser.unsanitize_destination_part(d.subscription_id))
+// case d:TopicDestinationDTO =>
+// rc.append(destination_parser.topic_prefix)
+// rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
+// case _ =>
+// throw new Exception("Uknown destination type: "+dest.getClass);
+// }
+// }
+// rc.toString
+// }
}
- def updated_headers(destination: Array[DestinationDTO], headers:HeaderMap) = {
+ def updated_headers(addresses: Array[SimpleAddress], headers:HeaderMap) = {
var rc:HeaderMap=Nil
// Do we need to re-write the destination names?
- if( destination.find(_.temp()).isDefined ) {
- rc ::= (DESTINATION -> encode_header(encode_destination(destination)))
+ if( addresses.find(_.id.startsWith("temp.")).isDefined ) {
+ rc ::= (DESTINATION -> encode_header(encode_address(addresses)))
}
get(headers, REPLY_TO).foreach { value=>
// we may need to translate local temp destination names to broker destination names
if( value.indexOf(TEMP_QUEUE)>=0 || value.indexOf(TEMP_TOPIC)>=0 ) {
try {
- val dests: Array[DestinationDTO] = value
- if (dests.find(_.temp()).isDefined) {
- rc ::= (REPLY_TO -> encode_header(encode_destination(dests)))
+ val dests = decode_addresses(value)
+ if (dests.find(_.id.startsWith("temp.")).isDefined) {
+ rc ::= (REPLY_TO -> encode_header(encode_address(dests)))
}
} catch {
case _=> // the translation is a best effort thing.
@@ -1113,7 +1109,7 @@ class StompProtocolHandler extends Proto
rc
}
- def send_via_route(destination: Array[DestinationDTO], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
+ def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
var storeBatch:StoreUOW=null
// User might be asking for ack that we have processed the message..
val receipt = frame.header(RECEIPT_REQUESTED)
@@ -1121,7 +1117,7 @@ class StompProtocolHandler extends Proto
if( !route.targets.isEmpty ) {
// We may need to add some headers..
- var message = updated_headers(destination, frame.headers) match {
+ var message = updated_headers(addresses, frame.headers) match {
case Nil=>
StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
case updated_headers =>
@@ -1161,7 +1157,7 @@ class StompProtocolHandler extends Proto
def on_stomp_subscribe(headers:HeaderMap):Unit = {
val dest = get(headers, DESTINATION).getOrElse(die("destination not set."))
- var destination:Array[DestinationDTO] = dest
+ var addresses:Array[_ <: BindAddress] = decode_addresses(dest)
val subscription_id = get(headers, ID)
var id:AsciiBuffer = subscription_id.getOrElse {
@@ -1184,11 +1180,10 @@ class StompProtocolHandler extends Proto
val from_seq_opt = get(headers, FROM_SEQ)
- def is_multi_destination = if( destination.length > 1 ) {
+ def is_multi_destination = if( addresses.length > 1 ) {
true
} else {
- val path = destination_parser.decode_path(destination(0).path)
- PathParser.containsWildCards(path)
+ PathParser.containsWildCards(addresses(0).path)
}
if( from_seq_opt.isDefined && is_multi_destination ) {
die("The from-seq header is only supported when you subscribe to one destination");
@@ -1227,30 +1222,26 @@ class StompProtocolHandler extends Proto
}
if( persistent ) {
-
- val dsubs = ListBuffer[DurableSubscriptionDestinationDTO]()
- val topics = ListBuffer[TopicDestinationDTO]()
- destination.foreach { _ match {
- case x:DurableSubscriptionDestinationDTO=> dsubs += x
- case x:TopicDestinationDTO=> topics += x
- case _ => die("A persistent subscription can only be used on a topic destination")
- } }
-
- if( !topics.isEmpty ) {
- val dsub = new DurableSubscriptionDestinationDTO(destination_parser.sanitize_destination_part(decode_header(id)))
- dsub.selector = if (selector == null) null else selector._1
- topics.foreach( dsub.topics.add(_) )
- dsubs += dsub
+ val dsubs = ListBuffer[BindAddress]()
+ val topics = ListBuffer[BindAddress]()
+ addresses.foreach { address =>
+ address.domain match {
+ case "dsub" => dsubs += address
+ case "topic" => topics += address
+ case _ => die("A persistent subscription can only be used on a topic destination")
+ }
}
- destination = dsubs.toArray
+ val s = if (selector == null) null else selector._1
+ dsubs += SubscriptionAddress(destination_parser.decode_path(decode_header(id)), s, topics.toArray)
+ addresses = dsubs.toArray
}
val from_seq = from_seq_opt.map(_.toString.toLong).getOrElse(0L)
- val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser, exclusive, credit_window, include_seq, from_seq, browser_end);
+ val consumer = new StompConsumer(subscription_id, addresses, ack_mode, selector, browser, exclusive, credit_window, include_seq, from_seq, browser_end);
consumers += (id -> consumer)
host.dispatch_queue {
- val rc = host.router.bind(destination, consumer, security_context)
+ val rc = host.router.bind(addresses, consumer, security_context)
consumer.release
dispatchQueue {
rc match {
@@ -1290,7 +1281,7 @@ class StompProtocolHandler extends Proto
// consumer gets disposed after all producer stop sending to it...
consumer.setDisposer(^{ send_receipt(headers) })
consumers -= id
- host.router.unbind(consumer.destination, consumer, persistent, security_context)
+ host.router.unbind(consumer.addresses, consumer, persistent, security_context)
}
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Part.scala Sat Feb 4 14:42:52 2012
@@ -17,6 +17,7 @@
package org.apache.activemq.apollo.util.path
import java.util.regex.Pattern
+import java.lang.String
/**
* Holds the delimiters used to parse paths.
@@ -37,11 +38,12 @@ object RootPart extends Part {
object AnyChildPart extends Part
object AnyDescendantPart extends Part
-case class RegexChildPart(regex:Pattern, original:String) extends Part
+case class RegexChildPart(regex:Pattern) extends Part
case class LiteralPart(value: String) extends Part {
override def matches(p: Part) = p match {
case LiteralPart(v) => v == value
case _ => true
}
+ override def toString = value
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.scala Sat Feb 4 14:42:52 2012
@@ -144,7 +144,7 @@ class PathMapNode[Value](val parent: Pat
node.appendMatchingWildcards(answer, path, i)
node = new AnyChildPathNode[Value](node)
i += 1;
- case RegexChildPart(r, _) =>
+ case RegexChildPart(r) =>
node.appendMatchingWildcards(answer, path, i)
node = new RegexChildPathNode[Value](node, r)
i += 1;
@@ -208,7 +208,7 @@ class PathMapNode[Value](val parent: Pat
node.appendMatchingWildcards(answer, path, i)
i += 1
node = new AnyChildPathNode[Value](node)
- case RegexChildPart(r, _) =>
+ case RegexChildPart(r) =>
node.appendMatchingWildcards(answer, path, i)
i += 1
node = new RegexChildPathNode[Value](node, r)
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala?rev=1240510&r1=1240509&r2=1240510&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala Sat Feb 4 14:42:52 2012
@@ -22,6 +22,7 @@ import collection.JavaConversions._
import org.apache.activemq.apollo.util.path.PathParser.PartFilter
import collection.mutable.ListBuffer
import org.fusesource.hawtbuf.{Buffer, DataByteArrayOutputStream, AsciiBuffer}
+import java.lang.String
/**
* Holds the delimiters used to parse paths.
@@ -127,28 +128,24 @@ class PathParser {
this
}
- def sanitize_destination_part(value:String, wildcards:Boolean=false) = {
- val rc = new StringBuffer(value.length())
- var pos = new Buffer(value.getBytes("UTF-8"))
- while( pos.length > 0 ) {
- val c = pos.get(0).toChar
- val cs = c.toString
- if((wildcards && (
- cs == any_descendant_wildcard ||
- cs == any_child_wildcard ||
- cs == regex_wildcard_start ||
- cs == regex_wildcard_end
- ))|| part_pattern.matcher(cs).matches() ) {
+ def url_encode(value:String, allowed:Pattern) = {
+ // UTF-8 Encode..
+ var ascii = new Buffer(value.getBytes("UTF-8")).ascii().toString
+ val matcher = allowed.matcher(ascii);
+ var pos = 0;
+ val rc = new StringBuffer(ascii.length())
+ while( pos < ascii.length() ) {
+ val c = ascii.charAt(pos)
+ if( matcher.find(pos) ) {
rc.append(c)
} else {
- rc.append("%%%02x".format(pos.get(0)))
+ rc.append("%%%02x".format(c))
}
- pos.moveHead(1)
}
rc.toString
}
- def unsanitize_destination_part(value:String):String = {
+ def url_decode(value:String):String = {
val rc = new DataByteArrayOutputStream
var pos = value
while( pos.length() > 0 ) {
@@ -163,29 +160,23 @@ class PathParser {
}
rc.toBuffer.utf8().toString
}
-
-
- def decode_path(subject: java.util.Collection[String]): Path = decode_path(subject.toIterable)
- def decode_path(subject: Iterable[String]): Path = {
- return new Path(subject.toList.map(decode_part(_)))
- }
- def parts(subject: String, sanitize:Boolean=false): Array[String] = {
+ def parts(subject: String): Array[String] = {
val rc = if(path_separator!=null) {
subject.split(Pattern.quote(path_separator))
} else {
Array(subject)
}
- if (sanitize) {
- rc.map(sanitize_destination_part(_, true))
- } else {
- rc
- }
+ rc
+ }
+
+ def decode_path(parts: Iterable[String]): Path = {
+ return new Path(parts.toList.map(decode_part(_)))
}
- def decode_path(subject: String, sanitize:Boolean=false): Path = {
- return decode_path(parts(subject, sanitize))
+ def decode_path(subject: String): Path = {
+ return decode_path(parts(subject))
}
def regex_map[T](text:String, pattern: Pattern)(func: Either[CharSequence, Matcher] => T) = {
@@ -201,42 +192,57 @@ class PathParser {
rc.toList
}
+ lazy val wildcard_part_pattern = if (regex_wildcard_start!=null && regex_wildcard_end!=null) {
+ var p = Pattern.quote(regex_wildcard_start)+"(.*?)"+Pattern.quote(regex_wildcard_end)
+ if(any_child_wildcard!=null) {
+ p += "|" + Pattern.quote(any_child_wildcard)
+ }
+ p.r.pattern
+ } else {
+ null
+ }
+
private def decode_part(value: String): Part = {
if (any_child_wildcard!=null && value == any_child_wildcard) {
- return AnyChildPart
+ AnyChildPart
} else if (any_descendant_wildcard!=null && value == any_descendant_wildcard) {
- return AnyDescendantPart
- } else {
- if (part_pattern == null || part_pattern.matcher(value.toString).matches) {
- return LiteralPart(value)
- } else {
-
- val pattern = (
- (Pattern.quote(regex_wildcard_start)+"(.*?)"+Pattern.quote(regex_wildcard_end)) +
- "|" +
- Pattern.quote(any_child_wildcard)
- ).r.pattern
-
- val regex = regex_map(value, pattern) { _ match {
- case Left(x) =>
- if (x=="") {
- ""
- } else {
- if( part_pattern.matcher(x).matches ) {
- Pattern.quote(x.toString)
- } else {
+ AnyDescendantPart
+ } else if (wildcard_part_pattern!=null && wildcard_part_pattern.matcher(value).matches() ) {
+ val regex = regex_map(value, wildcard_part_pattern) { _ match {
+ // It's a literal part.
+ case Left(x) =>
+ if (x=="") {
+ ""
+ } else {
+ if( part_pattern!=null ) {
+ if (!part_pattern.matcher(x).matches) {
throw new PathParser.PathException(String.format("Invalid destination: '%s', it does not match regex: %s", value, part_pattern))
+ } else {
+ Pattern.quote(url_decode(x.toString))
}
- }
- case Right(wildcard) =>
- if ( wildcard.group() == any_child_wildcard ) {
- ".*?"
} else {
- wildcard.group(1)
+ Pattern.quote(x.toString)
}
- } }.mkString("")
-
- return RegexChildPart(("^"+regex+"$").r.pattern, value)
+ }
+ // It was a regex part..
+ case Right(wildcard) =>
+ if ( wildcard.group() == any_child_wildcard ) {
+ ".*?"
+ } else {
+ wildcard.group(1)
+ }
+ } }.mkString("")
+ var regex_string: String = "^" + regex + "$"
+ RegexChildPart(regex_string.r.pattern)
+ } else {
+ if (part_pattern != null ) {
+ if ( !part_pattern.matcher(value.toString).matches) {
+ throw new PathParser.PathException(String.format("Invalid destination: '%s', it does not match regex: %s", value, part_pattern))
+ } else {
+ LiteralPart(url_decode(value))
+ }
+ } else {
+ LiteralPart(value)
}
}
}
@@ -245,34 +251,30 @@ class PathParser {
* Converts the path back to the string representation.
* @return
*/
- def encode_path(path: Path, unsanitize_destinations:Boolean=false): String = encode_path_iter(path_parts(path))
+ def encode_path(path: Path): String = encode_path_iter(path_parts(path))
- def path_parts(path: Path, unsanitize_destinations:Boolean=false):Array[String] = {
+ def path_parts(path: Path):Array[String] = {
(path.parts.map( _ match {
case RootPart => ""
case AnyChildPart => any_child_wildcard
case AnyDescendantPart => any_descendant_wildcard
- case RegexChildPart(_, original) => original
+ case RegexChildPart(regex) => regex_wildcard_start + regex.pattern() + regex_wildcard_end
case LiteralPart(value) =>
- if(unsanitize_destinations) {
- unsanitize_destination_part(value)
+ if( part_pattern !=null ) {
+ url_encode(value, part_pattern)
} else {
value
}
})).toArray
}
- def encode_path_iter(parts: Iterable[String], unsanitize_destinations:Boolean=false): String = {
- var buffer: StringBuffer = new StringBuffer
+ def encode_path_iter(parts: Iterable[String]): String = {
+ val buffer: StringBuffer = new StringBuffer
for (p <- parts) {
if ( buffer.length() != 0) {
buffer.append(path_separator)
}
- if(unsanitize_destinations) {
- buffer.append(unsanitize_destination_part(p))
- } else {
- buffer.append(p)
- }
+ buffer.append(p)
}
return buffer.toString
}
@@ -285,7 +287,7 @@ class PathParser {
last = new LitteralPathFilter(last, p)
case AnyChildPart =>
last = new PathParser.AnyChildPathFilter(last)
- case RegexChildPart(r, _) =>
+ case RegexChildPart(r) =>
last = new PathParser.RegexChildPathFilter(r, last)
case AnyDescendantPart =>
last = new PathParser.AnyDecendentPathFilter(last)