You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/26 04:10:37 UTC
svn commit: r1063582 [1/3] - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/acti...
Author: chirino
Date: Wed Jan 26 03:10:35 2011
New Revision: 1063582
URL: http://svn.apache.org/viewvc?rev=1063582&view=rev
Log:
Renamed Router to LocalRouter and extracted a Router trait from it.
Decoupled destinations from queues. We now have separate topic and queue domains.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDestinationDTO.java
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicAclDTO.java
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDestinationDTO.java
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade
- copied, changed from r1062213, activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
Removed:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationAclDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/AclAuthorizer.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/resources/org/apache/activemq/apollo/broker/destination-config.xml
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueAclDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-dto/src/test/java/org/apache/activemq/apollo/dto/XmlCodecTest.java
activemq/activemq-apollo/trunk/apollo-dto/src/test/resources/org/apache/activemq/apollo/dto/XmlCodecTest.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMap.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathMapNode.java
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Wed Jan 26 03:10:35 2011
@@ -14,6 +14,6 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.QueueBindingFactory
-org.apache.activemq.apollo.broker.SubscriptionBindingFactory
-org.apache.activemq.apollo.broker.TempBindingFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.QueueDomainQueueBinding
+org.apache.activemq.apollo.broker.DurableSubscriptionQueueBinding
+org.apache.activemq.apollo.broker.TempQueueBinding
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jan 26 03:10:35 2011
@@ -22,8 +22,8 @@ import org.fusesource.hawtdispatch._
import protocol.{ProtocolHandler}
import org.apache.activemq.apollo.util.{Log, BaseService}
import org.apache.activemq.apollo.filter.BooleanExpression
-import org.apache.activemq.apollo.dto.ConnectionStatusDTO
import org.apache.activemq.apollo.transport.{TransportListener, DefaultTransportListener, Transport}
+import org.apache.activemq.apollo.dto.{DestinationDTO, ConnectionStatusDTO}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -159,7 +159,7 @@ trait ConsumerContext { // extends Clien
def getConsumerId() : String
- def getDestination(): Destination
+ def getDestination(): DestinationDTO
def getSelector() : String
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jan 26 03:10:35 2011
@@ -23,6 +23,7 @@ import protocol.Protocol
import org.apache.activemq.apollo.filter.Filterable
import org.apache.activemq.apollo.broker.store.{StoreUOW, MessageRecord}
import org.apache.activemq.apollo.util.{Log, Logging}
+import org.apache.activemq.apollo.dto.DestinationDTO
object DeliveryProducer extends Log
@@ -31,8 +32,8 @@ object DeliveryProducer extends Log
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait DeliveryProducer extends Logging {
- override protected def log:Log = DeliveryProducer
+trait DeliveryProducer {
+ import DeliveryProducer._
def dispatch_queue:DispatchQueue
@@ -115,7 +116,7 @@ trait Message extends Filterable with Re
/**
* where the message was sent to.
*/
- def destination: Destination
+ def destination: Array[DestinationDTO]
/**
* The protocol of the message
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1063582&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Wed Jan 26 03:10:35 2011
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtbuf._
+import BufferConversions._
+import Buffer._
+import org.apache.activemq.apollo.util.path.{Path, PathParser}
+import scala.collection.mutable.ListBuffer
+import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
+
+/**
+ * Helper functions which encode and decode paths and destinations by
+ * delegating to the shared `default` parser instance.
+ */
+object DestinationParser {
+
+  /** The parser instance used by all the helper functions of this object. */
+  val default = new DestinationParser
+
+  /** Converts a path to its string form using the default parser. */
+  def encode_path(value:Path) = default.toString(value)
+  /** Parses the string form of a path using the default parser. */
+  def decode_path(value:String) = default.parsePath(ascii(value))
+
+  /** Converts a (possibly composite) destination to its string form. */
+  def encode_destination(value:Array[DestinationDTO]) = default.toString(value)
+  /** Parses the string form of a (possibly composite) destination. */
+  def decode_destination(value:String) = default.parse(ascii(value))
+
+  /**
+   * Creates a single element destination array for the given domain.
+   *
+   * @param domain either LocalRouter.QUEUE_DOMAIN or LocalRouter.TOPIC_DOMAIN
+   * @param name the name assigned to the created destination
+   * @throws Exception if the domain is not one of the supported domains
+   */
+  def create_destination(domain:AsciiBuffer, name:String) = {
+    Array(domain match {
+      case LocalRouter.QUEUE_DOMAIN => new QueueDestinationDTO(name)
+      case LocalRouter.TOPIC_DOMAIN => new TopicDestinationDTO(name)
+      case _ => throw new Exception("Unknown destination domain: "+domain);
+    })
+  }
+
+}
+
+/**
+ * Converts destinations between their textual form and arrays of
+ * DestinationDTO objects. Each destination is written as a domain prefix
+ * followed by the destination name, and the parts of a composite
+ * destination are joined with the `destination_separator`.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DestinationParser extends PathParser {
+  import DestinationParser._
+
+  // The domain assumed for values which carry no known prefix. While it
+  // is null such values can't be parsed.
+  var default_domain: AsciiBuffer = null
+  var queue_prefix: AsciiBuffer = ascii("queue:")
+  var topic_prefix: AsciiBuffer = ascii("topic:")
+  var temp_queue_prefix: AsciiBuffer = ascii("temp-queue:")
+  var temp_topic_prefix: AsciiBuffer = ascii("temp-topic:")
+  var destination_separator: Option[Byte] = Some(','.toByte)
+
+  /**
+   * Encodes the destinations into a buffer.
+   *
+   * @param value the destinations to encode, may be null
+   * @return the encoded destinations or null if value was null
+   * @throws Exception if a destination is neither a queue nor a topic
+   */
+  def toBuffer(value: Array[DestinationDTO]): AsciiBuffer = {
+    if (value == null) {
+      null
+    } else {
+      val baos = new ByteArrayOutputStream
+      // Tracks if we are still on the first element so that the separator
+      // is only written between elements.
+      var first = true
+      value.foreach { d =>
+        if (first) {
+          first = false
+        } else {
+          assert( destination_separator.isDefined )
+          baos.write(destination_separator.get)
+        }
+        d match {
+          case d:QueueDestinationDTO =>
+            baos.write(queue_prefix)
+          case d:TopicDestinationDTO =>
+            baos.write(topic_prefix)
+//          case Router.TEMP_QUEUE_DOMAIN =>
+//            baos.write(temp_queue_prefix)
+//          case Router.TEMP_TOPIC_DOMAIN =>
+//            baos.write(temp_topic_prefix)
+          case _ =>
+            throw new Exception("Unknown destination type: "+d.getClass);
+        }
+        ascii(d.name).writeTo(baos)
+      }
+      baos.toBuffer.ascii
+    }
+  }
+
+  /**
+   * Encodes the destinations into a string.
+   *
+   * @return the encoded destinations or null if value was null
+   */
+  def toString(value:Array[DestinationDTO]): String = {
+    val buffer = toBuffer(value)
+    if (buffer == null) null else buffer.toString
+  }
+
+  /**
+   * Parses a destination which may or may not be a composite.
+   *
+   * @param value the encoded destination, may be null
+   * @return the parsed destinations, or null if value was null or if a part
+   *         has no known prefix while no default domain is configured
+   */
+  def parse(value: AsciiBuffer): Array[DestinationDTO] = {
+    if (value == null) {
+      return null;
+    }
+
+    if (destination_separator.isDefined && value.contains(destination_separator.get)) {
+      // A composite: parse each part on its own and collect the results.
+      val parts = value.split(destination_separator.get);
+      val dl = ListBuffer[DestinationDTO]()
+      for (buffer <- parts) {
+        val d = parse(buffer)
+        if (d == null) {
+          return null;
+        }
+        dl += d(0)
+      }
+      return dl.toArray
+    } else {
+
+      if (queue_prefix != null && value.startsWith(queue_prefix)) {
+        val name = value.slice(queue_prefix.length, value.length).ascii()
+        return create_destination(LocalRouter.QUEUE_DOMAIN, name.toString)
+      } else if (topic_prefix != null && value.startsWith(topic_prefix)) {
+        val name = value.slice(topic_prefix.length, value.length).ascii()
+        return create_destination(LocalRouter.TOPIC_DOMAIN, name.toString)
+//      } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
+//        var name = value.slice(temp_queue_prefix.length, value.length).ascii()
+//        return new DestinationDTO(LocalRouter.TEMP_QUEUE_DOMAIN, name.toString)
+//      } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
+//        var name = value.slice(temp_topic_prefix.length, value.length).ascii()
+//        return new DestinationDTO(LocalRouter.TEMP_TOPIC_DOMAIN, name.toString)
+      } else {
+        if (default_domain == null) {
+          return null;
+        }
+        return create_destination(default_domain, value.toString)
+      }
+    }
+  }
+}
+
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1063582&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Jan 26 03:10:35 2011
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtbuf._
+import org.fusesource.hawtdispatch._
+import collection.JavaConversions
+import org.apache.activemq.apollo.util._
+import collection.mutable.HashMap
+import org.apache.activemq.apollo.broker.store.QueueRecord
+import Buffer._
+import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
+import java.util.ArrayList
+import org.apache.activemq.apollo.dto._
+import security.SecurityContext
+import java.util.concurrent.TimeUnit
+
+/**
+ * The operations a routing domain needs from the destinations it manages:
+ * consumers get bound to / unbound from a destination and producers get
+ * connected to / disconnected from it.
+ */
+trait DomainDestination {
+
+  def id:Long
+  def name:String
+
+  // Consumer side: can_bind is checked by the domain before calling bind.
+  def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
+  def bind(destination:DestinationDTO, consumer:DeliveryConsumer):Unit
+  def unbind(consumer:DeliveryConsumer, persistent:Boolean):Unit
+
+  // Producer side: can_connect is checked by the domain before calling connect.
+  def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
+  def connect(destination:DestinationDTO, producer:BindableDeliveryProducer):Unit
+  def disconnect(producer:BindableDeliveryProducer):Unit
+
+}
+
+/**
+ * Constants and small helper classes used by the LocalRouter class.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LocalRouter extends Log {
+  // The names of the destination domains.
+  val TOPIC_DOMAIN = ascii("topic");
+  val QUEUE_DOMAIN = ascii("queue");
+  val TEMP_TOPIC_DOMAIN = ascii("temp-topic");
+  val TEMP_QUEUE_DOMAIN = ascii("temp-queue");
+
+  val QUEUE_KIND = ascii("queue");
+  val DEFAULT_QUEUE_PATH = ascii("default");
+
+  /**
+   * Remembers the destination and security context that a consumer was
+   * bound with. Equality and hashing only consider the consumer.
+   */
+  class ConsumerContext(val destination:DestinationDTO, val consumer:DeliveryConsumer, val security:SecurityContext) {
+    override def hashCode: Int = consumer.hashCode
+
+    override def equals(obj: Any): Boolean = obj match {
+      case that:ConsumerContext => that.consumer == consumer
+      case _ => false
+    }
+  }
+
+  /**
+   * Remembers the destination and security context that a producer was
+   * connected with. Equality and hashing only consider the producer.
+   */
+  class ProducerContext(val destination:DestinationDTO, val producer:BindableDeliveryProducer, val security:SecurityContext) {
+    override def hashCode: Int = producer.hashCode
+
+    override def equals(obj: Any): Boolean = obj match {
+      case that:ProducerContext => that.producer == producer
+      case _ => false
+    }
+  }
+}
+
+
+/**
+ * Provides a non-blocking concurrent producer to consumer
+ * routing implementation.
+ *
+ * DeliveryProducers create a route object for each destination
+ * they will be producing to. Once the route is
+ * connected to the router, the producer can use
+ * the route.targets list without synchronization to
+ * get the current set of consumers that are bound
+ * to the destination.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LocalRouter(val host:VirtualHost) extends BaseService with Router {
+ import LocalRouter._
+
+  // The router shares the dispatch queue of the virtual host it belongs to.
+  protected def dispatch_queue:DispatchQueue = host.dispatch_queue
+
+  // Reads the auto_create_destinations setting of the host configuration,
+  // falling back to true when it is not set.
+  def auto_create_destinations = {
+    import OptionSupport._
+    host.config.auto_create_destinations.getOrElse(true)
+  }
+
+  // A path made up of a single ANY_DESCENDANT part; used to look up all
+  // the destinations of a domain.
+  private val ALL = {
+    val parts = new ArrayList[Part](1)
+    parts.add(Part.ANY_DESCENDANT)
+    new Path(parts)
+  }
+
+  /**
+   * Base of the routing domains. A domain indexes its destinations by id and
+   * by path, and remembers the consumers and producers by the path they were
+   * bound or connected with so that they can also be attached to matching
+   * destinations which get added later.
+   */
+  trait Domain[D <: DomainDestination] {
+
+    // holds all the destinations in the domain by id
+    var destination_by_id = HashMap[Long, D]()
+    // holds all the destinations in the domain by path
+    var destination_by_path = new PathMap[D]()
+
+    // Can store consumers and producers on wild card paths
+    val consumers_by_path = new PathMap[ConsumerContext]()
+    val producers_by_path = new PathMap[ProducerContext]()
+
+    /** All the destinations of the domain. */
+    def destinations:Iterable[D] = JavaConversions.asScalaIterable(destination_by_path.get(ALL))
+
+    /** The destinations which match the (possibly wild card) path. */
+    def get_destination_matches(path:Path) = {
+      import JavaConversions._
+      asScalaIterable(destination_by_path.get( path ))
+    }
+
+    /** Creates the destination for the path; implemented by each domain. */
+    def create_destination(path:Path, security:SecurityContext):Result[D,String]
+
+    /** Looks up the destination for the path, creating it if it's not found. */
+    def get_or_create_destination(path:Path, security:SecurityContext):Result[D,String] = {
+      Option(destination_by_path.chooseValue(path)).
+        map(Success(_)).
+        getOrElse( create_destination(path, security))
+    }
+
+    /**
+     * Registers the destination and attaches the already known consumers
+     * and producers whose paths match it.
+     */
+    def add_destination(path:Path, dest:D) = {
+      destination_by_path.put(path, dest)
+      destination_by_id.put(dest.id, dest)
+
+      // binds any matching wild card subs and producers...
+      import JavaConversions._
+      consumers_by_path.get( path ).foreach { x=>
+        if( dest.can_bind(x.destination, x.consumer, x.security) ) {
+          dest.bind(x.destination, x.consumer)
+        }
+      }
+      producers_by_path.get( path ).foreach { x=>
+        if( dest.can_connect(x.destination, x.producer, x.security) ) {
+          dest.connect(x.destination, x.producer)
+        }
+      }
+    }
+
+    /** Removes the destination from both indexes. */
+    def remove_destination(path:Path, dest:D) = {
+      destination_by_path.remove(path, dest)
+      destination_by_id.remove(dest.id)
+    }
+
+    /**
+     * Checks if the consumer may be bound to the path. For non wild card
+     * paths the destination gets auto created (when enabled) and every
+     * matching destination must accept the consumer. Wild card paths are
+     * always accepted here.
+     */
+    def can_bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
+
+      val wildcard = PathParser.containsWildCards(path)
+      var matches = get_destination_matches(path)
+
+      // Should we attempt to auto create the destination?
+      if( !wildcard ) {
+        if ( matches.isEmpty && auto_create_destinations ) {
+          val rc = create_destination(path, security)
+          if( rc.failed ) {
+            return rc.map_success(_=> Zilch);
+          }
+          matches = get_destination_matches(path)
+        }
+        if( matches.isEmpty ) {
+          return Failure("The destination does not exist.")
+        }
+
+        if( matches.exists( dest => !dest.can_bind(destination, consumer, security) ) ) {
+          return Failure("Not authorized to receive from the destination.")
+        }
+      }
+      Success(Zilch)
+    }
+
+    /**
+     * Binds the consumer to all the matching destinations which accept it
+     * and remembers it for destinations added later. The consumer is
+     * retained until it is unbound.
+     */
+    def bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Unit = {
+      get_destination_matches(path).foreach { dest=>
+        if( dest.can_bind(destination, consumer, security) ) {
+          dest.bind(destination, consumer)
+        }
+      }
+      consumer.retain
+      consumers_by_path.put(path, new ConsumerContext(destination, consumer, security))
+    }
+
+    /**
+     * Unbinds a previously bound consumer from the matching destinations
+     * and releases it. Does nothing if the consumer was not bound.
+     */
+    def unbind(destination:DestinationDTO, consumer:DeliveryConsumer, persistent:Boolean) = {
+      val path = DestinationParser.decode_path(destination.name)
+      // ConsumerContext equality is based on the consumer, so the security
+      // context is not needed to find the entry.
+      if( consumers_by_path.remove(path, new ConsumerContext(destination, consumer, null) ) ) {
+        get_destination_matches(path).foreach{ dest=>
+          dest.unbind(consumer, persistent)
+        }
+        consumer.release
+      }
+
+//      if( persistent ) {
+//          destroy_queue(consumer.binding, security_context).failure_option.foreach{ reason=>
+//            async_die(reason)
+//          }
+//      }
+
+    }
+
+    /**
+     * Checks if the producer may be connected to the path. For non wild
+     * card paths the destination gets auto created (when enabled) and every
+     * matching destination must accept the producer. Wild card paths are
+     * always accepted here.
+     */
+    def can_connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Result[Zilch, String] = {
+
+      val wildcard = PathParser.containsWildCards(path)
+      var matches = get_destination_matches(path)
+
+      // Should we attempt to auto create the destination?
+      if( !wildcard ) {
+        if ( matches.isEmpty && auto_create_destinations ) {
+          val rc = create_destination(path, security)
+          if( rc.failed ) {
+            return rc.map_success(_=> Zilch);
+          }
+          matches = get_destination_matches(path)
+        }
+        if( matches.isEmpty ) {
+          return Failure("The destination does not exist.")
+        }
+
+        if( matches.exists( dest => !dest.can_connect(destination, producer, security) ) ) {
+          return Failure("Not authorized to send to the destination.")
+        }
+      }
+      Success(Zilch)
+
+    }
+
+    /**
+     * Connects the producer to all the matching destinations which accept
+     * it and remembers it for destinations added later.
+     */
+    def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
+      get_destination_matches(path).foreach { dest=>
+        if( dest.can_connect(destination, producer, security) ) {
+          dest.connect(destination, producer)
+        }
+      }
+      producers_by_path.put(path, new ProducerContext(destination, producer, security))
+    }
+
+    /**
+     * Disconnects the producer from the matching destinations and releases it.
+     */
+    def disconnect(destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+      val path = DestinationParser.decode_path(destination.name)
+      // Forget the producer, otherwise add_destination would connect it to
+      // destinations which get created after it was disconnected.
+      // ProducerContext equality is based on the producer.
+      producers_by_path.remove(path, new ProducerContext(destination, producer, null))
+      get_destination_matches(path).foreach { dest=>
+        dest.disconnect(producer)
+      }
+      producer.release
+    }
+
+  }
+
+  /**
+   * The topic domain. In addition to the topics it tracks the queues which
+   * hold the messages of durable subscriptions.
+   */
+  object topic_domain extends Domain[Topic] {
+
+    val topic_id_counter = new LongCounter
+
+    // Stores durable subscription queues.
+    val durable_subscriptions_by_path = new PathMap[Queue]()
+    // keyed by (client_id, subscription_id)
+    val durable_subscriptions_by_id = HashMap[(String,String), Queue]()
+
+
+    /**
+     * In addition to the base checks, a durable subscription destination
+     * also requires that the user may create the durable subscription.
+     */
+    override def can_bind(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Result[Zilch, String] = {
+      val rc = super.can_bind(path, destination, consumer, security)
+      if( !rc.failed ) {
+        destination match {
+          case destination:DurableSubscriptionDestinationDTO=>
+            // So the user can subscribe to the topic.. but can he create durable sub??
+            val qc = ds_config(destination)
+            if( !can_create_ds(qc, security) ) {
+              return Failure("Not authorized to create the durable subscription.")
+            }
+          case _ =>
+        }
+      }
+      rc
+    }
+
+    /**
+     * Looks up the queue of the durable subscription by its client and
+     * subscription id, creating the queue if it's not found.
+     */
+    def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue = {
+      durable_subscriptions_by_id.get( (destination.client_id, destination.subscription_id) ).getOrElse {
+        val binding = QueueBinding.create(destination)
+        val qc = ds_config(destination)
+        _create_queue(-1, binding, qc)
+      }
+    }
+
+    /**
+     * Unregisters the durable subscription queue, unbinds it from the
+     * matching topics and destroys it. Does nothing if the subscription is
+     * not registered.
+     */
+    def destroy_durable_subscription(queue:Queue):Unit = {
+      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
+      if( durable_subscriptions_by_id.remove( (destination.client_id, destination.subscription_id) ).isDefined ) {
+        val path = queue.binding.destination
+        durable_subscriptions_by_path.remove(path, queue)
+        get_destination_matches(path).foreach( _.unbind_durable_subscription(destination, queue) )
+        _destroy_queue(queue.id, null)
+      }
+    }
+
+    /**
+     * Finds the first topic configuration of the host whose name filter
+     * matches the path; a default TopicDTO is used when none match.
+     */
+    def topic_config(name:Path):TopicDTO = {
+      import collection.JavaConversions._
+      import DestinationParser.default._
+      import AsciiBuffer._
+      host.config.topics.find( x=> parseFilter(ascii(x.name)).matches(name) ).getOrElse(new TopicDTO)
+    }
+
+    /** Always true when the host has no authorizer or no security context is given. */
+    def can_create_ds(config:DurableSubscriptionDTO, security:SecurityContext) = {
+      if( host.authorizer==null || security==null) {
+        true
+      } else {
+        host.authorizer.can_create(security, host, config)
+      }
+    }
+
+    /**
+     * Finds the first durable subscription configuration of the host which
+     * matches the destination; a default DurableSubscriptionDTO is used
+     * when none match. A configuration attribute which is null matches
+     * any value.
+     */
+    def ds_config(destination:DurableSubscriptionDestinationDTO):DurableSubscriptionDTO = {
+      import collection.JavaConversions._
+      import DestinationParser.default._
+      import AsciiBuffer._
+
+      val name = DestinationParser.decode_path(destination.name)
+      // The ids have to be compared against the ids of the requested
+      // destination (not against the configuration entry itself).
+      def matches(x:DurableSubscriptionDTO):Boolean = {
+        (x.name == null || parseFilter(ascii(x.name)).matches(name)) &&
+        (x.client_id == null || x.client_id == destination.client_id) &&
+        (x.subscription_id == null || x.subscription_id == destination.subscription_id)
+      }
+      host.config.durable_subscriptions.find(matches _).getOrElse(new DurableSubscriptionDTO)
+    }
+
+    /**
+     * Registers the durable subscription queue and binds it to the
+     * matching topics.
+     */
+    def bind(queue:Queue) = {
+
+      val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
+      val path = queue.binding.destination
+      val wildcard = PathParser.containsWildCards(path)
+      var matches = get_destination_matches(path)
+
+      // We may need to create the topic...
+      if( !wildcard && matches.isEmpty ) {
+        create_destination(path, null)
+        matches = get_destination_matches(path)
+      }
+
+      durable_subscriptions_by_path.put(path, queue)
+      durable_subscriptions_by_id.put((destination.client_id, destination.subscription_id), queue)
+
+      matches.foreach( _.bind_durable_subscription(destination, queue) )
+    }
+
+    // NOTE(review): only the by-path index is updated here, the by-id entry
+    // is left in place - confirm that this is intended.
+    def unbind(queue:Queue) = {
+      val path = queue.binding.destination
+      durable_subscriptions_by_path.remove(path, queue)
+    }
+
+    /**
+     * Creates and registers the topic for the path.
+     *
+     * @return the new topic or a failure if the security context is not
+     *         authorized to create it
+     */
+    def create_destination(path:Path, security:SecurityContext):Result[Topic,String] = {
+
+      // We can't create a wild card destination.. only wild card subscriptions.
+      assert( !PathParser.containsWildCards(path) )
+
+      // A new destination is being created...
+      val dto = topic_config(path)
+
+      if( host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, dto)) {
+        return new Failure("Not authorized to create the destination")
+      }
+
+      val id = topic_id_counter.incrementAndGet
+      val topic = new Topic(LocalRouter.this, DestinationParser.encode_path(path), dto, id)
+      add_destination(path, topic)
+      Success(topic)
+    }
+
+  }
+
+  /**
+   * The queue domain: the destinations of this domain are the queues
+   * themselves.
+   */
+  object queue_domain extends Domain[Queue] {
+
+    /**
+     * Finds the first queue configuration of the host which matches the
+     * destination of the binding; a default QueueDTO is used when none
+     * match. A configuration without a name matches every destination.
+     */
+    def config(binding:QueueBinding):QueueDTO = {
+      import collection.JavaConversions._
+      import DestinationParser.default._
+
+      def matches(x:QueueDTO):Boolean = {
+        x.name == null || parseFilter(ascii(x.name)).matches(binding.destination)
+      }
+      host.config.queues.find(matches _).getOrElse(new QueueDTO)
+    }
+
+    /** Always true when the host has no authorizer or no security context is given. */
+    def can_create_queue(config:QueueDTO, security:SecurityContext) = {
+      host.authorizer==null || security==null || host.authorizer.can_create(security, host, config)
+    }
+
+    /** Registers the queue as a destination of this domain. */
+    def bind(queue:Queue) = {
+      val queue_path = queue.binding.destination
+      assert( !PathParser.containsWildCards(queue_path) )
+      add_destination(queue_path, queue)
+    }
+
+    /** Removes the queue from the destinations of this domain. */
+    def unbind(queue:Queue) = {
+      remove_destination(queue.binding.destination, queue)
+    }
+
+    /**
+     * Creates the queue for the path if the security context is allowed to
+     * do so. Queues configured as unified are also bound to the topic
+     * which has the same path.
+     */
+    def create_destination(path: Path, security: SecurityContext) = {
+      val destination_dto = new QueueDestinationDTO
+      destination_dto.name = DestinationParser.encode_path(path)
+
+      val binding = QueueDomainQueueBinding.create(destination_dto)
+      val queue_config = config(binding)
+      if( !can_create_queue(queue_config, security) ) {
+        Failure("Not authorized to create the queue")
+      } else {
+        val queue = _create_queue(-1, binding, queue_config)
+        import OptionSupport._
+        if( queue_config.unified.getOrElse(false) ) {
+          // hook up the queue to be a subscriber of the topic.
+          val topic = topic_domain.get_or_create_destination(path, null).success
+          topic.bind(null, queue)
+        }
+        Success(queue)
+      }
+
+    }
+  }
+
+ /////////////////////////////////////////////////////////////////////////////
+ //
+ // life cycle methods.
+ //
+ /////////////////////////////////////////////////////////////////////////////
+
+ protected def _start(on_completed: Runnable) = {
+ val tracker = new LoggingTracker("router startup", dispatch_queue)
+ if( host.store!=null ) {
+ val task = tracker.task("list_queues")
+ host.store.list_queues { queue_keys =>
+ for( queue_key <- queue_keys) {
+ val task = tracker.task("load queue: "+queue_key)
+ // Use a global queue so that we restore the queues
+ // concurrently.
+ globalQueue {
+ host.store.get_queue(queue_key) { x =>
+ x match {
+ case Some(record)=>
+ dispatch_queue {
+ _create_queue(record.key, QueueBinding.create(record.binding_kind, record.binding_data), null)
+ task.run
+ }
+ case _ => task.run
+ }
+ }
+ }
+ }
+ task.run
+ }
+ }
+
+ import OptionSupport._
+ if(host.config.regroup_connections.getOrElse(false)) {
+ schedule_connection_regroup
+ }
+
+ tracker.callback(on_completed)
+ }
+
+ protected def _stop(on_completed: Runnable) = {
+ val tracker = new LoggingTracker("router shutdown", dispatch_queue)
+ queues_by_id.valuesIterator.foreach { queue=>
+ tracker.stop(queue)
+ }
+ tracker.callback(on_completed)
+ }
+
+
+ // Try to periodically re-balance connections so that consumers/producers
+ // are grouped onto the same thread.
+ def schedule_connection_regroup:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
+ if(service_state.is_started) {
+ connection_regroup
+ schedule_connection_regroup
+ }
+ }
+
+ def connection_regroup = {
+ // This should really be much more sophisticated. It should look at the messaging
+ // rates between producers and consumers, look for natural data flow partitions,
+ // and then try to divide the load equally over the available processing
+ // threads/cores.
+
+
+
+ // For the topics, just collocate the producers onto the first consumer's thread.
+ topic_domain.destinations.foreach { node =>
+
+ node.consumers.headOption.foreach{ consumer =>
+ node.producers.foreach { r=>
+ r.collocate(consumer.dispatch_queue)
+ }
+ }
+ }
+
+
+ queue_domain.destinations.foreach { queue=>
+ queue.dispatch_queue {
+
+ // Collocate the queue with its first consumer.
+ // TODO: change this so it collocates with the fastest consumer.
+
+ queue.all_subscriptions.headOption.map( _._1 ).foreach { consumer=>
+ queue.collocate( consumer.dispatch_queue )
+ }
+
+ // Collocate all the producers with the queue.
+
+ queue.inbound_sessions.foreach { session =>
+ session.producer.collocate( queue.dispatch_queue )
+ }
+ }
+
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ //
+ // destination/domain management methods.
+ //
+ /////////////////////////////////////////////////////////////////////////////
+
+ def domain(destination: DestinationDTO) = destination match {
+ case x:TopicDestinationDTO => topic_domain
+ case x:DurableSubscriptionDestinationDTO => topic_domain
+ case x:QueueDestinationDTO => queue_domain
+ case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
+ }
+
+ def bind(destination: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext) = {
+ consumer.retain
+ val paths = destination.map(x=> (DestinationParser.decode_path(x.name), x) )
+ dispatch_queue ! {
+ val failures = paths.map(x=> domain(x._2).can_bind(x._1, x._2, consumer, security) ).flatMap( _.failure_option )
+ val rc = if( !failures.isEmpty ) {
+ Failure(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).bind(x._1, x._2, consumer, security)
+ }
+ Success(Zilch)
+ }
+ consumer.release
+ rc
+ }
+ }
+
+ def unbind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, persistent:Boolean=false) = {
+ consumer.retain
+ dispatch_queue {
+ destinations.foreach { destination=>
+ domain(destination).unbind(destination, consumer, persistent)
+ }
+ consumer.release
+ }
+ }
+
+ def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security: SecurityContext) = {
+ producer.retain
+ val paths = destinations.map(x=> (DestinationParser.decode_path(x.name), x) )
+ dispatch_queue ! {
+
+ val failures = paths.map(x=> domain(x._2).can_connect(x._1, x._2, producer, security) ).flatMap( _.failure_option )
+
+ if( !failures.isEmpty ) {
+ producer.release
+ Failure(failures.mkString("; "))
+ } else {
+ paths.foreach { x=>
+ domain(x._2).connect(x._1, x._2, producer, security)
+ }
+ producer.connected()
+ Success(Zilch)
+ }
+ }
+ }
+
+ def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer) = {
+ dispatch_queue {
+ destinations.foreach { destination=>
+ domain(destination).disconnect(destination, producer)
+ }
+ producer.disconnected()
+ producer.release()
+ }
+ }
+
+ def get_or_create_destination(id: DestinationDTO, security: SecurityContext) = dispatch_queue ! {
+ _get_or_create_destination(id, security)
+ }
+
+ /**
+ * Delegates to the DTO's domain to get, or create, the destination at the decoded path.
+ */
+ def _get_or_create_destination(dto: DestinationDTO, security:SecurityContext): Result[DomainDestination, String] = {
+ val path = DestinationParser.decode_path(dto.name)
+ domain(dto).get_or_create_destination(path, security)
+ }
+
+
+ /////////////////////////////////////////////////////////////////////////////
+ //
+ // Queue management methods. Queues are multi-purpose and get used by both
+ // the queue domain and topic domain.
+ //
+ /////////////////////////////////////////////////////////////////////////////
+
+ var queues_by_binding = HashMap[QueueBinding, Queue]()
+ var queues_by_id = HashMap[Long, Queue]()
+
+ /**
+ * Gets an existing queue, looked up by the binding created from the destination DTO.
+ */
+ def get_queue(dto:DestinationDTO) = dispatch_queue ! {
+ queues_by_binding.get(QueueBinding.create(dto))
+ }
+
+ /**
+ * Gets an existing queue, looked up by its id.
+ */
+ def get_queue(id:Long) = dispatch_queue ! {
+ queues_by_id.get(id)
+ }
+
+ def _create_queue(id:Long, binding:QueueBinding, config:QueueDTO):Queue = {
+
+ var qid = id
+ if( qid == -1 ) {
+ qid = host.queue_id_counter.incrementAndGet
+ }
+
+ val queue = new Queue(this, qid, binding, config)
+ if( queue.tune_persistent && id == -1 ) {
+
+ val record = new QueueRecord
+ record.key = qid
+ record.binding_data = binding.binding_data
+ record.binding_kind = binding.binding_kind
+
+ host.store.add_queue(record) { rc => Unit }
+
+ }
+
+ queue.start
+ queues_by_binding.put(binding, queue)
+ queues_by_id.put(queue.id, queue)
+
+ // this causes the queue to get registered in the right location in
+ // the router.
+ binding.bind(this, queue)
+ queue
+ }
+
+ /**
+ * Destroys the queue with the given id; yields Success(Zilch), or a Failure with the reason.
+ */
+ def destroy_queue(id:Long, security:SecurityContext) = dispatch_queue ! { _destroy_queue(id,security) }
+
+ def _destroy_queue(id:Long, security:SecurityContext):Result[Zilch, String] = {
+ queues_by_id.get(id) match {
+ case Some(queue) =>
+ _destroy_queue(queue,security)
+ case None =>
+ Failure("Does not exist")
+ }
+ }
+
+ /**
+ * Destroys the queue bound to the given destination; yields Success(Zilch), or a Failure with the reason.
+ */
+ def destroy_queue(dto:DestinationDTO, security:SecurityContext) = dispatch_queue ! { _destroy_queue(dto, security) }
+
+ def _destroy_queue(dto:DestinationDTO, security:SecurityContext):Result[Zilch, String] = {
+ queues_by_binding.get(QueueBinding.create(dto)) match {
+ case Some(queue) =>
+ _destroy_queue(queue, security)
+ case None =>
+ Failure("Does not exist")
+ }
+ }
+
+ def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
+
+ if( security!=null && queue.config.acl!=null ) {
+ if( !host.authorizer.can_destroy(security, host, queue.config) ) { // NOTE(review): host.authorizer is not null-checked here, unlike create_destination/can_create_queue -- confirm it cannot be null when an acl is configured
+ return Failure("Not authorized to destroy")
+ }
+ }
+
+ queue.binding.unbind(this, queue)
+ queues_by_binding.remove(queue.binding)
+ queues_by_id.remove(queue.id)
+ queue.stop
+ if( queue.tune_persistent ) {
+ queue.dispatch_queue ^ {
+ host.store.remove_queue(queue.id){x=> Unit}
+ }
+ }
+ Success(Zilch)
+ }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 26 03:10:35 2011
@@ -19,20 +19,17 @@ package org.apache.activemq.apollo.broke
import java.util.concurrent.TimeUnit
import org.fusesource.hawtdispatch._
-import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
import java.util.concurrent.atomic.AtomicInteger
-import collection.{SortedMap}
-import org.apache.activemq.apollo.broker.store.{StoreUOW}
import protocol.ProtocolFactory
import collection.mutable.ListBuffer
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
-import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
-import org.apache.activemq.apollo.dto.QueueDTO
+import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
import OptionSupport._
import security.SecurityContext
+import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -47,7 +44,9 @@ import Queue._
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService {
+class Queue(val router: LocalRouter, val id:Long, val binding:QueueBinding, var config:QueueDTO) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination {
+
+ def host = router.host
var inbound_sessions = Set[DeliverySession]()
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
@@ -189,48 +188,27 @@ class Queue(val host: VirtualHost, var i
if( tune_persistent ) {
- if( id == -1 ) {
- id = host.queue_id_counter.incrementAndGet
-
- val record = new QueueRecord
- record.key = id
- record.binding_data = binding.binding_data
- record.binding_kind = binding.binding_kind
-
- host.store.add_queue(record) { rc =>
- dispatch_queue {
- completed
- }
- }
-
- } else {
-
- host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
- dispatch_queue {
- if( ranges!=null && !ranges.isEmpty ) {
-
- ranges.foreach { range =>
- val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
- entries.addLast(entry)
-
- message_seq_counter = range.last_entry_seq + 1
- enqueue_item_counter += range.count
- enqueue_size_counter += range.size
- tail_entry = new QueueEntry(Queue.this, next_message_seq)
- }
-
- debug("restored: "+enqueue_item_counter)
+ host.store.list_queue_entry_ranges(id, tune_swap_range_size) { ranges=>
+ dispatch_queue {
+ if( ranges!=null && !ranges.isEmpty ) {
+
+ ranges.foreach { range =>
+ val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
+ entries.addLast(entry)
+
+ message_seq_counter = range.last_entry_seq + 1
+ enqueue_item_counter += range.count
+ enqueue_size_counter += range.size
+ tail_entry = new QueueEntry(Queue.this, next_message_seq)
}
- completed
+
+ debug("restored: "+enqueue_item_counter)
}
+ completed
}
-
}
} else {
- if( id == -1 ) {
- id = host.queue_id_counter.incrementAndGet
- }
completed
}
}
@@ -562,12 +540,64 @@ class Queue(val host: VirtualHost, var i
def disconnected() = throw new RuntimeException("unsupported")
+ def can_bind(destination:DestinationDTO, consumer:DeliveryConsumer, security: SecurityContext):Boolean = {
+ if( host.authorizer!=null && security!=null ) {
+ if( consumer.browser ) {
+ if( !host.authorizer.can_receive_from(security, host, config) ) {
+ return false;
+ }
+ } else {
+ if( !host.authorizer.can_consume_from(security, host, config) ) {
+ return false
+ }
+ }
+ }
+ return true;
+ }
+
+ def bind(destination:DestinationDTO, consumer: DeliveryConsumer) = {
+ bind(consumer::Nil)
+ }
+ def unbind(consumer: DeliveryConsumer, persistent:Boolean) = {
+ unbind(consumer::Nil)
+ }
+
+ def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
+ val authorizer = host.authorizer
+ if( authorizer!=null && security!=null && !authorizer.can_send_to(security, host, config) ) {
+ false
+ } else {
+ true
+ }
+ }
+
+ def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+ import OptionSupport._
+ if( config.unified.getOrElse(false) ) {
+ // this is a unified queue.. actually have the producer bind to the topic, instead of directly to this queue.
+ val topic = router.topic_domain.get_or_create_destination(binding.destination, null).success
+ topic.connect(destination, producer)
+ } else {
+ producer.bind(this::Nil)
+ }
+ }
+
+ def disconnect (producer:BindableDeliveryProducer) = {
+ producer.unbind(this::Nil)
+ }
+
+ def name: String = binding.label
+
+ override def connection:Option[BrokerConnection] = None
+
+
/////////////////////////////////////////////////////////////////////
//
// Implementation methods.
//
/////////////////////////////////////////////////////////////////////
+
private def next_message_seq = {
val rc = message_seq_counter
message_seq_counter += 1
@@ -605,12 +635,6 @@ class Queue(val host: VirtualHost, var i
}
}
- def collocate(value:DispatchQueue):Unit = {
- if( value.getTargetQueue ne dispatch_queue.getTargetQueue ) {
- debug("co-locating %s with %s", dispatch_queue.getLabel, value.getLabel);
- this.dispatch_queue.setTargetQueue(value.getTargetQueue)
- }
- }
}
object QueueEntry extends Sizer[QueueEntry] {
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (from r1062213, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala&r1=1062213&r2=1063582&rev=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Wed Jan 26 03:10:35 2011
@@ -22,7 +22,7 @@ import org.apache.activemq.apollo.filter
import Buffer._
import org.apache.activemq.apollo.dto._
import org.apache.activemq.apollo.util.{OptionSupport, ClassFinder}
-import org.apache.activemq.apollo.util.path.{Path, Part}
+import org.apache.activemq.apollo.util.path.Path
/**
* <p>
@@ -30,11 +30,11 @@ import org.apache.activemq.apollo.util.p
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object BindingFactory {
+object QueueBinding {
trait Provider {
- def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding
- def create(binding_dto:BindingDTO):Binding
+ def create(binding_kind:AsciiBuffer, binding_data:Buffer):QueueBinding
+ def create(binding_dto:DestinationDTO):QueueBinding
}
def discover = {
@@ -44,7 +44,7 @@ object BindingFactory {
var providers = discover
- def create(binding_kind:AsciiBuffer, binding_data:Buffer):Binding = {
+ def create(binding_kind:AsciiBuffer, binding_data:Buffer):QueueBinding = {
providers.foreach { provider=>
val rc = provider.create(binding_kind, binding_data)
if( rc!=null ) {
@@ -53,7 +53,7 @@ object BindingFactory {
}
throw new IllegalArgumentException("Invalid binding type: "+binding_kind);
}
- def create(binding_dto:BindingDTO):Binding = {
+ def create(binding_dto:DestinationDTO):QueueBinding = {
providers.foreach { provider=>
val rc = provider.create(binding_dto)
if( rc!=null ) {
@@ -71,7 +71,7 @@ object BindingFactory {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait Binding {
+trait QueueBinding {
/**
* A user friendly description of the binding.
@@ -82,80 +82,67 @@ trait Binding {
* Wires a queue into the a virtual host based on the binding information contained
* in the buffer.
*/
- def bind(node:RoutingNode, queue:Queue)
+ def bind(node:LocalRouter, queue:Queue)
- def unbind(node:RoutingNode, queue:Queue)
+ def unbind(node:LocalRouter, queue:Queue)
def binding_kind:AsciiBuffer
def binding_data:Buffer
- def binding_dto:BindingDTO
+ def binding_dto:DestinationDTO
def message_filter:BooleanExpression = ConstantExpression.TRUE
- def matches(config:QueueDTO):Boolean = {
- import DestinationParser.default._
- import OptionSupport._
- var rc = (o(config.name).map{ x=> parseFilter(ascii(x)).matches(destination) }.getOrElse(true))
- rc = rc && (o(config.kind).map{ x=> x == binding_kind.toString }.getOrElse(true))
- rc
- }
-
def destination:Path
}
-object QueueBinding {
+object QueueDomainQueueBinding extends QueueBinding.Provider {
+
val POINT_TO_POINT_KIND = new AsciiBuffer("ptp")
val DESTINATION_PATH = new AsciiBuffer("default");
-}
-
-import QueueBinding._
-
-class QueueBindingFactory extends BindingFactory.Provider {
def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
if( binding_kind == POINT_TO_POINT_KIND ) {
- val dto = new QueueBindingDTO
+ val dto = new QueueDestinationDTO
dto.name = binding_data.ascii.toString
- new QueueBinding(binding_data, dto)
+ new QueueDomainQueueBinding(binding_data, dto)
} else {
null
}
}
- def create(binding_dto:BindingDTO) = {
- if( binding_dto.isInstanceOf[QueueBindingDTO] ) {
- val ptp_dto = binding_dto.asInstanceOf[QueueBindingDTO]
+ def create(binding_dto:DestinationDTO) = {
+ if( binding_dto.isInstanceOf[QueueDestinationDTO] ) {
+ val ptp_dto = binding_dto.asInstanceOf[QueueDestinationDTO]
val data = new AsciiBuffer(ptp_dto.name).buffer
- new QueueBinding(data, ptp_dto)
+ new QueueDomainQueueBinding(data, ptp_dto)
} else {
null
}
}
}
+
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class QueueBinding(val binding_data:Buffer, val binding_dto:QueueBindingDTO) extends Binding {
+class QueueDomainQueueBinding(val binding_data:Buffer, val binding_dto:QueueDestinationDTO) extends QueueBinding {
+
+ import QueueDomainQueueBinding._
val destination = DestinationParser.decode_path(binding_dto.name)
def binding_kind = POINT_TO_POINT_KIND
- def unbind(node: RoutingNode, queue: Queue) = {
- if( node.unified ) {
- node.remove_broadcast_consumer(queue)
- }
+ def unbind(node: LocalRouter, queue: Queue) = {
+ node.queue_domain.unbind(queue)
}
- def bind(node: RoutingNode, queue: Queue) = {
- if( node.unified ) {
- node.add_broadcast_consumer(queue)
- }
+ def bind(node: LocalRouter, queue: Queue) = {
+ node.queue_domain.bind(queue)
}
def label = binding_dto.name
@@ -163,56 +150,54 @@ class QueueBinding(val binding_data:Buff
override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
override def equals(o:Any):Boolean = o match {
- case x: QueueBinding => x.binding_data == binding_data
+ case x: QueueDomainQueueBinding => x.binding_data == binding_data
case _ => false
}
}
-object SubscriptionBinding {
- val DURABLE_SUB_KIND = new AsciiBuffer("ds")
-}
+object DurableSubscriptionQueueBinding extends QueueBinding.Provider {
-import SubscriptionBinding._
+ val DURABLE_SUB_KIND = new AsciiBuffer("ds")
-class SubscriptionBindingFactory extends BindingFactory.Provider {
def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
if( binding_kind == DURABLE_SUB_KIND ) {
- new SubscriptionBinding(binding_data, JsonCodec.decode(binding_data, classOf[SubscriptionBindingDTO]))
+ new DurableSubscriptionQueueBinding(binding_data, JsonCodec.decode(binding_data, classOf[DurableSubscriptionDestinationDTO]))
} else {
null
}
}
- def create(binding_dto:BindingDTO) = {
- if( binding_dto.isInstanceOf[SubscriptionBindingDTO] ) {
- new SubscriptionBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[SubscriptionBindingDTO])
+ def create(binding_dto:DestinationDTO) = {
+ if( binding_dto.isInstanceOf[DurableSubscriptionDestinationDTO] ) {
+ new DurableSubscriptionQueueBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO])
} else {
null
}
}
-
}
+
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class SubscriptionBinding(val binding_data:Buffer, val binding_dto:SubscriptionBindingDTO) extends Binding {
+class DurableSubscriptionQueueBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionDestinationDTO) extends QueueBinding {
+ import DurableSubscriptionQueueBinding._
val destination = DestinationParser.decode_path(binding_dto.name)
def binding_kind = DURABLE_SUB_KIND
- def unbind(node: RoutingNode, queue: Queue) = {
- node.remove_broadcast_consumer(queue)
+ def unbind(router: LocalRouter, queue: Queue) = {
+ router.topic_domain.unbind(queue)
}
- def bind(node: RoutingNode, queue: Queue) = {
- node.add_broadcast_consumer(queue)
+ def bind(router: LocalRouter, queue: Queue) = {
+ router.topic_domain.bind(queue)
}
def label = {
@@ -229,7 +214,7 @@ class SubscriptionBinding(val binding_da
override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
override def equals(o:Any):Boolean = o match {
- case x: SubscriptionBinding => x.binding_data == binding_data
+ case x: DurableSubscriptionQueueBinding => x.binding_data == binding_data
case _ => false
}
@@ -240,42 +225,23 @@ class SubscriptionBinding(val binding_da
SelectorParser.parse(binding_dto.filter)
}
}
-
- override def matches(config: QueueDTO): Boolean = {
- import OptionSupport._
- var rc = super.matches(config)
- rc = rc && (o(config.client_id).map{ x=> x == binding_dto.client_id }.getOrElse(true))
- rc = rc && (o(config.subscription_id).map{ x=> x == binding_dto.subscription_id }.getOrElse(true))
- rc
- }
}
-object TempBinding {
+object TempQueueBinding extends QueueBinding.Provider {
val TEMP_DATA = new AsciiBuffer("")
val TEMP_KIND = new AsciiBuffer("tmp")
- val TEMP_DTO = new TempBindingDTO
-}
-
-import TempBinding._
-
-class TempBindingFactory extends BindingFactory.Provider {
+ val TEMP_DTO = null
def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
if( binding_kind == TEMP_KIND ) {
- new TempBinding("", "")
+ new TempQueueBinding("", "")
} else {
null
}
}
- def create(binding_dto:BindingDTO) = {
- if( binding_dto.isInstanceOf[TempBindingDTO] ) {
- new TempBinding("", "")
- } else {
- null
- }
- }
+ def create(binding_dto:DestinationDTO) = throw new UnsupportedOperationException
}
/**
@@ -284,7 +250,9 @@ class TempBindingFactory extends Binding
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class TempBinding(val key:AnyRef, val label:String) extends Binding {
+class TempQueueBinding(val key:AnyRef, val label:String) extends QueueBinding {
+ import TempQueueBinding._
+
def this(c:DeliveryConsumer) = this(c, c.connection.map(_.transport.getRemoteAddress).getOrElse("known") )
val destination = null
@@ -292,22 +260,16 @@ class TempBinding(val key:AnyRef, val la
def binding_dto = TEMP_DTO
def binding_data = TEMP_DATA
- def unbind(node: RoutingNode, queue: Queue) = {
- if( node.unified ) {
- node.remove_broadcast_consumer(queue)
- }
+ def unbind(router: LocalRouter, queue: Queue) = {
}
- def bind(node: RoutingNode, queue: Queue) = {
- if( node.unified ) {
- node.add_broadcast_consumer(queue)
- }
+ def bind(router: LocalRouter, queue: Queue) = {
}
override def hashCode = if(key==null) 0 else key.hashCode
override def equals(o:Any):Boolean = o match {
- case x: TempBinding => x.key == key
+ case x: TempQueueBinding => x.key == key
case _ => false
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1063582&r1=1063581&r2=1063582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jan 26 03:10:35 2011
@@ -16,500 +16,61 @@
*/
package org.apache.activemq.apollo.broker
-import _root_.java.util.concurrent.atomic.AtomicLong
-import _root_.org.fusesource.hawtbuf._
-import _root_.org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch._
-import _root_.org.fusesource.hawtdispatch.ScalaDispatchHelpers._
-
-import collection.JavaConversions
import org.apache.activemq.apollo.util._
-import collection.mutable.{ListBuffer, HashMap}
+import path.Path
import scala.collection.immutable.List
-import org.apache.activemq.apollo.broker.store.{StoreUOW, QueueRecord}
-import Buffer._
-import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
-import java.util.ArrayList
import org.apache.activemq.apollo.dto._
import security.SecurityContext
+import store.StoreUOW
+import util.continuations._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object Router extends Log {
- val TOPIC_DOMAIN = ascii("topic");
- val QUEUE_DOMAIN = ascii("queue");
- val TEMP_TOPIC_DOMAIN = ascii("temp-topic");
- val TEMP_QUEUE_DOMAIN = ascii("temp-queue");
-
- val QUEUE_KIND = ascii("queue");
- val DEFAULT_QUEUE_PATH = ascii("default");
-}
-
-/**
- * Provides a non-blocking concurrent producer to consumer
- * routing implementation.
- *
- * DeliveryProducers create a route object for each destination
- * they will be producing to. Once the route is
- * connected to the router, the producer can use
- * the route.targets list without synchronization to
- * get the current set of consumers that are bound
- * to the destination.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class Router(val host:VirtualHost) extends DispatchLogging {
-
- override protected def log = Router
-
- import Router._
-
- val destination_id_counter = new LongCounter
-
- protected def dispatchQueue:DispatchQueue = host.dispatch_queue
-
- var queue_bindings = HashMap[Binding, Queue]()
- var queues = HashMap[Long, Queue]()
-
- // Only stores simple paths, used for wild card lookups.
- var destinations = new PathMap[RoutingNode]()
- // Can store consumers on wild cards paths
- val broadcast_consumers = new PathMap[DeliveryConsumer]()
- // Can store bindings on wild cards paths
- val bindings = new PathMap[Queue]()
-
- private def is_topic(destination:Destination) = {
- destination.domain match {
- case TOPIC_DOMAIN => true
- case TEMP_TOPIC_DOMAIN => true
- case _ => false
- }
- }
-
- private val ALL = new Path({
- val rc = new ArrayList[Part](1)
- rc.add(Part.ANY_DESCENDANT)
- rc
- })
-
- def routing_nodes:Iterable[RoutingNode] = JavaConversions.asScalaIterable(destinations.get(ALL))
-
- def _get_or_create_destination(path:Path, security:SecurityContext) = {
- // We can't create a wild card destination.. only wild card subscriptions.
- assert( !PathParser.containsWildCards(path) )
- var rc = destinations.chooseValue( path )
- if( rc == null ) {
- _create_destination(path, security)
- } else {
- Success(rc)
- }
- }
+trait Router extends Service {
- def _get_destination(path:Path) = {
- Option(destinations.chooseValue( path ))
- }
-
- def _create_destination(path:Path, security:SecurityContext):Result[RoutingNode,String] = {
+ def host:VirtualHost
- // We can't create a wild card destination.. only wild card subscriptions.
- assert( !PathParser.containsWildCards(path) )
+ def get_queue(dto:Long):Option[Queue] @suspendable
- // A new destination is being created...
- val config = host.destination_config(path).getOrElse(new DestinationDTO)
+ def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Result[Zilch,String] @suspendable
- if( host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config)) {
- return new Failure("Not authorized to create the destination")
- }
+ def unbind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, persistent:Boolean=false)
- val rc = new RoutingNode(this, path, config)
- destinations.put(path, rc)
+ def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Result[Zilch,String] @suspendable
- // bind any matching wild card subs
- import JavaConversions._
- broadcast_consumers.get( path ).foreach { c=>
- rc.add_broadcast_consumer(c)
- }
- bindings.get( path ).foreach { queue=>
- rc.add_queue(queue)
- }
- Success(rc)
- }
-
- def get_destination_matches(path:Path) = {
- import JavaConversions._
- asScalaIterable(destinations.get( path ))
- }
-
- def _create_queue(id:Long, binding:Binding, security:SecurityContext):Result[Queue,String] = {
-
- val config = host.queue_config(binding).getOrElse(new QueueDTO)
- if( host.authorizer!=null && security!=null && !host.authorizer.can_create(security, host, config) ) {
- return Failure("Not authorized to create the queue")
- }
-
- var qid = id
- if( qid == -1 ) {
- qid = host.queue_id_counter.incrementAndGet
- }
-
- val queue = new Queue(host, qid, binding, config)
- if( queue.tune_persistent && id == -1 ) {
-
- val record = new QueueRecord
- record.key = qid
- record.binding_data = binding.binding_data
- record.binding_kind = binding.binding_kind
-
- host.store.add_queue(record) { rc => Unit }
-
- }
- queue.start
- queue_bindings.put(binding, queue)
- queues.put(queue.id, queue)
-
- // Not all queues are bound to destinations.
- val name = binding.destination
- if( name!=null ) {
- bindings.put(name, queue)
- // make sure the destination is created if this is not a wild card sub
- if( !PathParser.containsWildCards(name) ) {
- _get_destination(name) match {
- case Some(node)=>
- node.add_queue(queue)
- case None=>
- _create_destination(name, null)
- }
- } else {
- get_destination_matches(name).foreach( node=>
- node.add_queue(queue)
- )
- }
-
- }
- Success(queue)
-
- }
-
- def create_queue(record:QueueRecord, security:SecurityContext) = {
- _create_queue(record.key, BindingFactory.create(record.binding_kind, record.binding_data), security)
- }
-
- /**
- * Returns the previously created queue if it already existed.
- */
- def _get_or_create_queue(dto: BindingDTO, security:SecurityContext): Result[Queue, String] = {
- val binding = BindingFactory.create(dto)
- val queue = queue_bindings.get(binding) match {
- case Some(queue) => Success(queue)
- case None => _create_queue(-1, binding, security)
- }
- queue
- }
-
- def get_or_create_queue(id:BindingDTO, security:SecurityContext) = dispatchQueue ! {
- _get_or_create_queue(id, security)
- }
-
- /**
- * Returns true if the queue no longer exists.
- */
- def destroy_queue(dto:BindingDTO, security:SecurityContext) = dispatchQueue ! { _destroy_queue(dto, security) }
-
- def _destroy_queue(dto:BindingDTO, security:SecurityContext):Result[Zilch, String] = {
- queue_bindings.get(BindingFactory.create(dto)) match {
- case Some(queue) =>
- _destroy_queue(queue, security)
- case None =>
- Failure("Does not exist")
- }
- }
-
- /**
- * Returns true if the queue no longer exists.
- */
- def destroy_queue(id:Long, security:SecurityContext) = dispatchQueue ! { _destroy_queue(id,security) }
-
- def _destroy_queue(id:Long, security:SecurityContext):Result[Zilch, String] = {
- queues.get(id) match {
- case Some(queue) =>
- _destroy_queue(queue,security)
- case None =>
- Failure("Does not exist")
- }
- }
-
- def _destroy_queue(queue:Queue, security:SecurityContext):Result[Zilch, String] = {
-
- if( security!=null && queue.config.acl!=null ) {
- if( !host.authorizer.can_destroy(security, host, queue.config) ) {
- return Failure("Not authorized to destroy")
- }
- }
-
- queue_bindings.remove(queue.binding)
- queues.remove(queue.id)
-
- val name = queue.binding.destination
- if( name!=null ) {
- get_destination_matches(name).foreach( node=>
- node.remove_queue(queue)
- )
- }
- queue.stop
- if( queue.tune_persistent ) {
- queue.dispatch_queue ^ {
- host.store.remove_queue(queue.id){x=> Unit}
- }
- }
- Success(Zilch)
- }
-
- /**
- * Gets an existing queue.
- */
- def get_queue(dto:BindingDTO) = dispatchQueue ! {
- queue_bindings.get(BindingFactory.create(dto))
- }
-
- /**
- * Gets an existing queue.
- */
- def get_queue(id:Long) = dispatchQueue ! {
- queues.get(id)
- }
-
- def bind(destination:Destination, consumer:DeliveryConsumer, security:SecurityContext) = {
- consumer.retain
- dispatchQueue ! {
-
- def do_bind:Result[Zilch, String] = {
- assert( is_topic(destination) )
- val name = destination.name
-
- // A new destination is being created...
- def config = host.destination_config(name).getOrElse(new DestinationDTO)
-
- if( host.authorizer!=null && security!=null && !host.authorizer.can_receive_from(security, host, config) ) {
- return new Failure("Not authorized to receive from the destination")
- }
-
- // make sure the destination is created if this is not a wild card sub
- if( !PathParser.containsWildCards(name) ) {
- val rc = _get_or_create_destination(name, security)
- if( rc.failed ) {
- return rc.map_success(_=> Zilch);
- }
- }
-
- get_destination_matches(name).foreach{ node=>
- node.add_broadcast_consumer(consumer)
- }
- broadcast_consumers.put(name, consumer)
- Success(Zilch)
- }
-
- do_bind
-
- }
- }
-
- def unbind(destination:Destination, consumer:DeliveryConsumer) = dispatchQueue {
- assert( is_topic(destination) )
- val name = destination.name
- broadcast_consumers.remove(name, consumer)
- get_destination_matches(name).foreach{ node=>
- node.remove_broadcast_consumer(consumer)
- }
- consumer.release
- }
-
-
- def connect(destination:Destination, producer:DeliveryProducer, security:SecurityContext)(completed: (Result[DeliveryProducerRoute,String])=>Unit) = {
-
- val route = new DeliveryProducerRoute(this, destination, producer) {
- override def on_connected = {
- completed(Success(this));
- }
- }
-
- def do_connect:Result[Zilch, String] = {
- val topic = is_topic(destination)
-
-
- var destination_security = security
- // Looking up the queue will cause it to get created if it does not exist.
- val queue = if( topic ) {
-
- def config = host.destination_config(destination.name).getOrElse(new DestinationDTO)
- if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config)) {
- return new Failure("Not authorized to send to the destination")
- }
- None
-
- } else {
-
- val dto = new QueueBindingDTO
- dto.name = DestinationParser.encode_path(destination.name)
-
- // Can we send to the queue?
- def config = host.queue_config(dto).getOrElse(new QueueDTO)
- if( host.authorizer!=null && security!=null && !host.authorizer.can_send_to(security, host, config) ) {
- return Failure("Not authorized to send to the queue")
- }
-
- destination_security = null
- val rc = _get_or_create_queue(dto, security)
- if( rc.failed ) {
- return rc.map_success(_=>Zilch)
- }
- Some(rc.success)
- }
-
- _get_or_create_destination(destination.name, security) match {
- case Success(node)=>
- if( node.unified || topic ) {
- node.add_broadcast_producer( route )
- } else {
- route.bind( queue.toList )
- }
- route.connected()
- Success(Zilch)
-
- case Failure(reason)=>
- Failure(reason)
- }
- }
-
- dispatchQueue {
- do_connect.failure_option.foreach(x=> producer.dispatch_queue { completed(Failure(x)) } )
- }
-
- }
-
- def disconnect(route:DeliveryProducerRoute) = dispatchQueue {
- _get_destination(route.destination.name).foreach { node=>
- val topic = is_topic(route.destination)
- if( node.unified || topic ) {
- node.remove_broadcast_producer(route)
- }
- }
- route.disconnected()
- route.release
- }
-
-}
-
-/**
- * Tracks state associated with a destination name.
- */
-class RoutingNode(val router:Router, val name:Path, val config:DestinationDTO) {
-
- val id = router.destination_id_counter.incrementAndGet
-
- var broadcast_producers = ListBuffer[DeliveryProducerRoute]()
- var broadcast_consumers = ListBuffer[DeliveryConsumer]()
- var queues = ListBuffer[Queue]()
-
- import OptionSupport._
-
- def unified = config.unified.getOrElse(false)
- def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
-
- var consumer_proxies = Map[DeliveryConsumer, DeliveryConsumer]()
-
- def add_broadcast_consumer (consumer:DeliveryConsumer) = {
-
- var target = consumer
- slow_consumer_policy match {
- case "queue" =>
-
- // create a temp queue so that it can spool
- val queue = router._create_queue(-1, new TempBinding(consumer), null).success
- queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
- queue.bind(List(consumer))
-
- consumer_proxies += consumer->queue
- target = queue
-
- case "block" =>
- // just have dispatcher dispatch directly to them..
- }
-
- broadcast_consumers += target
- val list = target :: Nil
- broadcast_producers.foreach({ r=>
- r.bind(list)
- })
- }
-
- def remove_broadcast_consumer (consumer:DeliveryConsumer) = {
-
- var target = consumer_proxies.get(consumer).getOrElse(consumer)
-
- broadcast_consumers = broadcast_consumers.filterNot( _ == target )
-
- val list = target :: Nil
- broadcast_producers.foreach({ r=>
- r.unbind(list)
- })
-
- target match {
- case queue:Queue=>
- val binding = new TempBinding(consumer)
- if( queue.binding == binding ) {
- queue.unbind(List(consumer))
- router._destroy_queue(queue.id, null)
- }
- case _ =>
- }
- }
-
- def add_broadcast_producer (producer:DeliveryProducerRoute) = {
- broadcast_producers += producer
- producer.bind(broadcast_consumers.toList)
- }
-
- def remove_broadcast_producer (producer:DeliveryProducerRoute) = {
- broadcast_producers = broadcast_producers.filterNot( _ == producer )
- producer.unbind(broadcast_consumers.toList)
- }
-
- def add_queue (queue:Queue) = {
- queue.binding.bind(this, queue)
- queues += queue
- }
-
- def remove_queue (queue:Queue) = {
- queues = queues.filterNot( _ == queue )
- queue.binding.unbind(this, queue)
- }
+ def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
}
/**
+ * An object which produces deliveries and which allows new DeliveryConsumer
+ * objects to bind to it so that they can also receive those deliveries.
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait Route extends Retained {
+trait BindableDeliveryProducer extends DeliveryProducer with Retained {
def dispatch_queue:DispatchQueue
- val metric = new AtomicLong();
def bind(targets:List[DeliveryConsumer]):Unit
def unbind(targets:List[DeliveryConsumer]):Unit
-
+
def connected():Unit
def disconnected():Unit
}
+object DeliveryProducerRoute extends Log
+
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+// case class DeliveryProducerRoute(val router:Router, val destination:DestinationDTO, val path:Path, val producer:DeliveryProducer, val security:SecurityContext) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+abstract class DeliveryProducerRoute(val router:Router) extends BaseRetained with BindableDeliveryProducer with Sink[Delivery] {
- override protected def log = Router
- override def dispatch_queue = producer.dispatch_queue
+ import DeliveryProducerRoute._
// Retain the queue while we are retained.
dispatch_queue.retain
@@ -523,19 +84,15 @@ case class DeliveryProducerRoute(val rou
on_connected
}
- def bind(targets:List[DeliveryConsumer]) = {
- targets.foreach(_.retain)
+ def bind(consumers:List[DeliveryConsumer]) = {
+ consumers.foreach(_.retain)
dispatch_queue {
- internal_bind(targets)
- }
- }
-
- private def internal_bind(values:List[DeliveryConsumer]) = {
- values.foreach{ x=>
- debug("producer route attaching to conusmer.")
- val target = x.connect(producer);
- target.refiller = drainer
- targets ::= target
+ consumers.foreach{ x=>
+ debug("producer route attaching to conusmer.")
+ val target = x.connect(this);
+ target.refiller = drainer
+ targets ::= target
+ }
}
}
@@ -561,8 +118,7 @@ case class DeliveryProducerRoute(val rou
this.targets.foreach { x=>
debug("producer route detaching from conusmer.")
x.close
- x.consumer.release
- }
+ }
}
protected def on_connected = {}
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1063582&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Jan 26 03:10:35 2011
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.apache.activemq.apollo.util._
+import scala.collection.immutable.List
+import org.apache.activemq.apollo.util.path.Path
+import org.apache.activemq.apollo.dto._
+import security.SecurityContext
+import collection.mutable.{HashMap, ListBuffer}
+
+/**
+ * <p>
+ * A logical messaging topic
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Topic(val router:LocalRouter, val name:String, val config:TopicDTO, val id:Long) extends DomainDestination {
+
+ var producers = ListBuffer[BindableDeliveryProducer]()
+ var consumers = ListBuffer[DeliveryConsumer]()
+ var durable_subscriptions = ListBuffer[Queue]()
+ var consumer_queues = HashMap[DeliveryConsumer, Queue]()
+
+ import OptionSupport._
+
+ def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
+
+ def can_bind(destination: DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext) = {
+ val authorizer = router.host.authorizer
+ if( authorizer!=null && security!=null && !authorizer.can_receive_from(security, router.host, config) ) {
+ false
+ } else {
+ true
+ }
+ }
+
+ def is_same_ds(sub1:DurableSubscriptionDestinationDTO, sub2:DurableSubscriptionDestinationDTO) = {
+ (sub1.client_id, sub1.subscription_id) == (sub2.client_id, sub2.subscription_id)
+ }
+
+ def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
+ destination match {
+ case null=> // unified queue case
+
+ consumers += consumer
+ val list = List(consumer)
+ producers.foreach({ r=>
+ r.bind(list)
+ })
+
+ case destination:TopicDestinationDTO=>
+ var target = consumer
+ slow_consumer_policy match {
+ case "queue" =>
+
+ // create a temp queue so that it can spool
+ val queue = router._create_queue(-1, new TempQueueBinding(consumer), new QueueDTO)
+ queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
+ queue.bind(List(consumer))
+
+ consumer_queues += consumer->queue
+ target = queue
+
+ case "block" =>
+ // just have dispatcher dispatch directly to them..
+ }
+
+ consumers += target
+ val list = target :: Nil
+ producers.foreach({ r=>
+ r.bind(list)
+ })
+
+ case destination:DurableSubscriptionDestinationDTO=>
+
+ val queue = router.topic_domain.get_or_create_durable_subscription(destination)
+ if( !durable_subscriptions.contains(queue) ) {
+ durable_subscriptions += queue
+ val list = List(queue)
+ producers.foreach({ r=>
+ r.bind(list)
+ })
+ }
+
+ // Typically durable subs are only consumed by one connection at a time. So collocate the
+ // queue onto the consumer's dispatch queue.
+ queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
+ queue.bind(destination, consumer)
+ consumer_queues += consumer->queue
+ }
+ }
+
+ def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
+
+ consumer_queues.remove(consumer) match {
+ case Some(queue)=>
+
+ queue.unbind(List(consumer))
+
+ queue.binding match {
+ case x:TempQueueBinding =>
+
+ val list = List(queue)
+ producers.foreach({ r=>
+ r.unbind(list)
+ })
+ router._destroy_queue(queue.id, null)
+
+ case x:DurableSubscriptionQueueBinding =>
+ if( persistent ) {
+ router.topic_domain.destroy_durable_subscription(queue)
+ }
+ }
+
+ case None=>
+
+ // producers are directly delivering to the consumer..
+ val original = consumers.size
+ consumers -= consumer
+ if( original!= consumers.size ) {
+ val list = List(consumer)
+ producers.foreach({ r=>
+ r.unbind(list)
+ })
+ }
+ }
+
+ }
+
+ def bind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue) = {
+ if( !durable_subscriptions.contains(queue) ) {
+ durable_subscriptions += queue
+ val list = List(queue)
+ producers.foreach({ r=>
+ r.bind(list)
+ })
+ consumer_queues.foreach{case (consumer, q)=>
+ if( q==queue ) {
+ bind(destination, consumer)
+ }
+ }
+ }
+ }
+
+ def unbind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue) = {
+ if( durable_subscriptions.contains(queue) ) {
+ durable_subscriptions -= queue
+ val list = List(queue)
+ producers.foreach({ r=>
+ r.unbind(list)
+ })
+ consumer_queues.foreach{case (consumer, q)=>
+ if( q==queue ) {
+ unbind(consumer, false)
+ }
+ }
+ }
+ }
+
+ def can_connect(destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean = {
+ val authorizer = router.host.authorizer
+ if( authorizer!=null && security!=null && !authorizer.can_send_to(security, router.host, config) ) {
+ false
+ } else {
+ true
+ }
+ }
+
+ def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
+ producers += producer
+ producer.bind(consumers.toList ::: durable_subscriptions.toList)
+ }
+
+ def disconnect (producer:BindableDeliveryProducer) = {
+ producers = producers.filterNot( _ == producer )
+ producer.unbind(consumers.toList ::: durable_subscriptions.toList)
+ }
+
+}