You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/29 13:15:00 UTC
svn commit: r1040082 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/resources/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/
apollo-broker/src/test/scal...
Author: chirino
Date: Mon Nov 29 12:14:59 2010
New Revision: 1040082
URL: http://svn.apache.org/viewvc?rev=1040082&view=rev
Log:
Hierarchical destinations are now decoupled from their encoded representation; protocols are free to customize the encoding.
Destination and queue settings are now configurable.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml
- copied, changed from r1039378, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java
Removed:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapEntry.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathSupport.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PrefixPathFilter.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/SimplePathFilter.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/WildcardPathFilter.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/non-persistent-activemq.xml Mon Nov 29 12:14:59 2010
@@ -18,6 +18,5 @@
<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
<virtual-host>
<host-name>default</host-name>
- <memory-store/>
</virtual-host>
</broker>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Mon Nov 29 12:14:59 2010
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.apollo.broker
-import org.apache.activemq.apollo.util.ClassFinder
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import org.apache.activemq.apollo.dto.{JsonCodec, DurableSubscriptionBindingDTO, PointToPointBindingDTO, BindingDTO}
import org.apache.activemq.apollo.selector.SelectorParser
import org.apache.activemq.apollo.filter.{ConstantExpression, BooleanExpression}
+import org.apache.activemq.apollo.util.path.Path
+import Buffer._
+import org.apache.activemq.apollo.dto._
+import org.apache.activemq.apollo.util.{OptionSupport, ClassFinder}
/**
* <p>
@@ -60,8 +62,18 @@ object BindingFactory {
}
throw new IllegalArgumentException("Invalid binding type: "+binding_dto);
}
+
+}
+
+object Binding {
+ val destination_parser = new DestinationParser
+
+ def encode(value:Array[Path]):String = destination_parser.toString(value)
+ def decode(value:String):Array[Path] = destination_parser.parsePath(ascii(value))
}
+import Binding._
+
/**
* <p>
* </p>
@@ -91,11 +103,19 @@ trait Binding {
def message_filter:BooleanExpression = ConstantExpression.TRUE
- def destination:AsciiBuffer
+ def matches(config:QueueDTO):Boolean = {
+ import Binding.destination_parser._
+ import OptionSupport._
+ var rc = (o(config.destination).map{ x=> parseFilter(ascii(x)).matches(destination) }.getOrElse(true))
+ rc = rc && (o(config.kind).map{ x=> x == binding_kind.toString }.getOrElse(true))
+ rc
+ }
+
+ def destination:Array[Path]
}
object PointToPointBinding {
- val POINT_TO_POINT_KIND = new AsciiBuffer("p2p")
+ val POINT_TO_POINT_KIND = new AsciiBuffer("ptp")
val DESTINATION_PATH = new AsciiBuffer("default");
}
@@ -115,9 +135,9 @@ class PointToPointBindingFactory extends
def create(binding_dto:BindingDTO) = {
if( binding_dto.isInstanceOf[PointToPointBindingDTO] ) {
- val p2p_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
- val data = new AsciiBuffer(p2p_dto.destination).buffer
- new PointToPointBinding(data, p2p_dto)
+ val ptp_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
+ val data = new AsciiBuffer(ptp_dto.destination).buffer
+ new PointToPointBinding(data, ptp_dto)
} else {
null
}
@@ -132,6 +152,7 @@ class PointToPointBindingFactory extends
*/
class PointToPointBinding(val binding_data:Buffer, val binding_dto:PointToPointBindingDTO) extends Binding {
+ val destination = Binding.decode(binding_dto.destination)
def binding_kind = POINT_TO_POINT_KIND
def unbind(node: RoutingNode, queue: Queue) = {
@@ -155,7 +176,6 @@ class PointToPointBinding(val binding_da
case _ => false
}
- def destination = new AsciiBuffer(binding_dto.destination)
}
@@ -191,6 +211,8 @@ class DurableSubBindingFactory extends B
*/
class DurableSubBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionBindingDTO) extends Binding {
+ val destination = Binding.decode(binding_dto.destination)
+
def binding_kind = DURABLE_SUB_KIND
@@ -228,6 +250,11 @@ class DurableSubBinding(val binding_data
}
}
- def destination = new AsciiBuffer(binding_dto.destination)
-
+ override def matches(config: QueueDTO): Boolean = {
+ import OptionSupport._
+ var rc = super.matches(config)
+ rc = rc && (o(config.client_id).map{ x=> x == binding_dto.client_id }.getOrElse(true))
+ rc = rc && (o(config.subscription_id).map{ x=> x == binding_dto.subscription_id }.getOrElse(true))
+ rc
+ }
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Mon Nov 29 12:14:59 2010
@@ -191,7 +191,7 @@ class Broker() extends BaseService with
/**
* Validates and then applies the configuration.
*/
- def configure(config: BrokerDTO, reporter:Reporter) = ^{
+ def configure(config: BrokerDTO, reporter:Reporter) = dispatchQueue {
if ( validate(config, reporter) < ERROR ) {
this.config = config
@@ -201,7 +201,7 @@ class Broker() extends BaseService with
}
}
- } >>: dispatchQueue
+ }
override def _start(onCompleted:Runnable) = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java Mon Nov 29 12:14:59 2010
@@ -16,12 +16,13 @@
*/
package org.apache.activemq.apollo.broker;
+import org.apache.activemq.apollo.util.path.Path;
import org.fusesource.hawtbuf.AsciiBuffer;
/**
*/
public interface Destination {
AsciiBuffer getDomain();
- AsciiBuffer getName();
+ Path[] path();
Destination[] getDestinations();
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Mon Nov 29 12:14:59 2010
@@ -18,51 +18,48 @@ package org.apache.activemq.apollo.broke
import _root_.org.fusesource.hawtbuf._
import BufferConversions._
+import org.apache.activemq.apollo.util.path.{Path, PathParser}
+import Buffer._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class ParserOptions {
- var defaultDomain: AsciiBuffer = null
- var queuePrefix: AsciiBuffer = null
- var topicPrefix: AsciiBuffer = null
- var tempQueuePrefix: AsciiBuffer = null
- var tempTopicPrefix: AsciiBuffer = null
- var separator: Option[Byte] = None
-}
+class DestinationParser extends PathParser {
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object DestinationParser {
+ var default_domain: AsciiBuffer = null
+ var queue_prefix: AsciiBuffer = ascii("queue:")
+ var topic_prefix: AsciiBuffer = ascii("topic:")
+ var temp_queue_prefix: AsciiBuffer = ascii("temp-queue:")
+ var temp_topic_prefix: AsciiBuffer = ascii("temp-topic:")
+ var destination_separator: Option[Byte] = Some(','.toByte)
- def toBuffer(value: Destination, options: ParserOptions): AsciiBuffer = {
+ def toBuffer(value: Destination): AsciiBuffer = {
if (value == null) {
null
} else {
val baos = new ByteArrayOutputStream
def write(value: Destination):Unit = {
if (value.getDestinations != null) {
- assert( options.separator.isDefined )
+ assert( destination_separator.isDefined )
val first = true
for (d <- value.getDestinations) {
if (!first) {
- baos.write(options.separator.get)
+ baos.write(destination_separator.get)
}
write(d)
}
} else {
value.getDomain match {
case Router.QUEUE_DOMAIN =>
- baos.write(options.queuePrefix)
+ baos.write(queue_prefix)
case Router.TOPIC_DOMAIN =>
- baos.write(options.topicPrefix)
+ baos.write(topic_prefix)
case Router.TEMP_QUEUE_DOMAIN =>
- baos.write(options.tempQueuePrefix)
+ baos.write(temp_queue_prefix)
case Router.TEMP_TOPIC_DOMAIN =>
- baos.write(options.tempTopicPrefix)
+ baos.write(temp_topic_prefix)
}
- baos.write(value.getName)
+ this.write(value.path, baos)
}
}
write(value)
@@ -74,20 +71,19 @@ object DestinationParser {
* Parses a destination which may or may not be a composite.
*
* @param value
- * @param options
* @param compositeSeparator
* @return
*/
- def parse(value: AsciiBuffer, options: ParserOptions): Destination = {
+ def parse(value: AsciiBuffer): Destination = {
if (value == null) {
return null;
}
- if (options.separator.isDefined && value.contains(options.separator.get)) {
- var rc = value.split(options.separator.get);
+ if (destination_separator.isDefined && value.contains(destination_separator.get)) {
+ var rc = value.split(destination_separator.get);
var dl: List[Destination] = Nil
for (buffer <- rc) {
- val d = parse(buffer, options)
+ val d = parse(buffer)
if (d == null) {
return null;
}
@@ -95,23 +91,23 @@ object DestinationParser {
}
return new MultiDestination(dl.toArray[Destination]);
} else {
- if (options.queuePrefix != null && value.startsWith(options.queuePrefix)) {
- var name = value.slice(options.queuePrefix.length, value.length).ascii();
- return new SingleDestination(Router.QUEUE_DOMAIN, name);
- } else if (options.topicPrefix != null && value.startsWith(options.topicPrefix)) {
- var name = value.slice(options.topicPrefix.length, value.length).ascii();
- return new SingleDestination(Router.TOPIC_DOMAIN, name);
- } else if (options.tempQueuePrefix != null && value.startsWith(options.tempQueuePrefix)) {
- var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
- return new SingleDestination(Router.TEMP_QUEUE_DOMAIN, name);
- } else if (options.tempTopicPrefix != null && value.startsWith(options.tempTopicPrefix)) {
- var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
- return new SingleDestination(Router.TEMP_TOPIC_DOMAIN, name);
+ if (queue_prefix != null && value.startsWith(queue_prefix)) {
+ var name = value.slice(queue_prefix.length, value.length).ascii();
+ return new SingleDestination(Router.QUEUE_DOMAIN, parsePath(name));
+ } else if (topic_prefix != null && value.startsWith(topic_prefix)) {
+ var name = value.slice(topic_prefix.length, value.length).ascii();
+ return new SingleDestination(Router.TOPIC_DOMAIN, parsePath(name));
+ } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
+ var name = value.slice(temp_queue_prefix.length, value.length).ascii();
+ return new SingleDestination(Router.TEMP_QUEUE_DOMAIN, parsePath(name));
+ } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
+ var name = value.slice(temp_topic_prefix.length, value.length).ascii();
+ return new SingleDestination(Router.TEMP_TOPIC_DOMAIN, parsePath(name));
} else {
- if (options.defaultDomain == null) {
+ if (default_domain == null) {
return null;
}
- return new SingleDestination(options.defaultDomain, value);
+ return new SingleDestination(default_domain, parsePath(value));
}
}
}
@@ -120,11 +116,11 @@ object DestinationParser {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class SingleDestination(var domain: AsciiBuffer = null, var name: AsciiBuffer = null) extends Destination {
+case class SingleDestination(var domain: AsciiBuffer = null, var name: Array[Path] = null) extends Destination {
def getDestinations(): Array[Destination] = null;
def getDomain(): AsciiBuffer = domain
- def getName(): AsciiBuffer = name
+ def path() = name
override def toString() = "" + domain + ":" + name
}
@@ -136,7 +132,7 @@ case class MultiDestination(var destinat
def getDestinations(): Array[Destination] = destinations;
def getDomain(): AsciiBuffer = null
- def getName(): AsciiBuffer = null
+ def path() = null
override def toString() = destinations.mkString(",")
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Nov 29 12:14:59 2010
@@ -30,6 +30,8 @@ import org.apache.activemq.apollo.store.
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
+import org.apache.activemq.apollo.dto.QueueDTO
+import OptionSupport._
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -39,7 +41,7 @@ object Queue extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val host: VirtualHost, var id:Long, val binding:Binding) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
+class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
override protected def log = Queue
var inbound_sessions = Set[DeliverySession]()
@@ -75,45 +77,50 @@ class Queue(val host: VirtualHost, var i
entries.addFirst(head_entry)
//
- // Tuning options.
+ // Infrequently accessed tuning configuration.
//
/**
* The amount of memory buffer space for receiving messages.
*/
- var tune_producer_buffer = 1024*32
+ def tune_producer_buffer = config.producer_buffer.getOrElse(1024*32)
/**
* The amount of memory buffer space for the queue..
*/
- var tune_queue_buffer = 1024*32
-
- /**
- * The amount of memory buffer space to use per subscription.
- */
- var tune_consumer_buffer = 1024*64
+ def tune_queue_buffer = config.queue_buffer.getOrElse(1024*32)
/**
* Subscribers that consume slower than this rate per seconds will be considered
* slow. Once a consumer is considered slow, we may switch to disk spooling.
*/
- var tune_slow_subscription_rate = 500*1024
+ def tune_slow_subscription_rate = config.slow_subscription_rate.getOrElse(500*1024)
/**
* The number of milliseconds between slow consumer checks.
*/
- var tune_slow_check_interval = 500L
+ def tune_slow_check_interval = config.slow_check_interval.getOrElse(500L)
+
+ /**
+ * The number of intervals during which a consumer must fail to meet the subscription rate before it is
+ * flagged as a slow consumer.
+ */
+ def tune_max_slow_intervals = config.max_slow_intervals.getOrElse(10)
+
+ //
+ // Frequently accessed tuning configuration.
+ //
/**
* Should this queue persistently store it's entries?
*/
- def tune_persistent = host.store !=null
+ var tune_persistent = true
/**
* Should messages be flushed or swapped out of memory if
* no consumers need the message?
*/
- def tune_flush_to_store = tune_persistent
+ var tune_flush_to_store = true
/**
* The number max number of flushed queue entries to load
@@ -121,13 +128,21 @@ class Queue(val host: VirtualHost, var i
* reference pointers to the actual messages. When not loaded,
* the batch is referenced as sequence range to conserve memory.
*/
- def tune_flush_range_size = 10000
+ var tune_flush_range_size = 0
/**
- * The number of intervals that a consumer must not meeting the subscription rate before it is
- * flagged as a slow consumer.
+ * The amount of memory buffer space to use per subscription.
*/
- var tune_max_slow_intervals = 10
+ var tune_consumer_buffer = 0
+
+ def configure(c:QueueDTO) = {
+ config = c
+ tune_persistent = host.store !=null && config.persistent.getOrElse(true)
+ tune_flush_to_store = tune_persistent && config.flush_to_store.getOrElse(true)
+ tune_flush_range_size = config.flush_range_size.getOrElse(10000)
+ tune_consumer_buffer = config.consumer_buffer.getOrElse(1024*64)
+ }
+ configure(config)
var enqueue_item_counter = 0L
var dequeue_item_counter = 0L
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Mon Nov 29 12:14:59 2010
@@ -25,22 +25,23 @@ import _root_.org.fusesource.hawtdispatc
import collection.JavaConversions
import org.apache.activemq.apollo.util._
import collection.mutable.{ListBuffer, HashMap}
-import org.apache.activemq.apollo.dto.{PointToPointBindingDTO, BindingDTO}
-import path.{PathFilter, PathMap}
import scala.collection.immutable.List
import org.apache.activemq.apollo.store.{StoreUOW, QueueRecord}
+import org.apache.activemq.apollo.util.path.{Path, PathMap, PathParser}
+import Buffer._
+import org.apache.activemq.apollo.dto.{QueueDTO, PointToPointBindingDTO, BindingDTO}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object Router extends Log {
- val TOPIC_DOMAIN = new AsciiBuffer("topic");
- val QUEUE_DOMAIN = new AsciiBuffer("queue");
- val TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
- val TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
+ val TOPIC_DOMAIN = ascii("topic");
+ val QUEUE_DOMAIN = ascii("queue");
+ val TEMP_TOPIC_DOMAIN = ascii("temp-topic");
+ val TEMP_QUEUE_DOMAIN = ascii("temp-queue");
- val QUEUE_KIND = new AsciiBuffer("queue");
- val DEFAULT_QUEUE_PATH = new AsciiBuffer("default");
+ val QUEUE_KIND = ascii("queue");
+ val DEFAULT_QUEUE_PATH = ascii("default");
}
/**
@@ -83,12 +84,12 @@ class Router(val host:VirtualHost) exten
}
}
- def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(PathFilter.ANY_DESCENDENT))
+ def routing_nodes:Iterable[RoutingNode] = JavaConversions.asIterable(destinations.get(Array(PathParser.ANY_DESCENDANT)))
- def create_destination_or(destination:AsciiBuffer)(func:(RoutingNode)=>Unit):RoutingNode = {
+ def create_destination_or(destination:Array[Path])(func:(RoutingNode)=>Unit):RoutingNode = {
// We can't create a wild card destination.. only wild card subscriptions.
- assert( !PathFilter.containsWildCards(destination) )
+ assert( !PathParser.containsWildCards(destination) )
var rc = destinations.chooseValue( destination )
if( rc == null ) {
@@ -112,13 +113,21 @@ class Router(val host:VirtualHost) exten
rc
}
- def get_destination_matches(destination:AsciiBuffer) = {
+ def get_destination_matches(destination:Array[Path]) = {
import JavaConversions._
asIterable(destinations.get( destination ))
}
def _create_queue(id:Long, binding:Binding):Queue = {
- val queue = new Queue(host, id, binding)
+
+ val config = {
+ import collection.JavaConversions._
+ host.config.queues.find{ config=>
+ binding.matches(config)
+ }
+ }.getOrElse(new QueueDTO)
+
+ val queue = new Queue(host, id, binding, config)
queue.start
queues.put(binding, queue)
@@ -127,7 +136,7 @@ class Router(val host:VirtualHost) exten
if( name!=null ) {
bindings.put(name, queue)
// make sure the destination is created if this is not a wild card sub
- if( !PathFilter.containsWildCards(name) ) {
+ if( !PathParser.containsWildCards(name) ) {
create_destination_or(name) { node=>
node.add_queue(queue)
}
@@ -195,11 +204,11 @@ class Router(val host:VirtualHost) exten
assert( is_topic(destination) )
- val name = destination.getName
+ val name = destination.path
// make sure the destination is created if this is not a wild card sub
- if( !PathFilter.containsWildCards(name) ) {
- val node = create_destination_or(name) { node=> }
+ if( !PathParser.containsWildCards(name) ) {
+ val node = create_destination_or(name) { node=> Unit }
}
get_destination_matches(name).foreach( node=>
@@ -211,7 +220,7 @@ class Router(val host:VirtualHost) exten
def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {
assert( is_topic(destination) )
- val name = destination.getName
+ val name = destination.path
broadcast_consumers.remove(name, consumer)
get_destination_matches(name).foreach{ node=>
node.remove_broadcast_consumer(consumer)
@@ -234,13 +243,13 @@ class Router(val host:VirtualHost) exten
// Looking up the queue will cause it to get created if it does not exist.
val queue = if( !topic ) {
val dto = new PointToPointBindingDTO
- dto.destination = destination.getName.toString
+ dto.destination = Binding.encode(destination.path)
_create_queue(dto)
} else {
None
}
- val node = create_destination_or(destination.getName) { node=> }
+ val node = create_destination_or(destination.path) { node=> Unit }
if( node.unified || topic ) {
node.add_broadcast_producer( route )
} else {
@@ -254,7 +263,7 @@ class Router(val host:VirtualHost) exten
def disconnect(route:DeliveryProducerRoute) = releasing(route) {
val topic = is_topic(route.destination)
- val node = create_destination_or(route.destination.getName) { node=> }
+ val node = create_destination_or(route.destination.path) { node=> Unit }
if( node.unified || topic ) {
node.remove_broadcast_producer(route)
}
@@ -268,7 +277,7 @@ class Router(val host:VirtualHost) exten
/**
* Tracks state associated with a destination name.
*/
-class RoutingNode(val router:Router, val name:AsciiBuffer) {
+class RoutingNode(val router:Router, val name:Array[Path]) {
val id = router.destination_id_counter.incrementAndGet
@@ -276,8 +285,14 @@ class RoutingNode(val router:Router, val
var broadcast_consumers = ListBuffer[DeliveryConsumer]()
var queues = ListBuffer[Queue]()
- // TODO: extract the node's config from the host config object
- def unified = false
+ val unified = {
+ import collection.JavaConversions._
+ import OptionSupport._
+ import Binding.destination_parser._
+
+ val t= router.host.config.destinations.find( x=> parseFilter(ascii(x.path)).matches(name) )
+ t.flatMap(x=> o(x.unified)).getOrElse(false)
+ }
def add_broadcast_consumer (consumer:DeliveryConsumer) = {
broadcast_consumers += consumer
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Mon Nov 29 12:14:59 2010
@@ -32,18 +32,13 @@ import collection.JavaConversions
import java.util.concurrent.atomic.AtomicLong
import org.apache.activemq.apollo.util.OptionSupport._
import security.{Authenticator, Authorizer}
+import org.apache.activemq.apollo.util.path.PathParser
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object VirtualHost extends Log {
- val destination_parser_options = new ParserOptions
- destination_parser_options.queuePrefix = new AsciiBuffer("queue:")
- destination_parser_options.topicPrefix = new AsciiBuffer("topic:")
- destination_parser_options.tempQueuePrefix = new AsciiBuffer("temp-queue:")
- destination_parser_options.tempTopicPrefix = new AsciiBuffer("temp-topic:")
-
/**
* Creates a default a configuration object.
*/
@@ -223,9 +218,7 @@ class VirtualHost(val broker: Broker, va
// rates between producers and consumers, look for natural data flow partitions
// and then try to equally divide the load over the available processing
// threads/cores.
- val nodes = router.destinations.get(PathFilter.ANY_DESCENDENT)
-
- JavaConversions.asIterable(nodes).foreach { node =>
+ router.routing_nodes.foreach { node =>
// For the topics, just collocate the producers onto the first consumer's
// thread.
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml (from r1039378, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml?p2=activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml&p1=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml&r1=1039378&r2=1040082&rev=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml Mon Nov 29 12:14:59 2010
@@ -15,18 +15,18 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
- <dispatcher name="test dispatcher" threads="4"/>
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo" id="default">
- <transport-server>pipe://test1</transport-server>
+ <virtual-host id="default">
+ <host-name>test</host-name>
- <connector/>
- <connector>pipe://test1</connector>
+ <destination path="unified.*" unified="true"/>
+ <destination path="notunified.*" unified="false"/>
+
+ <queue destination="unified.a" kind="ptp" queue-buffer="333"/>
+ <queue destination="unified.*" kind="ds" queue-buffer="444"/>
+ <queue queue-buffer="111"/>
- <virtual-host>
- <host-name>localhost</host-name>
- <host-name>test.localhost</host-name>
- <memory-store/>
</virtual-host>
</broker>
Added: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Mon Nov 29 12:14:59 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.fusesource.hawtbuf.Buffer._
+import scala.util.continuations._
+import org.apache.activemq.apollo.util.{ServiceControl, FunSuiteSupport}
+import org.apache.activemq.apollo.dto.{BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DestinationConfigurationTest extends FunSuiteSupport {
+
+ test("Simple Config") {
+ val uri = "xml:classpath:org/apache/activemq/apollo/broker/destination-config.xml"
+ info("Loading broker configuration from the classpath with URI: " + uri)
+ val broker = BrokerFactory.createBroker(uri)
+ ServiceControl.start(broker, "broker")
+
+ val host = broker.config.virtual_hosts.get(0)
+
+ expect("test") {
+ host.host_names.get(0)
+ }
+
+ // Let's make sure we are reading in the expected config.
+ expect(2) {
+ host.destinations.size
+ }
+ expect(3) {
+ host.queues.size
+ }
+
+ val router = broker.default_virtual_host.router
+
+ def check_tune_queue_buffer(expected:Int)(dto:BindingDTO) = {
+ var actual=0
+ reset {
+ var q = router.create_queue(dto).get
+ actual = q.tune_queue_buffer
+ }
+ expect(expected) {actual}
+ }
+
+ check_tune_queue_buffer(333) {
+ var p = new PointToPointBindingDTO()
+ p.destination = "unified.a"
+ p
+ }
+ check_tune_queue_buffer(444) {
+ val p = new DurableSubscriptionBindingDTO()
+ p.destination = "unified.b"
+ p.client_id = "a"
+ p.subscription_id = "b"
+ p
+ }
+
+ check_tune_queue_buffer(111) {
+ var p = new PointToPointBindingDTO()
+ p.destination = "notunified.other"
+ p
+ }
+
+ def dest(v:String) = Binding.destination_parser.parsePath(ascii(v))
+ expect(true) {
+ router.destinations.chooseValue(dest("unified.a")).unified
+ }
+ expect(false) {
+ router.destinations.chooseValue(dest("notunified.other")).unified
+ }
+ ServiceControl.stop(broker, "broker")
+ }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala Mon Nov 29 12:14:59 2010
@@ -31,9 +31,6 @@ class XMLBrokerFactoryTest extends FunSu
info("Loading broker configuration from the classpath with URI: " + uri)
val broker = BrokerFactory.createBroker(uri)
- // assertEquals(4, p.getSize())
- // assertEquals("test dispatcher", p.getName())
-
expect(1) {
broker.config.connectors.size()
}
@@ -50,10 +47,6 @@ class XMLBrokerFactoryTest extends FunSu
broker.config.virtual_hosts.size()
}
- // Assert.assertNotNull(broker.defaultVirtualHost().getDatabase())
- // Assert.assertNotNull(broker.defaultVirtualHost().getDatabase().getStore())
- // Assert.assertTrue((broker.defaultVirtualHost().getDatabase().getStore() instanceof MemoryStore))
-
}
def expectException(msg: String = "Expected exeception.")(func: => Unit) = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/testSimpleConfig.xml Mon Nov 29 12:14:59 2010
@@ -16,11 +16,7 @@
limitations under the License.
-->
<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
- <dispatcher name="test dispatcher" threads="4"/>
- <transport-server>pipe://test1</transport-server>
-
- <connector/>
<connector>pipe://test1</connector>
<virtual-host>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Mon Nov 29 12:14:59 2010
@@ -208,13 +208,15 @@ abstract class BrokerPerfSupport extends
config
}
+ val parser = new DestinationParser
+
def createDestinations(destCount: Int): Array[Destination] = {
var dests = new Array[Destination](destCount)
for (i <- 0 until destCount) {
val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
val name = new AsciiBuffer("dest" + (i + 1))
- var bean = new SingleDestination(domain, name)
+ var bean = new SingleDestination(domain, parser.parsePath(name))
dests(i) = bean
// if (PTP) {
// sendBroker.defaultVirtualHost.createQueue(dests(i))
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Mon Nov 29 12:14:59 2010
@@ -26,17 +26,13 @@ import java.util.ArrayList;
*/
@XmlRootElement(name = "destination")
@XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationDTO {
+public class DestinationDTO extends StringIdDTO {
/**
- * The name or wild card name of the destination
+ * The path to the destination. You can use wild cards.
*/
- public String name;
-
- /**
- * The kind of destination, "queue" or "topic"
- */
- public String kind;
+ @XmlAttribute
+ public String path;
/**
* If set to true, then routing then there is no difference between
@@ -44,8 +40,7 @@ public class DestinationDTO {
* a queue subscriptions is created, it will act like if a durable
* subscription was created on the topic.
*/
+ @XmlAttribute
public Boolean unified;
-
-
}
Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Mon Nov 29 12:14:59 2010
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import javax.xml.bind.annotation.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name = "queue")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class QueueDTO {
+
+ /**
+ * The destination this queue is associated with. You can use wild cards.
+ */
+ @XmlAttribute
+ public String destination;
+
+ /**
+ * The kind of queue. It may be "ptp" for standard
+ * point to point queues or "ds" for durable subscriptions.
+ * If not set, then this configuration applies to all queue types.
+ */
+ @XmlAttribute
+ public String kind;
+
+ /**
+ * If the kind is "ds" then you can specify which client
+ * id this configuration should match.
+ */
+ @XmlAttribute(name="client-id")
+ public String client_id;
+
+ /**
+ * If the kind is "ds" then you can specify which subscription
+ * id this configuration should match.
+ */
+ @XmlAttribute(name="subscription-id")
+ public String subscription_id;
+
+
+ /**
+ * The amount of memory buffer space for receiving messages.
+ */
+ @XmlAttribute(name="producer-buffer")
+ @JsonProperty("producer_buffer")
+ public Integer producer_buffer;
+
+ /**
+ * The amount of memory buffer space for the queue.
+ */
+ @XmlAttribute(name="queue-buffer")
+ @JsonProperty("queue_buffer")
+ public Integer queue_buffer;
+
+ /**
+ * The amount of memory buffer space to use per subscription.
+ */
+ @XmlAttribute(name="consumer-buffer")
+ @JsonProperty("consumer_buffer")
+ public Integer consumer_buffer;
+
+ /**
+ * Subscribers that consume slower than this rate per second will be considered
+ * slow. Once a consumer is considered slow, we may switch to disk spooling.
+ */
+ @XmlAttribute(name="slow-subscription-rate")
+ @JsonProperty("slow_subscription_rate")
+ public Integer slow_subscription_rate;
+
+ /**
+ * The number of milliseconds between slow consumer checks.
+ */
+ @XmlAttribute(name="slow-check-interval")
+ @JsonProperty("slow_check_interval")
+ public Long slow_check_interval;
+
+ /**
+ * Should this queue persistently store its entries?
+ */
+ @XmlAttribute(name="persistent")
+ @JsonProperty("persistent")
+ public Boolean persistent;
+
+ /**
+ * Should messages be flushed or swapped out of memory if
+ * no consumers need the message?
+ */
+ @XmlAttribute(name="flush-to-store")
+ @JsonProperty("flush_to_store")
+ public Boolean flush_to_store;
+
+ /**
+ * The max number of flushed queue entries to load
+ * from the store at a time. Note that flushed entries are just
+ * reference pointers to the actual messages. When not loaded,
+ * the batch is referenced as a sequence range to conserve memory.
+ */
+ @XmlAttribute(name="flush-range-size")
+ @JsonProperty("flush_range_size")
+ public Integer flush_range_size;
+
+ /**
+ * The number of intervals during which a consumer must fail to meet the subscription rate before it is
+ * flagged as a slow consumer.
+ */
+ @XmlAttribute(name="max-slow-intervals")
+ @JsonProperty("max_slow_intervals")
+ public Integer max_slow_intervals;
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Mon Nov 29 12:14:59 2010
@@ -53,9 +53,17 @@ public class VirtualHostDTO extends Serv
* Holds the configuration for the destinations.
*/
@XmlElement(name="destination")
+ @JsonProperty("destinations")
public ArrayList<DestinationDTO> destinations = new ArrayList<DestinationDTO>();
/**
+ * Holds the configuration for the queues.
+ */
+ @XmlElement(name="queue")
+ @JsonProperty("queues")
+ public ArrayList<QueueDTO> queues = new ArrayList<QueueDTO>();
+
+ /**
* Should connections get regrouped so they get serviced by the same thread?
*/
@XmlAttribute(name="regroup-connections")
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Mon Nov 29 12:14:59 2010
@@ -40,4 +40,6 @@ VirtualHostStatusDTO
StompConnectionStatusDTO
KeyStorageDTO
SimpleStoreStatusDTO
-NullStoreDTO
\ No newline at end of file
+NullStoreDTO
+QueueDTO
+DestinationDTO
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Nov 29 12:14:59 2010
@@ -305,14 +305,17 @@ object Stomp {
val DURABLE_PREFIX = ascii("durable:")
val DURABLE_QUEUE_KIND = ascii("stomp:sub")
- val options = new ParserOptions
- options.queuePrefix = ascii("/queue/")
- options.topicPrefix = ascii("/topic/")
+ val destination_parser = new DestinationParser
+ destination_parser.queue_prefix = ascii("/queue/")
+ destination_parser.topic_prefix = ascii("/topic/")
+ destination_parser.path_seperator = ascii("/")
+ destination_parser.any_child_wildcard = ascii("*")
+ destination_parser.any_descendant_wildcard = ascii("**")
- options.defaultDomain = Router.QUEUE_DOMAIN
+ destination_parser.default_domain = Router.QUEUE_DOMAIN
implicit def toDestination(value:AsciiBuffer):Destination = {
- val d = DestinationParser.parse(value, options)
+ val d = destination_parser.parse(value)
if( d==null ) {
throw new ProtocolException("Invalid stomp destiantion name: "+value);
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Mon Nov 29 12:14:59 2010
@@ -743,7 +743,7 @@ class StompProtocolHandler extends Proto
// way again)
if (topic) {
val rc = new DurableSubscriptionBindingDTO
- rc.destination = destination.getName.toString
+ rc.destination = Binding.encode(destination.path)
// TODO:
// rc.client_id =
rc.subscription_id = if( persistent ) id else null
@@ -751,7 +751,7 @@ class StompProtocolHandler extends Proto
rc
} else {
val rc = new PointToPointBindingDTO
- rc.destination = destination.getName.toString
+ rc.destination = Binding.encode(destination.path)
rc
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Mon Nov 29 12:14:59 2010
@@ -39,9 +39,9 @@ class StompRemoteConsumer extends Remote
outboundSink.refiller = ^ {}
val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
- ascii("/queue/" + destination.getName().toString());
+ ascii("/queue/" + destination.path().toString());
} else {
- ascii("/topic/" + destination.getName().toString());
+ ascii("/topic/" + destination.path().toString());
}
var frame = StompFrame(CONNECT);
@@ -150,9 +150,9 @@ class StompRemoteProducer extends Remote
outboundSink.refiller = ^ {drain}
if (destination.getDomain() == Router.QUEUE_DOMAIN) {
- stompDestination = ascii("/queue/" + destination.getName().toString());
+ stompDestination = ascii("/queue/" + destination.path().toString());
} else {
- stompDestination = ascii("/topic/" + destination.getName().toString());
+ stompDestination = ascii("/topic/" + destination.path().toString());
}
outboundSink.offer(StompFrame(CONNECT));
send_next
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/AnyChildPathNode.java Mon Nov 29 12:14:59 2010
@@ -34,14 +34,14 @@ public class AnyChildPathNode<Value> imp
this.node = node;
}
- public void appendMatchingValues(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+ public void appendMatchingValues(Set<Value> answer, Path[] paths, int startIndex) {
for (PathNode<Value> child : getChildNodes()) {
child.appendMatchingValues(answer, paths, startIndex);
}
}
- public void appendMatchingWildcards(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+ public void appendMatchingWildcards(Set<Value> answer, Path[] paths, int startIndex) {
for (PathNode<Value> child : getChildNodes()) {
child.appendMatchingWildcards(answer, paths, startIndex);
}
@@ -54,7 +54,7 @@ public class AnyChildPathNode<Value> imp
}
}
- public PathNode<Value> getChild(AsciiBuffer path) {
+ public PathNode<Value> getChild(Path path) {
final Collection<PathNode<Value>> list = new ArrayList<PathNode<Value>>();
for (PathNode<Value> child : getChildNodes()) {
PathNode<Value> answer = child.getChild(path);
Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.java Mon Nov 29 12:14:59 2010
@@ -0,0 +1,20 @@
+package org.apache.activemq.apollo.util.path;
+
+/**
+ * A single element of a parsed path. By default an element matches any other element and is not a literal.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class Path {
+
+ public boolean matches(Path p) {
+ return true;
+ }
+
+ abstract public String toString(PathParser parser);
+
+ public boolean isLiteral() {
+ return false;
+ }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathFilter.java Mon Nov 29 12:14:59 2010
@@ -27,48 +27,8 @@ import org.fusesource.hawtbuf.AsciiBuffe
*
* @version $Revision: 1.3 $
*/
-public abstract class PathFilter {
+public interface PathFilter {
- public static final AsciiBuffer ANY_DESCENDENT = new AsciiBuffer(">");
- public static final AsciiBuffer ANY_CHILD = new AsciiBuffer("*");
-
- public abstract boolean matches(AsciiBuffer path);
+ public boolean matches(Path[] path);
- public static PathFilter parseFilter(AsciiBuffer path) {
- if( containsWildCards(path) ) {
- ArrayList<AsciiBuffer> paths = PathSupport.parse(path);
- int idx = paths.size() - 1;
- if (idx >= 0) {
- AsciiBuffer lastPath = paths.get(idx);
- if (lastPath.equals(ANY_DESCENDENT)) {
- return new PrefixPathFilter(paths);
- } else {
- while (idx >= 0) {
- lastPath = paths.get(idx--);
- if (lastPath.equals(ANY_CHILD)) {
- return new WildcardPathFilter(paths);
- }
- }
- }
- }
- }
-
- // if none of the paths contain a wildcard then use equality
- return new SimplePathFilter(path);
- }
-
- public static boolean containsWildCards(AsciiBuffer path) {
- byte b1 = ANY_DESCENDENT.getData()[0];
- byte b2 = ANY_CHILD.getData()[0];
-
- byte[] data = path.getData();
- int length = path.getOffset()+path.getLength();
- for (int i = path.getOffset(); i < length; i++) {
- if( data[i] == b1 || data[i]==b2 ) {
- return true;
- }
- }
- return false;
- }
-
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java Mon Nov 29 12:14:59 2010
@@ -39,8 +39,6 @@ import org.fusesource.hawtbuf.AsciiBuffe
* @version $Revision: 1.3 $
*/
public class PathMap<Value> {
- protected static final AsciiBuffer ANY_DESCENDENT = PathFilter.ANY_DESCENDENT;
- protected static final AsciiBuffer ANY_CHILD = PathFilter.ANY_CHILD;
private final PathMapNode<Value> root = new PathMapNode<Value>(null);
@@ -54,20 +52,18 @@ public class PathMap<Value> {
* @return a List of matching values or an empty list if there are no
* matching values.
*/
- public Set<Value> get(AsciiBuffer key) {
+ public Set<Value> get(Path[] key) {
return findWildcardMatches(key);
}
- public void put(AsciiBuffer key, Value value) {
- ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
- root.add(paths, 0, value);
+ public void put(Path[] key, Value value) {
+ root.add(key, 0, value);
}
/**
* Removes the value from the associated path
*/
- public void remove(AsciiBuffer key, Value value) {
- ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
+ public void remove(Path[] paths, Value value) {
root.remove(paths, 0, value);
}
@@ -78,20 +74,7 @@ public class PathMap<Value> {
// Implementation methods
// -------------------------------------------------------------------------
-
- /**
- * A helper method to allow the path map to be populated from a
- * dependency injection framework such as Spring
- */
- @SuppressWarnings("unchecked")
- protected void setEntries(List<PathMapEntry> entries) {
- for (PathMapEntry entry : entries) {
- put(entry.getKey(), (Value) entry);
- }
- }
-
- protected Set<Value> findWildcardMatches(AsciiBuffer key) {
- ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
+ protected Set<Value> findWildcardMatches(Path[] paths) {
HashSet<Value> answer = new HashSet<Value>();
root.appendMatchingValues(answer, paths, 0);
return answer;
@@ -101,10 +84,9 @@ public class PathMap<Value> {
* @param key
* @return
*/
- public Set<Value> removeAll(AsciiBuffer key) {
+ public Set<Value> removeAll(Path[] key) {
HashSet<Value> rc = new HashSet<Value>();
- ArrayList<AsciiBuffer> paths = PathSupport.parse(key);
- root.removeAll(rc, paths, 0);
+ root.removeAll(rc, key, 0);
return rc;
}
@@ -112,11 +94,11 @@ public class PathMap<Value> {
* Returns the value which matches the given path or null if there is
* no matching value. If there are multiple values, the results are sorted
* and the last item (the biggest) is returned.
- *
+ *
* @param path the path to find the value for
* @return the largest matching value or null if no value matches
*/
- public Value chooseValue(AsciiBuffer path) {
+ public Value chooseValue(Path[] path) {
Set<Value> set = get(path);
if (set == null || set.isEmpty()) {
return null;
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java Mon Nov 29 12:14:59 2010
@@ -24,22 +24,18 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.fusesource.hawtbuf.AsciiBuffer;
-
/**
* An implementation class used to implement {@link PathMap}
*
* @version $Revision: 1.2 $
*/
public class PathMapNode<Value> implements PathNode<Value> {
- protected static final AsciiBuffer ANY_CHILD = PathMap.ANY_CHILD;
- protected static final AsciiBuffer ANY_DESCENDENT = PathMap.ANY_DESCENDENT;
// we synchronize at the PathMap level
private PathMapNode<Value> parent;
private List<Value> values = new ArrayList<Value>();
- private Map<AsciiBuffer, PathNode<Value>> childNodes = new HashMap<AsciiBuffer, PathNode<Value>>();
- private AsciiBuffer path = new AsciiBuffer("Root");
+ private Map<Path, PathNode<Value>> childNodes = new HashMap<Path, PathNode<Value>>();
+ private Path path = PathParser.ROOT;
private int pathLength;
public PathMapNode(PathMapNode<Value> parent) {
@@ -55,7 +51,7 @@ public class PathMapNode<Value> implemen
* Returns the child node for the given named path or null if it does not
* exist
*/
- public PathMapNode<Value> getChild(AsciiBuffer path) {
+ public PathMapNode<Value> getChild(Path path) {
return (PathMapNode<Value>)childNodes.get(path);
}
@@ -74,12 +70,12 @@ public class PathMapNode<Value> implemen
* Returns the child node for the given named path, lazily creating one if
* it does not yet exist
*/
- public PathMapNode<Value> getChildOrCreate(AsciiBuffer asciiBuffer) {
- PathMapNode<Value> answer = (PathMapNode<Value>)childNodes.get(asciiBuffer);
+ public PathMapNode<Value> getChildOrCreate(Path path) {
+ PathMapNode<Value> answer = (PathMapNode<Value>)childNodes.get(path);
if (answer == null) {
answer = createChildNode();
- answer.path = asciiBuffer;
- childNodes.put(asciiBuffer, answer);
+ answer.path = path;
+ childNodes.put(path, answer);
}
return answer;
}
@@ -124,8 +120,8 @@ public class PathMapNode<Value> implemen
return answer;
}
- public void add(ArrayList<AsciiBuffer> paths, int idx, Value value) {
- if (idx >= paths.size()) {
+ public void add(Path[] paths, int idx, Value value) {
+ if (idx >= paths.length) {
values.add(value);
} else {
// if (idx == paths.size() - 1) {
@@ -134,12 +130,12 @@ public class PathMapNode<Value> implemen
// else {
// getAnyChildNode().add(paths, idx + 1, value);
// }
- getChildOrCreate(paths.get(idx)).add(paths, idx + 1, value);
+ getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
}
}
- public void remove(ArrayList<AsciiBuffer> paths, int idx, Value value) {
- if (idx >= paths.size()) {
+ public void remove(Path[] paths, int idx, Value value) {
+ if (idx >= paths.length) {
values.remove(value);
pruneIfEmpty();
} else {
@@ -149,23 +145,23 @@ public class PathMapNode<Value> implemen
// else {
// getAnyChildNode().remove(paths, idx + 1, value);
// }
- getChildOrCreate(paths.get(idx)).remove(paths, ++idx, value);
+ getChildOrCreate(paths[idx]).remove(paths, ++idx, value);
}
}
- public void removeAll(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+ public void removeAll(Set<Value> answer, Path[] paths, int startIndex) {
PathNode<Value> node = this;
- int size = paths.size();
+ int size = paths.length;
for (int i = startIndex; i < size && node != null; i++) {
- AsciiBuffer path = paths.get(i);
- if (path.equals(ANY_DESCENDENT)) {
+ Path path = paths[i];
+ if (path == PathParser.ANY_DESCENDANT) {
answer.addAll(node.removeDesendentValues());
break;
}
node.appendMatchingWildcards(answer, paths, i);
- if (path.equals(ANY_CHILD)) {
+ if (path == PathParser.ANY_CHILD ) {
// node = node.getAnyChildNode();
node = new AnyChildPathNode<Value>(node);
} else {
@@ -203,27 +199,27 @@ public class PathMapNode<Value> implemen
/**
* Matches any entries in the map containing wildcards
*/
- public void appendMatchingWildcards(Set<Value> answer, ArrayList<AsciiBuffer> paths, int idx) {
+ public void appendMatchingWildcards(Set<Value> answer, Path[] paths, int idx) {
if (idx - 1 > pathLength) {
return;
}
- PathMapNode<Value> wildCardNode = getChild(ANY_CHILD);
+ PathMapNode<Value> wildCardNode = getChild(PathParser.ANY_CHILD);
if (wildCardNode != null) {
wildCardNode.appendMatchingValues(answer, paths, idx + 1);
}
- wildCardNode = getChild(ANY_DESCENDENT);
+ wildCardNode = getChild(PathParser.ANY_DESCENDANT);
if (wildCardNode != null) {
answer.addAll(wildCardNode.getDesendentValues());
}
}
- public void appendMatchingValues(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex) {
+ public void appendMatchingValues(Set<Value> answer, Path[] paths, int startIndex) {
PathNode<Value> node = this;
boolean couldMatchAny = true;
- int size = paths.size();
+ int size = paths.length;
for (int i = startIndex; i < size && node != null; i++) {
- AsciiBuffer path = paths.get(i);
- if (path.equals(ANY_DESCENDENT)) {
+ Path path = paths[i];
+ if (path.equals(PathParser.ANY_DESCENDANT)) {
answer.addAll(node.getDesendentValues());
couldMatchAny = false;
break;
@@ -231,7 +227,7 @@ public class PathMapNode<Value> implemen
node.appendMatchingWildcards(answer, paths, i);
- if (path.equals(ANY_CHILD)) {
+ if (path.equals(PathParser.ANY_CHILD)) {
node = new AnyChildPathNode<Value>(node);
} else {
node = node.getChild(path);
@@ -241,7 +237,7 @@ public class PathMapNode<Value> implemen
answer.addAll(node.getValues());
if (couldMatchAny) {
// lets allow FOO.BAR to match the FOO.BAR.> entry in the map
- PathNode<Value> child = node.getChild(ANY_DESCENDENT);
+ PathNode<Value> child = node.getChild(PathParser.ANY_DESCENDANT);
if (child != null) {
answer.addAll(child.getValues());
}
@@ -249,7 +245,7 @@ public class PathMapNode<Value> implemen
}
}
- public AsciiBuffer getPath() {
+ public Path getPath() {
return path;
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathNode.java Mon Nov 29 12:14:59 2010
@@ -27,15 +27,15 @@ import org.fusesource.hawtbuf.AsciiBuffe
*
*/
public interface PathNode<Value> {
- void appendMatchingValues(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex);
+ void appendMatchingValues(Set<Value> answer, Path[] paths, int startIndex);
- void appendMatchingWildcards(Set<Value> answer, ArrayList<AsciiBuffer> paths, int startIndex);
+ void appendMatchingWildcards(Set<Value> answer, Path[] paths, int startIndex);
void appendDescendantValues(Set<Value> answer);
Collection<Value> getDesendentValues();
- PathNode<Value> getChild(AsciiBuffer path);
+ PathNode<Value> getChild(Path path);
Collection<Value> getValues();
Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java?rev=1040082&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.java Mon Nov 29 12:14:59 2010
@@ -0,0 +1,237 @@
+package org.apache.activemq.apollo.util.path;
+
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+
+/**
+ * Holds the delimiters used to parse paths.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PathParser {
+
+ /** Shared sentinel segment for the root path; its string form is the empty string. */
+ public static final RootPath ROOT = new RootPath();
+ /** Shared sentinel segment for the any-descendant wildcard; this class compares it by identity. */
+ public static final AnyDescendantPath ANY_DESCENDANT = new AnyDescendantPath();
+ /** Shared sentinel segment for the any-child wildcard; this class compares it by identity. */
+ public static final AnyChildPath ANY_CHILD = new AnyChildPath();
+
+ /** Encoded token that parses to {@link #ANY_DESCENDANT}; initialised to two asterisks. */
+ public AsciiBuffer any_descendant_wildcard = new AsciiBuffer("**");
+ /** Encoded token that parses to {@link #ANY_CHILD}; initialised to a single asterisk. */
+ public AsciiBuffer any_child_wildcard = new AsciiBuffer("*");
+ /** Delimiter placed between segments in the encoded form; initialised to a dot. */
+ public AsciiBuffer path_seperator = new AsciiBuffer(".");
+
+    /**
+     * Segment type behind {@link #ROOT}. It renders as the empty string and
+     * matches nothing except the {@code ROOT} singleton itself.
+     */
+    private static class RootPath extends Path {
+        public boolean matches(Path p) {
+            return ROOT == p;
+        }
+
+        public String toString(PathParser parser) {
+            return "";
+        }
+    }
+
+    /**
+     * Segment type behind {@link #ANY_CHILD}. Its text form is whatever
+     * any-child token the supplied parser is configured with.
+     */
+    private static class AnyChildPath extends Path {
+        public String toString(PathParser parser) {
+            AsciiBuffer token = parser.any_child_wildcard;
+            return token.toString();
+        }
+    }
+
+    /**
+     * Segment type behind {@link #ANY_DESCENDANT}. Its text form is whatever
+     * any-descendant token the supplied parser is configured with.
+     */
+    private static class AnyDescendantPath extends Path {
+        public String toString(PathParser parser) {
+            AsciiBuffer token = parser.any_descendant_wildcard;
+            return token.toString();
+        }
+    }
+
+    /**
+     * A concrete (non-wildcard) path segment wrapping its encoded text.
+     * Equality and hashing are delegated to the wrapped buffer.
+     */
+    class LiteralPath extends Path {
+
+        private final AsciiBuffer value;
+
+        public LiteralPath(AsciiBuffer value) {
+            this.value = value;
+        }
+
+        public boolean isLiteral() {
+            return true;
+        }
+
+        /** Returns the segment text; the parser argument is not used. */
+        public String toString(PathParser parser) {
+            return value.toString();
+        }
+
+        /**
+         * Compares this segment with another one: a literal must carry equal
+         * text, while any non-literal segment is accepted unconditionally.
+         */
+        public boolean matches(Path p) {
+            if (!p.isLiteral()) {
+                // we match any type of wildcard..
+                return true;
+            }
+            LiteralPath other = (LiteralPath) p;
+            return other.value.equals(value);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null) {
+                return false;
+            }
+            if (o.getClass() != getClass()) {
+                return false;
+            }
+            return value.equals(((LiteralPath) o).value);
+        }
+
+        @Override
+        public int hashCode() {
+            return value.hashCode();
+        }
+    }
+
+    /**
+     * Splits an encoded path into its segments using {@link #path_seperator}.
+     * Each piece between separators is classified as the any-child wildcard,
+     * the any-descendant wildcard, or a literal segment.
+     *
+     * @param subject the encoded path
+     * @return the parsed segments in order; always at least one element,
+     *         because the text after the last separator (possibly empty) is
+     *         itself a segment
+     */
+    public Path[] parsePath(AsciiBuffer subject) {
+        ArrayList<Path> segments = new ArrayList<Path>(10);
+        // Skip the whole separator after each hit. The separator is configurable,
+        // so stepping a fixed single byte would leave the tail of a multi-byte
+        // separator at the front of the next segment. The floor of 1 keeps the
+        // scan advancing if the separator is ever configured as empty.
+        int step = Math.max(1, path_seperator.getLength());
+        int end = subject.getLength();
+        int start = 0;
+        int idx;
+        while ((idx = subject.indexOf(path_seperator, start)) >= 0) {
+            segments.add(parsePart(subject.slice(start, idx).ascii()));
+            start = idx + step;
+        }
+        // Whatever follows the last separator is the final segment.
+        segments.add(parsePart(subject.slice(start, end).ascii()));
+        return segments.toArray(new Path[segments.size()]);
+    }
+
+    /**
+     * Classifies one raw segment: text equal to the configured any-child token
+     * yields {@link #ANY_CHILD}, text equal to the any-descendant token yields
+     * {@link #ANY_DESCENDANT}, and anything else becomes a new literal segment.
+     * The any-child token is tested first.
+     */
+    private Path parsePart(AsciiBuffer value) {
+        if (value.equals(any_child_wildcard)) {
+            return ANY_CHILD;
+        }
+        if (value.equals(any_descendant_wildcard)) {
+            return ANY_DESCENDANT;
+        }
+        return new LiteralPath(value);
+    }
+
+    /**
+     * Converts the paths back to the string representation.
+     *
+     * @param paths the parsed segments, in order
+     * @return the text of every segment, as rendered for this parser, joined
+     *         with {@link #path_seperator}; the empty string when
+     *         {@code paths} has no elements
+     */
+    public String toString(Path[] paths) {
+        // The accumulator never leaves this method, so the unsynchronized
+        // builder is sufficient.
+        StringBuilder buffer = new StringBuilder();
+        for (int i = 0; i < paths.length; i++) {
+            if (i > 0) {
+                buffer.append(path_seperator);
+            }
+            buffer.append(paths[i].toString(this));
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Writes the encoded form of the given segments to the stream. The text is
+     * assembled exactly as in {@link #toString(Path[])}: each segment rendered
+     * for this parser, joined with {@link #path_seperator}.
+     *
+     * @param paths the parsed segments, in order
+     * @param os    the stream that receives the encoded bytes
+     */
+    public void write(Path[] paths, ByteArrayOutputStream os) {
+        StringBuilder text = new StringBuilder();
+        for (int i = 0; i < paths.length; i++) {
+            if (i > 0) {
+                text.append(path_seperator);
+            }
+            text.append(paths[i].toString(this));
+        }
+        // The assembled text used to be discarded, leaving the stream untouched;
+        // encode it and hand the bytes to the stream.
+        AsciiBuffer encoded = new AsciiBuffer(text.toString());
+        os.write(encoded.getData(), encoded.getOffset(), encoded.getLength());
+    }
+
+    /**
+     * One link in the chain of per-segment matchers assembled by
+     * {@code parseFilter}. The implementations in this class remove the
+     * segments they consume from {@code remaining}.
+     */
+    interface PartFilter {
+        boolean matches(LinkedList<Path> remaining);
+    }
+
+    /**
+     * Chain link for a literal pattern segment. It consumes the first
+     * remaining segment, requires the literal to match it, and then defers to
+     * the next link, or requires that nothing is left when it is the last link.
+     */
+    class LitteralPathFilter implements PartFilter {
+
+        private final PartFilter next;
+        private final LiteralPath path;
+
+        public LitteralPathFilter(PartFilter next, LiteralPath path) {
+            this.next = next;
+            this.path = path;
+        }
+
+        public boolean matches(LinkedList<Path> remaining) {
+            if (remaining.isEmpty()) {
+                return false;
+            }
+            Path head = remaining.removeFirst();
+            if (!path.matches(head)) {
+                return false;
+            }
+            return next != null ? next.matches(remaining) : remaining.isEmpty();
+        }
+    }
+
+    /**
+     * Chain link for the any-child wildcard. It consumes exactly one segment
+     * without inspecting it, then defers to the next link, or requires that
+     * nothing is left when it is the last link.
+     */
+    static class AnyChildPathFilter implements PartFilter {
+        private final PartFilter next;
+
+        public AnyChildPathFilter(PartFilter next) {
+            this.next = next;
+        }
+
+        public boolean matches(LinkedList<Path> remaining) {
+            if (remaining.isEmpty()) {
+                return false;
+            }
+            // Any single segment is acceptable, so it is dropped unread.
+            remaining.removeFirst();
+            if (next != null) {
+                return next.matches(remaining);
+            }
+            return remaining.isEmpty();
+        }
+    }
+
+    /**
+     * Chain link for the any-descendant wildcard. It succeeds when at least
+     * one segment is left and consumes all of them. The {@code next} link is
+     * stored but never consulted, so pattern segments that follow this
+     * wildcard are not evaluated.
+     */
+    static class AnyDecendentPathFilter implements PartFilter {
+        private final PartFilter next;
+
+        public AnyDecendentPathFilter(PartFilter next) {
+            this.next = next;
+        }
+
+        public boolean matches(LinkedList<Path> remaining) {
+            boolean hadSegments = !remaining.isEmpty();
+            // Clearing an already-empty list changes nothing, so this can run
+            // unconditionally.
+            remaining.clear();
+            return hadSegments;
+        }
+    }
+
+    /**
+     * Compiles an encoded pattern into a {@link PathFilter}. The pattern is
+     * parsed with {@link #parsePath(AsciiBuffer)} and turned into a chain with
+     * one link per segment: literal, any-child, or any-descendant.
+     *
+     * @param path the encoded pattern
+     * @return a filter that evaluates the chain against a candidate path
+     */
+    public PathFilter parseFilter(AsciiBuffer path) {
+        Path[] paths = parsePath(path);
+        // Walk the pattern back to front so each link is created with its
+        // successor already in hand.
+        PartFilter chain = null;
+        for (int i = paths.length - 1; i >= 0; i--) {
+            Path p = paths[i];
+            if (p.isLiteral()) {
+                chain = new LitteralPathFilter(chain, (LiteralPath) p);
+            } else if (p == ANY_CHILD) {
+                chain = new AnyChildPathFilter(chain);
+            } else if (p == ANY_DESCENDANT) {
+                chain = new AnyDecendentPathFilter(chain);
+            }
+        }
+        final PartFilter filter = chain;
+        return new PathFilter() {
+            public boolean matches(Path[] path) {
+                // The links remove segments as they match, so every evaluation
+                // works on its own copy of the candidate.
+                return filter.matches(new LinkedList<Path>(Arrays.asList(path)));
+            }
+        };
+    }
+
+    /**
+     * Reports whether any segment is one of the wildcard singletons.
+     *
+     * @param paths the parsed segments to inspect
+     * @return {@code true} if a segment is {@link #ANY_CHILD} or
+     *         {@link #ANY_DESCENDANT} (compared by identity), otherwise
+     *         {@code false}
+     */
+    public static boolean containsWildCards(Path[] paths) {
+        for (int i = 0; i < paths.length; i++) {
+            Path segment = paths[i];
+            if (segment == ANY_CHILD || segment == ANY_DESCENDANT) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapMemoryTest.java Mon Nov 29 12:14:59 2010
@@ -28,9 +28,15 @@ import static org.junit.Assert.*;
*/
public class PathMapMemoryTest {
+ PathParser parser = new PathParser();
+
+    /** Parses {@code name} into path segments using this test's parser instance. */
+    protected Path[] createDestination(String name) {
+        AsciiBuffer encoded = new AsciiBuffer(name);
+        return parser.parsePath(encoded);
+    }
+
@Test()
public void testLongPath() throws Exception {
- AsciiBuffer d1 = new AsciiBuffer("1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18");
+ Path[] d1 = createDestination("1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18");
PathMap<String> map = new PathMap<String>();
map.put(d1, "test");
}
@@ -45,7 +51,7 @@ public class PathMapMemoryTest {
}
// System.out.println("Checking: " + name);
try {
- AsciiBuffer d1 = new AsciiBuffer(name);
+ Path[] d1 = createDestination(name);
PathMap<String> map = new PathMap<String>();
map.put(d1, "test");
} catch (Throwable e) {
@@ -60,11 +66,11 @@ public class PathMapMemoryTest {
Object value = new Object();
int count = 1000;
for (int i = 0; i < count; i++) {
- AsciiBuffer queue = new AsciiBuffer("connection:"+i);
+ Path[] queue = createDestination("connection:"+i);
map.put(queue, value);
}
for (int i = 0; i < count; i++) {
- AsciiBuffer queue = new AsciiBuffer("connection:"+i);
+ Path[] queue = createDestination("connection:"+i);
map.remove(queue, value);
Set<Object> set = map.get(queue);
assertTrue(set.isEmpty());
Modified: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java?rev=1040082&r1=1040081&r2=1040082&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/path/PathMapTest.java Mon Nov 29 12:14:59 2010
@@ -33,9 +33,11 @@ import static org.junit.Assert.*;
*/
public class PathMapTest {
- protected AsciiBuffer d1 = createDestination("TEST.D1");
- protected AsciiBuffer d2 = createDestination("TEST.BAR.D2");
- protected AsciiBuffer d3 = createDestination("TEST.BAR.D3");
+ PathParser parser = new PathParser();
+
+ protected Path[] d1 = createDestination("TEST.D1");
+ protected Path[] d2 = createDestination("TEST.BAR.D2");
+ protected Path[] d3 = createDestination("TEST.BAR.D3");
protected String v1 = "value1";
protected String v2 = "value2";
@@ -46,8 +48,8 @@ public class PathMapTest {
@Test()
public void testCompositePaths() throws Exception {
- AsciiBuffer d1 = createDestination("TEST.BAR.D2");
- AsciiBuffer d2 = createDestination("TEST.BAR.D3");
+ Path[] d1 = createDestination("TEST.BAR.D2");
+ Path[] d2 = createDestination("TEST.BAR.D3");
PathMap<String> map = new PathMap<String>();
map.put(d1, v1);
map.put(d2, v2);
@@ -113,11 +115,11 @@ public class PathMapTest {
map.put(d2, v2);
map.put(d3, v3);
- assertMapValue(map, ">", v1, v2, v3);
- assertMapValue(map, "TEST.>", v1, v2, v3);
- assertMapValue(map, "*.>", v1, v2, v3);
+ assertMapValue(map, "**", v1, v2, v3);
+ assertMapValue(map, "TEST.**", v1, v2, v3);
+ assertMapValue(map, "*.**", v1, v2, v3);
- assertMapValue(map, "FOO.>");
+ assertMapValue(map, "FOO.**");
}
@Test()
@@ -163,9 +165,9 @@ public class PathMapTest {
assertMapValue(map, "TEST.*", v1, v2);
assertMapValue(map, "TEST.*.*", v2, v3, v4, v5, v6);
- assertMapValue(map, "TEST.*.>", v1, v2, v3, v4, v5, v6);
- assertMapValue(map, "TEST.>", v1, v2, v3, v4, v5, v6);
- assertMapValue(map, "TEST.>.>", v1, v2, v3, v4, v5, v6);
+ assertMapValue(map, "TEST.*.**", v1, v2, v3, v4, v5, v6);
+ assertMapValue(map, "TEST.**", v1, v2, v3, v4, v5, v6);
+ assertMapValue(map, "TEST.**.**", v1, v2, v3, v4, v5, v6);
assertMapValue(map, "*.*.D3", v2, v3, v5);
assertMapValue(map, "TEST.BAR.*", v2, v5, v6);
@@ -194,7 +196,7 @@ public class PathMapTest {
@Test()
public void testAnyPathWildcardInMap() throws Exception {
PathMap<String> map = new PathMap<String>();
- put(map, "TEST.FOO.>", v1);
+ put(map, "TEST.FOO.**", v1);
assertMapValue(map, "TEST.FOO.BAR.WHANOT.A.B.C", v1);
assertMapValue(map, "TEST.FOO.BAR.WHANOT", v1);
@@ -245,12 +247,12 @@ public class PathMapTest {
assertMapValue(map, "TEST.*", v3, v4);
assertMapValue(map, "*.*", v3, v4);
- remove(map, ">", v4);
+ remove(map, "**", v4);
assertMapValue(map, "TEST.*", v3);
assertMapValue(map, "*.*", v3);
- remove(map, "TEST.>", v3);
+ remove(map, "TEST.**", v3);
remove(map, "TEST.FOO.BAR", v5);
assertMapValue(map, "FOO");
@@ -272,7 +274,7 @@ public class PathMapTest {
assertSample2(map);
- remove(map, ">", v4);
+ remove(map, "**", v4);
remove(map, "TEST.*", v2);
assertMapValue(map, "FOO");
@@ -296,22 +298,22 @@ public class PathMapTest {
PathMap<String> map = new PathMap<String>();
put(map, "FOO.A", v1);
- assertMapValue(map, "FOO.>", v1);
+ assertMapValue(map, "FOO.**", v1);
put(map, "FOO.B", v2);
- assertMapValue(map, "FOO.>", v1, v2);
+ assertMapValue(map, "FOO.**", v1, v2);
map.removeAll(createDestination("FOO.A"));
- assertMapValue(map, "FOO.>", v2);
+ assertMapValue(map, "FOO.**", v2);
}
protected void loadSample2(PathMap<String> map) {
put(map, "TEST.FOO", v1);
put(map, "TEST.*", v2);
- put(map, "TEST.>", v3);
- put(map, ">", v4);
+ put(map, "TEST.**", v3);
+ put(map, "**", v4);
put(map, "TEST.FOO.BAR", v5);
put(map, "TEST.XYZ", v6);
}
@@ -338,17 +340,17 @@ public class PathMapTest {
}
protected void remove(PathMap<String> map, String name, String value) {
- AsciiBuffer destination = createDestination(name);
+ Path[] destination = createDestination(name);
map.remove(destination, value);
}
protected void assertMapValue(PathMap<String> map, String destinationName, Object... expected) {
- AsciiBuffer destination = createDestination(destinationName);
+ Path[] destination = createDestination(destinationName);
assertMapValue(map, destination, expected);
}
@SuppressWarnings("unchecked")
- protected void assertMapValue(PathMap<String> map, AsciiBuffer destination, Object... expected) {
+ protected void assertMapValue(PathMap<String> map, Path[] destination, Object... expected) {
List expectedList = Arrays.asList(expected);
Collections.sort(expectedList);
Set actualSet = map.get(destination);
@@ -357,7 +359,7 @@ public class PathMapTest {
assertEquals(("map value for destinationName: " + destination), expectedList, actual);
}
- protected AsciiBuffer createDestination(String name) {
- return new AsciiBuffer(new AsciiBuffer(name));
+ protected Path[] createDestination(String name) {
+ return parser.parsePath(new AsciiBuffer(name));
}
}