You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:40:20 UTC
svn commit: r961068 [1/4] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/main/java/org/apache...
Author: chirino
Date: Wed Jul 7 03:40:18 2010
New Revision: 961068
URL: http://svn.apache.org/viewvc?rev=961068&view=rev
Log:
converting broker module to be scala based
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java
- copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerAware.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandlerFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul 7 03:40:18 2010
@@ -35,11 +35,26 @@
<dependency>
<groupId>org.fusesource.hawtdispatch</groupId>
- <artifactId>hawtdispatch</artifactId>
+ <artifactId>hawtdispatch-scala</artifactId>
<version>${hawtdispatch-version}</version>
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>compile</scope>
+ <version>${scala-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala-version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-transport</artifactId>
</dependency>
@@ -78,6 +93,10 @@
<build>
+ <defaultGoal>install</defaultGoal>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+
<resources>
<resource>
<directory>target/schema</directory>
@@ -108,6 +127,43 @@
</executions>
</plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.13.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xmx1024m</jvmArg>
+ </jvmArgs>
+ <args>
+ <arg>-deprecation</arg>
+ <arg>-Xno-varargs-conversion</arg>
+ </args>
+ <scalaVersion>${scala-version}</scalaVersion>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.4.3</version>
+ <configuration>
+ <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
+ <useSystemClassLoader>false</useSystemClassLoader>
+ <!--forkMode>pertest</forkMode-->
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
+
</plugins>
</build>
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.concurrent.atomic.AtomicLong
+import _root_.org.apache.activemq.util.buffer._
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+import java.util.HashMap
+import collection.JavaConversions
+import path.PathMap
+
+object Domain {
+ val TOPIC_DOMAIN = new AsciiBuffer("topic");
+ val QUEUE_DOMAIN = new AsciiBuffer("queue");
+ val TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
+ val TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
+}
+
+import Domain._
+class Domain {
+
+ val targets = new PathMap[DeliveryTarget]();
+
+ def bind(name:AsciiBuffer, queue:DeliveryTarget) = {
+ targets.put(name, queue);
+ }
+
+ def unbind(name:AsciiBuffer, queue:DeliveryTarget) = {
+ targets.remove(name, queue);
+ }
+
+//
+// synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
+// return targets.get(name);
+// }
+
+}
+
+
+/**
+ * Provides a non-blocking concurrent producer to consumer
+ * routing implementation.
+ *
+ * DeliveryProducers create a route object for each destination
+ * they will be producing to. Once the route is
+ * connected to the router, the producer can use
+ * the route.targets list without synchronization to
+ * get the current set of consumers that are bound
+ * to the destination.
+ *
+ */
+class Router(var queue:DispatchQueue) {
+
+ trait DestinationNode {
+ var targets = List[DeliveryTarget]()
+ var routes = List[DeliveryProducerRoute]()
+
+ def on_bind(x:List[DeliveryTarget]):Unit
+ def on_unbind(x:List[DeliveryTarget]):Boolean
+ def on_connect(route:DeliveryProducerRoute):Unit
+ def on_disconnect(route:DeliveryProducerRoute):Boolean = {
+ routes = routes.filterNot({r=> route==r})
+ route.disconnected()
+ routes == Nil && targets == Nil
+ }
+ }
+
+ class TopicDestinationNode extends DestinationNode {
+ def on_bind(x:List[DeliveryTarget]) = {
+ targets = x ::: targets
+ routes.foreach({r=>
+ r.bind(x)
+ })
+ }
+
+ def on_unbind(x:List[DeliveryTarget]):Boolean = {
+ targets = targets.filterNot({t=>x.contains(t)})
+ routes.foreach({r=>
+ r.unbind(x)
+ })
+ routes == Nil && targets == Nil
+ }
+
+ def on_connect(route:DeliveryProducerRoute) = {
+ routes = route :: routes
+ route.connected(targets)
+ }
+ }
+
+ class QueueDestinationNode(destination:Destination) extends DestinationNode {
+ val queue = new Queue(destination)
+
+ def on_bind(x:List[DeliveryTarget]) = {
+ targets = x ::: targets
+ queue.bind(x)
+ }
+
+ def on_unbind(x:List[DeliveryTarget]):Boolean = {
+ targets = targets.filterNot({t=>x.contains(t)})
+ queue.unbind(x)
+ routes == Nil && targets == Nil
+ }
+
+ def on_connect(route:DeliveryProducerRoute) = {
+ routes = route :: routes
+ route.connected(queue :: Nil)
+ }
+ }
+
+ var destinations = new HashMap[Destination, DestinationNode]()
+
+ private def get(destination:Destination):DestinationNode = {
+ var result = destinations.get(destination)
+ if( result ==null ) {
+ if( isTopic(destination) ) {
+ result = new TopicDestinationNode
+ } else {
+ result = new QueueDestinationNode(destination)
+ }
+ destinations.put(destination, result)
+ }
+ result
+ }
+
+ def bind(destination:Destination, targets:List[DeliveryTarget]) = retaining(targets) {
+ get(destination).on_bind(targets)
+ } ->: queue
+
+ def unbind(destination:Destination, targets:List[DeliveryTarget]) = releasing(targets) {
+ if( get(destination).on_unbind(targets) ) {
+ destinations.remove(destination)
+ }
+ } ->: queue
+
+ def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+ val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
+ override def on_connected = {
+ completed(this);
+ }
+ }
+ ^ {
+ get(destination).on_connect(route)
+ } ->: queue
+ }
+
+ def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
+ def isQueue(destination:Destination) = !isTopic(destination)
+
+ def disconnect(route:DeliveryProducerRoute) = releasing(route) {
+ get(route.destination).on_disconnect(route)
+ } ->: queue
+
+
+ def each(proc:(Destination, DestinationNode)=>Unit) = {
+ import JavaConversions._;
+ for( (destination, node) <- destinations ) {
+ proc(destination, node)
+ }
+ }
+
+}
+
+trait Route extends Retained {
+
+ val destination:Destination
+ val queue:DispatchQueue
+ val metric = new AtomicLong();
+
+ def connected(targets:List[DeliveryTarget]):Unit
+ def bind(targets:List[DeliveryTarget]):Unit
+ def unbind(targets:List[DeliveryTarget]):Unit
+ def disconnected():Unit
+
+}
+
+class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route {
+
+
+ // Retain the queue while we are retained.
+ queue.retain
+ setDisposer(^{
+ queue.release
+ })
+
+ var targets = List[DeliveryTargetSession]()
+
+ def connected(targets:List[DeliveryTarget]) = retaining(targets) {
+ internal_bind(targets)
+ on_connected
+ } ->: queue
+
+ def bind(targets:List[DeliveryTarget]) = retaining(targets) {
+ internal_bind(targets)
+ } ->: queue
+
+ private def internal_bind(values:List[DeliveryTarget]) = {
+ values.foreach{ x=>
+ targets = x.open_session(queue) :: targets
+ }
+ }
+
+ def unbind(targets:List[DeliveryTarget]) = releasing(targets) {
+ this.targets = this.targets.filterNot { x=>
+ val rc = targets.contains(x.consumer)
+ if( rc ) {
+ x.close
+ }
+ rc
+ }
+ } ->: queue
+
+ def disconnected() = ^ {
+ this.targets.foreach { x=>
+ x.close
+ x.consumer.release
+ }
+ } ->: queue
+
+ protected def on_connected = {}
+ protected def on_disconnected = {}
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.beans.ExceptionListener
+import _root_.java.io.{IOException}
+import _root_.java.util.{LinkedHashMap, HashMap}
+import _root_.org.apache.activemq.filter.{BooleanExpression}
+import _root_.org.apache.activemq.transport._
+import _root_.org.apache.activemq.Service
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.{FactoryFinder, IOExceptionSupport}
+import _root_.org.apache.activemq.wireformat.WireFormat
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+class ConnectionConfig {
+
+}
+abstract class Connection() extends TransportListener with Service {
+
+ val q = createQueue("connection")
+ var name = "connection"
+ var stopping = false;
+
+ var transport:Transport = null
+ var exceptionListener:ExceptionListener = null;
+
+ def start() = {
+ transport.setDispatchQueue(q);
+ transport.getDispatchQueue.release
+ transport.setTransportListener(this);
+ transport.start()
+ }
+
+ def stop() = {
+ stopping=true
+ transport.stop()
+ q.release
+ }
+
+ def onException(error:IOException) = {
+ if (!stopping) {
+ onFailure(error);
+ }
+ }
+
+ def onFailure(error:Exception) = {
+ if (exceptionListener != null) {
+ exceptionListener.exceptionThrown(error);
+ }
+ }
+
+ def onDisconnected() = {
+ }
+
+ def onConnected() = {
+ }
+
+}
+
+object BrokerConnection extends Log
+
+class BrokerConnection(val broker: Broker) extends Connection with Logging {
+ override protected def log = BrokerConnection
+
+ var protocolHandler: ProtocolHandler = null;
+
+ exceptionListener = new ExceptionListener() {
+ def exceptionThrown(error:Exception) = {
+ info("Transport failed before messaging protocol was initialized.", error);
+ stop()
+ }
+ }
+
+
+ def onCommand(command: Object) = {
+ if (protocolHandler != null) {
+ protocolHandler.onCommand(command);
+ } else {
+ try {
+ var wireformat:WireFormat = null;
+
+ if (command.isInstanceOf[WireFormat]) {
+
+ // First command might be from the wire format decriminator, letting
+ // us know what the actually wireformat is.
+ wireformat = command.asInstanceOf[WireFormat];
+
+ try {
+ protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName());
+ } catch {
+ case e:Exception=>
+ throw IOExceptionSupport.create("No protocol handler available for: " + wireformat.getName(), e);
+ }
+
+ protocolHandler.setConnection(this);
+ protocolHandler.setWireFormat(wireformat);
+ protocolHandler.start();
+
+ exceptionListener = new ExceptionListener() {
+ def exceptionThrown(error:Exception) {
+ protocolHandler.onException(error);
+ }
+ }
+ protocolHandler.onCommand(command);
+
+ } else {
+ throw new IOException("First command should be a WireFormat");
+ }
+
+ } catch {
+ case e:Exception =>
+ onFailure(e);
+ }
+ }
+ }
+
+ override def stop() = {
+ super.stop();
+ if (protocolHandler != null) {
+ protocolHandler.stop();
+ }
+ }
+}
+
+
+object ProtocolHandlerFactory {
+ val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/protocol/");
+
+ def createProtocolHandler(protocol:String) = {
+ PROTOCOL_HANDLER_FINDER.newInstance(protocol).asInstanceOf[ProtocolHandler]
+ }
+}
+
+trait ProtocolHandler extends Service {
+
+ def onCommand(command:Any);
+ def setConnection(brokerConnection:BrokerConnection);
+ def setWireFormat(wireformat:WireFormat);
+ def onException(error:Exception);
+
+// TODO:
+// public void setConnection(BrokerConnection connection);
+//
+// public BrokerConnection getConnection();
+//
+// public void onCommand(Object command);
+//
+// public void onException(Exception error);
+//
+// public void setWireFormat(WireFormat wf);
+//
+// public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+//
+// /**
+// * ClientContext
+// * <p>
+// * Description: Base interface describing a channel on a physical
+// * connection.
+// * </p>
+// *
+// * @author cmacnaug
+// * @version 1.0
+// */
+// public interface ClientContext {
+// public ClientContext getParent();
+//
+// public Collection<ClientContext> getChildren();
+//
+// public void addChild(ClientContext context);
+//
+// public void removeChild(ClientContext context);
+//
+// public void close();
+//
+// }
+//
+// public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
+// protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
+// protected final ClientContext parent;
+// protected boolean closed = false;
+//
+// public AbstractClientContext(String name, ClientContext parent) {
+// super(name);
+// this.parent = parent;
+// if (parent != null) {
+// parent.addChild(this);
+// }
+// }
+//
+// public ClientContext getParent() {
+// return parent;
+// }
+//
+// public void addChild(ClientContext child) {
+// if (!closed) {
+// children.add(child);
+// }
+// }
+//
+// public void removeChild(ClientContext child) {
+// if (!closed) {
+// children.remove(child);
+// }
+// }
+//
+// public Collection<ClientContext> getChildren() {
+// return children;
+// }
+//
+// public void close() {
+//
+// closed = true;
+//
+// for (ClientContext c : children) {
+// c.close();
+// }
+//
+// if (parent != null) {
+// parent.removeChild(this);
+// }
+//
+// super.close();
+// }
+// }
+//
+}
+
+trait ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
+
+ def getConsumerId() : String
+
+ def getDestination(): Destination
+
+ def getSelector() : String
+
+ def getSelectorExpression() : BooleanExpression
+
+ def isDurable() : Boolean
+
+ def getSubscriptionName() : String
+
+ /**
+ * If the destination does not exist, should it automatically be
+ * created?
+ *
+ * @return
+ */
+ def autoCreateDestination():Boolean
+
+ def isPersistent() : Boolean
+
+}
+
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedList, LinkedHashMap, HashMap}
+import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer}
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+trait DeliveryProducer {
+ def collocate(queue:DispatchQueue):Unit
+}
+
+trait DeliveryTargetSession {
+ val consumer:DeliveryTarget
+ def deliver(delivery:MessageDelivery)
+ def close:Unit
+}
+trait DeliveryTarget extends Retained {
+ def matches(message:MessageDelivery)
+ val queue:DispatchQueue;
+ def open_session(producer_queue:DispatchQueue):DeliveryTargetSession
+}
+
+/**
+ * Abstracts wire protocol message implementations. Each wire protocol
+ * will provide it's own type of Message.
+ */
+trait Message {
+
+ /**
+ * the globally unique id of the message
+ */
+ def id: AsciiBuffer
+
+ /**
+ * the globally unique id of the producer
+ */
+ def producer: AsciiBuffer
+
+ /**
+ * the message priority.
+ */
+ def priority:Byte
+
+ /**
+ * a positive value indicates that the delivery has an expiration
+ * time.
+ */
+ def expiration: Long
+
+ /**
+ * true if the delivery is persistent
+ */
+ def persistent: Boolean
+
+ /**
+ * where the message was sent to.
+ */
+ def destination: Destination
+
+ /**
+ * used to apply a selector against the message.
+ */
+ def messageEvaluationContext:MessageEvaluationContext
+
+}
+
+object MessageDelivery {
+ def apply(o:MessageDelivery) = new MessageDelivery(o.message, o.encoded, o.encoding, o.size, o.ack, o.tx_id, o.store_id)
+}
+
+case class MessageDelivery (
+
+ /**
+ * the message being delivered
+ */
+ message: Message,
+
+ /**
+ * the encoded form of the message being delivered.
+ */
+ encoded: Buffer,
+
+ /**
+ * the encoding format of the message
+ */
+ encoding: String,
+
+ /**
+ * memory size of the delivery. Used for resource allocation tracking
+ */
+ size:Int,
+
+ /**
+ * true if this delivery requires acknowledgment.
+ */
+ ack:Boolean,
+
+ /**
+ * The id used to identify the transaction that the message
+ * belongs to.
+ */
+ tx_id:Long,
+
+ /**
+ * The id used to identify this message in the message
+ * store.
+ *
+ * @return The store tracking or -1 if not set.
+ */
+ store_id: Long
+
+) extends BaseRetained {
+
+}
+
+//abstract class BrokerMessageDelivery extends MessageDelivery {
+// TODO:
+// // True while the message is being dispatched to the delivery targets:
+// boolean dispatching = false;
+//
+// // A non null pending save indicates that the message is the
+// // saver queue and that the message
+// OperationContext<?> pendingSave;
+//
+// // List of persistent targets for which the message should be saved
+// // when dispatch is complete:
+// HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
+// SaveableQueueElement<MessageDelivery> singleTarget;
+//
+// long storeTracking = -1;
+// BrokerDatabase store;
+// boolean fromStore = false;
+// boolean enableFlushDelay = true;
+// private int limiterSize = -1;
+// private long tid=-1;
+//
+// public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
+// fromStore = true;
+// store = database;
+// storeTracking = mRecord.getKey();
+// limiterSize = mRecord.getSize();
+// }
+//
+// public final int getFlowLimiterSize() {
+// if (limiterSize == -1) {
+// limiterSize = getMemorySize();
+// }
+// return limiterSize;
+// }
+//
+// /**
+// * When an application wishes to include a message in a broker transaction
+// * it must set this the tid returned by {@link Transaction#getTid()}
+// *
+// * @param tid
+// * Sets the tid used to identify the transaction at the broker.
+// */
+// public void setTransactionId(long tid) {
+// this.tid = tid;
+// }
+//
+// /**
+// * @return The tid used to identify the transaction at the broker.
+// */
+// public final long getTransactionId() {
+// return tid;
+// }
+//
+// public final void clearTransactionId() {
+// tid = -1;
+// }
+//
+// /**
+// * Subclass must implement this to return their current memory size
+// * estimate.
+// *
+// * @return The memory size of the message.
+// */
+// public abstract int getMemorySize();
+//
+// public final boolean isFromStore() {
+// return fromStore;
+// }
+//
+// public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
+// synchronized (this) {
+// // Can flush of this message to the store be delayed?
+// if (enableFlushDelay && !delayable) {
+// enableFlushDelay = false;
+// }
+// // If this message is being dispatched then add the queue to the
+// // list of queues for which to save the message when dispatch is
+// // finished:
+// if (dispatching) {
+// addPersistentTarget(sqe);
+// return;
+// }
+// // Otherwise, if it is still in the saver queue, we can add this
+// // queue to the queue list:
+// else if (pendingSave != null) {
+// addPersistentTarget(sqe);
+// if (!delayable) {
+// pendingSave.requestFlush();
+// }
+// return;
+// }
+// }
+//
+// store.saveMessage(sqe, controller, delayable);
+// }
+//
+// public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+// boolean firePersistListener = false;
+// boolean deleted = false;
+// synchronized (this) {
+// // If the message hasn't been saved to the database
+// // then we don't need to issue a delete:
+// if (dispatching || pendingSave != null) {
+//
+// deleted = true;
+//
+// removePersistentTarget(sqe.getQueueDescriptor());
+// // We get a save context when we place the message in the
+// // database queue. If it has been added to the queue,
+// // and we've removed the last queue, see if we can cancel
+// // the save:
+// if (pendingSave != null && !hasPersistentTargets()) {
+// if (pendingSave.cancel()) {
+// pendingSave = null;
+// if (isPersistent()) {
+// firePersistListener = true;
+// }
+// }
+// }
+// }
+// }
+//
+// if (!deleted) {
+// store.deleteQueueElement(sqe);
+// }
+//
+// if (firePersistListener) {
+// onMessagePersisted();
+// }
+//
+// }
+//
+// public final void setStoreTracking(long tracking) {
+// if (storeTracking == -1) {
+// storeTracking = tracking;
+// }
+// }
+//
+// public final void beginDispatch(BrokerDatabase database) {
+// this.store = database;
+// dispatching = true;
+// setStoreTracking(database.allocateStoreTracking());
+// }
+//
+// public long getStoreTracking() {
+// return storeTracking;
+// }
+//
+// public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
+// if (singleTarget != null) {
+// ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
+// list.add(singleTarget);
+// return list;
+// } else if (persistentTargets != null) {
+// return persistentTargets.values();
+// }
+// return null;
+// }
+//
+// public void beginStore() {
+// synchronized (this) {
+// pendingSave = null;
+// }
+// }
+//
+// private final boolean hasPersistentTargets() {
+// return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
+// }
+//
+// private final void removePersistentTarget(QueueDescriptor queue) {
+// if (persistentTargets != null) {
+// persistentTargets.remove(queue);
+// return;
+// }
+//
+// if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
+// singleTarget = null;
+// }
+// }
+//
+// private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
+// if (persistentTargets != null) {
+// persistentTargets.put(elem.getQueueDescriptor(), elem);
+// return;
+// }
+//
+// if (singleTarget == null) {
+// singleTarget = elem;
+// return;
+// }
+//
+// if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
+// persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
+// persistentTargets.put(elem.getQueueDescriptor(), elem);
+// persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
+// singleTarget = null;
+// }
+// }
+//
+// public final void finishDispatch(ISourceController<?> controller) throws IOException {
+// boolean firePersistListener = false;
+// synchronized (this) {
+// // If any of the targets requested save then save the message
+// // Note that this could be the case even if the message isn't
+// // persistent if a target requested that the message be spooled
+// // for some other reason such as queue memory overflow.
+// if (hasPersistentTargets()) {
+// pendingSave = store.persistReceivedMessage(this, controller);
+// }
+//
+// // If none of the targets required persistence, then fire the
+// // persist listener:
+// if (pendingSave == null || !isPersistent()) {
+// firePersistListener = true;
+// }
+// dispatching = false;
+// }
+//
+// if (firePersistListener) {
+// onMessagePersisted();
+// }
+// }
+//
+// public final MessageRecord createMessageRecord() {
+//
+// MessageRecord record = new MessageRecord();
+// record.setEncoding(getStoreEncoding());
+// record.setBuffer(getStoreEncoded());
+// record.setStreamKey((long) 0);
+// record.setMessageId(getMsgId());
+// record.setSize(getFlowLimiterSize());
+// record.setKey(getStoreTracking());
+// return record;
+// }
+//
+// /**
+// * @return A buffer representation of the message to be stored in the store.
+// * @throws
+// */
+// protected abstract Buffer getStoreEncoded();
+//
+// /**
+// * @return The encoding scheme used to store the message.
+// */
+// protected abstract AsciiBuffer getStoreEncoding();
+//
+// public boolean isFlushDelayable() {
+// // TODO Auto-generated method stub
+// return enableFlushDelay;
+// }
+//}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DeliveryBuffer(var maxSize:Int=1024*32) {
+
+ var deliveries = new LinkedList[MessageDelivery]()
+ private var size = 0
+ var eventHandler: Runnable = null
+
+ def full = size >= maxSize
+
+ def drain = eventHandler.run
+
+ def receive = deliveries.poll
+
+ def isEmpty = deliveries.isEmpty
+
+ def send(delivery:MessageDelivery):Unit = {
+ delivery.retain
+ size += delivery.size
+ deliveries.addLast(delivery)
+ if( deliveries.size == 1 ) {
+ drain
+ }
+ }
+
+ def ack(delivery:MessageDelivery) = {
+ // When a message is delivered to the consumer, we release
+ // used capacity in the outbound queue, and can drain the inbound
+ // queue
+ val wasBlocking = full
+ size -= delivery.size
+ delivery.release
+ if( !isEmpty ) {
+ drain
+ }
+ }
+
+}
+
+class DeliveryOverflowBuffer(val delivery_buffer:DeliveryBuffer) {
+
+ private var overflow = new LinkedList[MessageDelivery]()
+
+ protected def drainOverflow:Unit = {
+ while( !overflow.isEmpty && !full ) {
+ val delivery = overflow.removeFirst
+ delivery.release
+ send_to_delivery_queue(delivery)
+ }
+ }
+
+ def send(delivery:MessageDelivery) = {
+ if( full ) {
+ // Deliveries in the overflow queue is remain acquired by us so that
+ // producer that sent it to us gets flow controlled.
+ delivery.retain
+ overflow.addLast(delivery)
+ } else {
+ send_to_delivery_queue(delivery)
+ }
+ }
+
+ protected def send_to_delivery_queue(value:MessageDelivery) = {
+ var delivery = MessageDelivery(value)
+ delivery.setDisposer(^{
+ drainOverflow
+ })
+ delivery_buffer.send(delivery)
+ delivery.release
+ }
+
+ def full = delivery_buffer.full
+
+}
+
+class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
+
+ var sessions = List[CreditServer]()
+
+ var session_min_credits = 1024*4;
+ var session_credit_capacity = 1024*32
+ var session_max_credits = session_credit_capacity;
+
+ queue.retain
+ setDisposer(^{
+ source.release
+ queue.release
+ })
+
+ // use a event aggregating source to coalesce multiple events from the same thread.
+ val source = createSource(new ListEventAggregator[MessageDelivery](), queue)
+ source.setEventHandler(^{drain_source});
+ source.resume
+
+ def drain_source = {
+ val deliveries = source.getData
+ deliveries.foreach { delivery=>
+ delivery_buffer.send(delivery)
+ delivery.release
+ }
+ }
+
+
+ class CreditServer(val producer_queue:DispatchQueue) {
+ private var _capacity = 0
+
+ def capacity(value:Int) = {
+ val change = value - _capacity;
+ _capacity = value;
+ client.credit(change)
+ }
+
+ def drain(callback:Runnable) = {
+ client.drain(callback)
+ }
+
+ val client = new CreditClient()
+
+ class CreditClient() extends DeliveryOverflowBuffer(delivery_buffer) {
+
+ producer_queue.retain
+ val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+ credit_adder.setEventHandler(^{
+ internal_credit(credit_adder.getData.intValue)
+ });
+ credit_adder.resume
+
+ private var credits = 0;
+
+ ///////////////////////////////////////////////////
+ // These methods get called from the client/producer thread...
+ ///////////////////////////////////////////////////
+ def close = {
+ credit_adder.release
+ producer_queue.release
+ }
+
+ override def full = credits <= 0
+
+ override protected def send_to_delivery_queue(value:MessageDelivery) = {
+ var delivery = MessageDelivery(value)
+ delivery.setDisposer(^{
+ // This is called from the server/consumer thread
+ credit_adder.merge(delivery.size);
+ })
+ internal_credit(-delivery.size)
+ source.merge(delivery)
+ }
+
+ def internal_credit(value:Int) = {
+ credits += value;
+ if( credits <= 0 ) {
+ credits = 0
+ } else {
+ drainOverflow
+ }
+ }
+
+ ///////////////////////////////////////////////////
+ // These methods get called from the server/consumer thread...
+ ///////////////////////////////////////////////////
+ def credit(value:Int) = ^{ internal_credit(value) } ->: producer_queue
+
+ def drain(callback:Runnable) = {
+ credits = 0
+ if( callback!=null ) {
+ queue << callback
+ }
+ }
+ }
+ }
+
+ def session(queue:DispatchQueue) = {
+ val session = new CreditServer(queue)
+ sessions = session :: sessions
+ session.capacity(session_max_credits)
+ session.client
+ }
+
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, HashMap}
+import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
+import BufferConversions._
+
+class ParserOptions {
+ var defaultDomain:AsciiBuffer = null
+ var queuePrefix:AsciiBuffer = null
+ var topicPrefix:AsciiBuffer = null
+ var tempQueuePrefix:AsciiBuffer = null
+ var tempTopicPrefix:AsciiBuffer = null
+}
+
+trait Destination {
+ def getDomain(): AsciiBuffer
+ def getName(): AsciiBuffer
+ def getDestinations():Seq[Destination]
+}
+
+object Destination {
+
+ /**
+ * Parses a simple destination.
+ *
+ * @param value
+ * @param options
+ * @return
+ */
+ def parse(value:AsciiBuffer, options:ParserOptions ):Destination = {
+ if (options.queuePrefix!=null && value.startsWith(options.queuePrefix)) {
+ var name = value.slice(options.queuePrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.QUEUE_DOMAIN, name);
+ } else if (options.topicPrefix!=null && value.startsWith(options.topicPrefix)) {
+ var name = value.slice(options.topicPrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.TOPIC_DOMAIN, name);
+ } else if (options.tempQueuePrefix!=null && value.startsWith(options.tempQueuePrefix)) {
+ var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name);
+ } else if (options.tempTopicPrefix!=null && value.startsWith(options.tempTopicPrefix)) {
+ var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
+ } else {
+ if( options.defaultDomain==null ) {
+ throw new IllegalArgumentException("Destination domain not provided: "+value);
+ }
+ return new SingleDestination(options.defaultDomain, value);
+ }
+ }
+
+ /**
+ * Parses a destination which may or may not be a composite.
+ *
+ * @param value
+ * @param options
+ * @param compositeSeparator
+ * @return
+ */
+ def parse(value:AsciiBuffer, options:ParserOptions , compositeSeparator:Byte ):Destination = {
+ if( value == null ) {
+ return null;
+ }
+
+ if( value.contains(compositeSeparator) ) {
+ var rc = value.split(compositeSeparator);
+ var md = new MultiDestination();
+ for (buffer <- rc) {
+ md.destinations ::= parse(buffer, options)
+ }
+ return md;
+ }
+ return parse(value, options);
+ }
+}
+
+class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
+
+ def getDestinations():Seq[Destination] = null;
+ def getDomain():AsciiBuffer = domain
+ def getName():AsciiBuffer = name
+
+ override def toString() = ""+domain+":"+name
+}
+
+class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
+
+ def getDestinations():Seq[Destination] = destinations;
+ def getDomain():AsciiBuffer = null
+ def getName():AsciiBuffer = null
+
+ override def toString() = destinations.mkString(",")
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, HashMap}
+import _root_.java.lang.{Throwable, String}
+import _root_.org.apache.commons.logging.LogFactory
+import _root_.org.apache.commons.logging.{Log => Logger}
+
+trait Log {
+ val log = LogFactory.getLog(getClass.getName)
+}
+
+/**
+ * A Logging trait you can mix into an implementation class without affecting its public API
+ */
+trait Logging {
+
+ protected def log: Log
+
+ protected def error(message: => String): Unit = log.log.error(message)
+
+ protected def error(e: Throwable): Unit = log.log.error(e.getMessage, e)
+
+ protected def error(message: => String, e: Throwable): Unit = log.log.error(message, e)
+
+ protected def warn(message: => String): Unit = log.log.warn(message)
+
+ protected def warn(message: => String, e: Throwable): Unit = log.log.warn(message, e)
+
+ protected def info(message: => String): Unit = log.log.info(message)
+
+ protected def info(message: => String, e: Throwable): Unit = log.log.info(message, e)
+
+ protected def debug(message: => String): Unit = log.log.debug(message)
+
+ protected def debug(message: => String, e: Throwable): Unit = log.log.debug(message, e)
+
+ protected def trace(message: => String): Unit = log.log.trace(message)
+
+ protected def trace(message: => String, e: Throwable): Unit = log.log.trace(message, e)
+
+}
+
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.io.{File}
+import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.transport._
+import _root_.org.apache.activemq.Service
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{Buffer, UTF8Buffer, AsciiBuffer}
+import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
+
+import _root_.scala.collection.JavaConversions._
+
+object BrokerFactory {
+
+ val BROKER_FACTORY_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/");
+
+ trait Handler {
+ def createBroker(brokerURI:String):Broker
+ }
+
+
+ def createHandler(name:String):Handler = {
+ BROKER_FACTORY_HANDLER_FINDER.newInstance(name).asInstanceOf[Handler]
+ }
+
+ /**
+ * Creates a broker from a URI configuration
+ *
+ * @param brokerURI the URI scheme to configure the broker
+ * @param startBroker whether or not the broker should have its
+ * {@link Broker#start()} method called after
+ * construction
+ * @throws Exception
+ */
+ def createBroker(brokerURI:String, startBroker:Boolean=false):Broker = {
+ var split = brokerURI.split(":")
+ if (split.length < 2 ) {
+ throw new IllegalArgumentException("Invalid broker URI, no scheme specified: " + brokerURI)
+ }
+ var handler = createHandler(split(0))
+ var broker = handler.createBroker(brokerURI)
+ if (startBroker) {
+ broker.start();
+ }
+ return broker;
+ }
+
+}
+
+object BufferConversions {
+
+ implicit def toAsciiBuffer(value:String) = new AsciiBuffer(value)
+ implicit def toUTF8Buffer(value:String) = new UTF8Buffer(value)
+ implicit def fromAsciiBuffer(value:AsciiBuffer) = value.toString
+ implicit def fromUTF8Buffer(value:UTF8Buffer) = value.toString
+
+ implicit def toAsciiBuffer(value:Buffer) = value.ascii
+ implicit def toUTF8Buffer(value:Buffer) = value.utf8
+}
+
+import BufferConversions._
+
+object BrokerConstants extends Log {
+ val CONFIGURATION = "CONFIGURATION"
+ val STOPPED = "STOPPED"
+ val STARTING = "STARTING"
+ val STOPPING = "STOPPING"
+ val RUNNING = "RUNNING"
+ val UNKNOWN = "UNKNOWN"
+
+ val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
+}
+
+class Broker() extends Service with Logging {
+
+ import BrokerConstants._
+ override protected def log = BrokerConstants
+
+ class BrokerAcceptListener extends TransportAcceptListener {
+ def onAcceptError(error: Exception): Unit = {
+ warn("Accept error: " + error)
+ debug("Accept error details: ", error)
+ }
+
+ def onAccept(transport: Transport): Unit = {
+ var connection = new BrokerConnection(Broker.this)
+ connection.transport = transport
+ clientConnections.add(connection)
+ try {
+ connection.start
+ }
+ catch {
+ case e1: Exception => {
+ onAcceptError(e1)
+ }
+ }
+ }
+ }
+
+ val q = createQueue("broker");
+
+ var connectUris: List[String] = Nil
+ val virtualHosts: LinkedHashMap[AsciiBuffer, VirtualHost] = new LinkedHashMap[AsciiBuffer, VirtualHost]
+ val transportServers: ArrayList[TransportServer] = new ArrayList[TransportServer]
+ val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+ var dataDirectory: File = null
+ var state = CONFIGURATION
+ var name = "broker";
+ var defaultVirtualHost: VirtualHost = null
+
+ def removeConnectUri(uri: String): Unit = ^ {
+ this.connectUris = this.connectUris.filterNot(_==uri)
+ } ->: q
+
+ def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
+ virtualHosts.get(name)
+ } ->: q
+
+ def getConnectUris(cb: (List[String]) => Unit) = callback(cb) {
+ connectUris
+ } ->: q
+
+
+ def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
+ defaultVirtualHost
+ } ->: q
+
+ def addVirtualHost(host: VirtualHost) = ^ {
+ if (host.names.isEmpty) {
+ throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
+ }
+ for (name <- host.names) {
+ if (virtualHosts.containsKey(name)) {
+ throw new IllegalArgumentException("Virtual host with host name " + name + " already exists.")
+ }
+ }
+ for (name <- host.names) {
+ virtualHosts.put(name, host)
+ }
+ if (defaultVirtualHost == null) {
+ setDefaultVirtualHost(host)
+ }
+ } ->: q
+
+ def addTransportServer(server: TransportServer) = ^ {
+ state match {
+ case RUNNING =>
+ start(server)
+ case CONFIGURATION =>
+ this.transportServers.add(server)
+ case _ =>
+ throw new IllegalStateException("Cannot add a transport server when broker is: " + state)
+ }
+ } ->: q
+
+ def removeTransportServer(server: TransportServer) = ^ {
+ state match {
+ case RUNNING =>
+ stopTransportServerWrapException(server)
+ case STOPPED =>
+ this.transportServers.remove(server)
+ case CONFIGURATION =>
+ this.transportServers.remove(server)
+ case _ =>
+ throw new IllegalStateException("Cannot add a transport server when broker is: " + state)
+ }
+ } ->: q
+
+
+ def getState(cb: (String) => Unit) = callback(cb) {state} ->: q
+
+
+ def addConnectUri(uri: String) = ^ {
+ this.connectUris = this.connectUris ::: uri::Nil
+ } ->: q
+
+ def removeVirtualHost(host: VirtualHost) = ^ {
+ for (name <- host.names) {
+ virtualHosts.remove(name)
+ }
+ if (host == defaultVirtualHost) {
+ if (virtualHosts.isEmpty) {
+ defaultVirtualHost = null
+ }
+ else {
+ defaultVirtualHost = virtualHosts.values.iterator.next
+ }
+ }
+ } ->: q
+
+ def setDefaultVirtualHost(defaultVirtualHost: VirtualHost) = ^ {
+ this.defaultVirtualHost = defaultVirtualHost
+ } ->: q
+
+ def getName(cb: (String) => Unit) = callback(cb) {
+ name;
+ } ->: q
+
+
+ private def start(server: TransportServer): Unit = {
+ server.setDispatchQueue(q)
+ server.setAcceptListener(new BrokerAcceptListener)
+ server.start
+ }
+
+
+ final def stop: Unit = ^ {
+ if (state == RUNNING) {
+ state = STOPPING
+
+ for (server <- transportServers) {
+ stop(server)
+ }
+ for (connection <- clientConnections) {
+ stop(connection)
+ }
+ for (virtualHost <- virtualHosts.values) {
+ stop(virtualHost)
+ }
+ state = STOPPED;
+ }
+
+ } ->: q
+
+ def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
+ new ArrayList[VirtualHost](virtualHosts.values)
+ } ->: q
+
+ def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
+ new ArrayList[TransportServer](transportServers)
+ } ->: q
+
+
+
+
+ def start = ^ {
+ if (state == CONFIGURATION) {
+ // We can apply defaults now
+ if (dataDirectory == null) {
+ dataDirectory = new File(IOHelper.getDefaultDataDirectory)
+ }
+
+ if (defaultVirtualHost == null) {
+ defaultVirtualHost = new VirtualHost()
+ defaultVirtualHost.broker = Broker.this
+ defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
+ virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost)
+ }
+
+ state = STARTING
+
+ for (virtualHost <- virtualHosts.values) {
+ virtualHost.start
+ }
+ for (server <- transportServers) {
+ start(server)
+ }
+ state = RUNNING
+ } else {
+ warn("Can only start a broker that is in the " + CONFIGURATION + " state. Broker was " + state)
+ }
+ } ->: q
+
+ private def stopTransportServerWrapException(server: TransportServer): Unit = {
+ try {
+ server.stop
+ }
+ catch {
+ case e: Exception => {
+ throw new RuntimeException(e)
+ }
+ }
+ }
+
+
+ /**
+ * Helper method to help stop broker services and log error if they fail to start.
+ * @param server
+ */
+ private def stop(server: Service): Unit = {
+ try {
+ server.stop
+ } catch {
+ case e: Exception => {
+ warn("Could not stop " + server + ": " + e)
+ debug("Could not stop " + server + " due to: ", e)
+ }
+ }
+ }
+}
+
+
+trait QueueLifecyleListener {
+
+ /**
+ * A destination has bean created
+ *
+ * @param queue
+ */
+ def onCreate(queue:Queue);
+
+ /**
+ * A destination has bean destroyed
+ *
+ * @param queue
+ */
+ def onDestroy(queue:Queue);
+
+}
+
+
+
+
+object Queue {
+ val maxOutboundSize = 1024*1204*5
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryTarget with DeliveryProducer {
+
+
+
+ override val queue:DispatchQueue = createQueue("queue:"+destination);
+ queue.setTargetQueue(getRandomThreadQueue)
+ setDisposer(^{
+ queue.release
+ })
+
+
+ val delivery_buffer = new DeliveryBuffer
+
+ class ConsumerState(val consumer:DeliveryTargetSession) {
+ var bound=true
+
+ def deliver(value:MessageDelivery):Unit = {
+ val delivery = MessageDelivery(value)
+ delivery.setDisposer(^{
+ ^{ completed(value) } ->:queue
+ })
+ consumer.deliver(delivery);
+ delivery.release
+ }
+
+ def completed(delivery:MessageDelivery) = {
+ // Lets get back on the readyList if we are still bound.
+ if( bound ) {
+ readyConsumers.addLast(this)
+ }
+ delivery_buffer.ack(delivery)
+ }
+ }
+
+ var allConsumers = Map[DeliveryTarget,ConsumerState]()
+ val readyConsumers = new LinkedList[ConsumerState]()
+
+ def connected(consumers:List[DeliveryTarget]) = bind(consumers)
+ def bind(consumers:List[DeliveryTarget]) = retaining(consumers) {
+ for ( consumer <- consumers ) {
+ val cs = new ConsumerState(consumer.open_session(queue))
+ allConsumers += consumer->cs
+ readyConsumers.addLast(cs)
+ }
+ delivery_buffer.eventHandler.run
+ } ->: queue
+
+ def unbind(consumers:List[DeliveryTarget]) = releasing(consumers) {
+ for ( consumer <- consumers ) {
+ allConsumers.get(consumer) match {
+ case Some(cs)=>
+ cs.bound = false
+ cs.consumer.close
+ allConsumers -= consumer
+ readyConsumers.remove(cs)
+ case None=>
+ }
+ }
+ } ->: queue
+
+ def disconnected() = throw new RuntimeException("unsupported")
+
+ def collocate(value:DispatchQueue):Unit = {
+ if( value.getTargetQueue ne queue.getTargetQueue ) {
+ println(queue.getLabel+" co-locating with: "+value.getLabel);
+ this.queue.setTargetQueue(value.getTargetQueue)
+ }
+ }
+
+
+ delivery_buffer.eventHandler = ^{
+ while( !readyConsumers.isEmpty && !delivery_buffer.isEmpty ) {
+ val cs = readyConsumers.removeFirst
+ val delivery = delivery_buffer.receive
+ cs.deliver(delivery)
+ }
+ }
+
+
+ val deliveryQueue = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
+ def open_session(producer_queue:DispatchQueue) = new DeliveryTargetSession {
+ val session = deliveryQueue.session(producer_queue)
+ val consumer = Queue.this
+ retain
+
+ def deliver(delivery:MessageDelivery) = session.send(delivery)
+ def close = {
+ session.close
+ release
+ }
+ }
+
+ def matches(message:MessageDelivery) = { true }
+
+// def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
+// val consumer = StompQueue.this
+// val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
+// retain
+//
+// def deliver(delivery:Delivery) = using(delivery) {
+// deliveryQueue.send(delivery)
+// } ->: queue
+//
+// def close = {
+// release
+// }
+// }
+
+
+}
+
+class XQueue(val destination:Destination) {
+
+// TODO:
+// private VirtualHost virtualHost;
+//
+// Queue() {
+// this.queue = queue;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+// */
+// public void deliver(MessageDelivery message, ISourceController<?> source) {
+// queue.add(message, source);
+// }
+//
+// public final void addSubscription(final Subscription<MessageDelivery> sub) {
+// queue.addSubscription(sub);
+// }
+//
+// public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+// return queue.removeSubscription(sub);
+// }
+//
+// public void start() throws Exception {
+// queue.start();
+// }
+//
+// public void stop() throws Exception {
+// if (queue != null) {
+// queue.stop();
+// }
+// }
+//
+// public void shutdown(Runnable onShutdown) throws Exception {
+// if (queue != null) {
+// queue.shutdown(onShutdown);
+// }
+// }
+//
+// public boolean hasSelector() {
+// return false;
+// }
+//
+// public boolean matches(MessageDelivery message) {
+// return true;
+// }
+//
+// public VirtualHost getBroker() {
+// return virtualHost;
+// }
+//
+// public void setVirtualHost(VirtualHost virtualHost) {
+// this.virtualHost = virtualHost;
+// }
+//
+// public void setDestination(Destination destination) {
+// this.destination = destination;
+// }
+//
+// public final Destination getDestination() {
+// return destination;
+// }
+//
+// public boolean isDurable() {
+// return true;
+// }
+//
+// public static class QueueSubscription implements BrokerSubscription {
+// Subscription<MessageDelivery> subscription;
+// final Queue queue;
+//
+// public QueueSubscription(Queue queue) {
+// this.queue = queue;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+// * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+// */
+// public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
+// this.subscription = subscription;
+// queue.addSubscription(subscription);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+// * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+// */
+// public void disconnect(ConsumerContext context) {
+// queue.removeSubscription(subscription);
+// }
+//
+// /* (non-Javadoc)
+// * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+// */
+// public Destination getDestination() {
+// return queue.getDestination();
+// }
+// }
+
+ // TODO:
+ def matches(message:MessageDelivery) = false
+ def deliver(message:MessageDelivery) = {
+ // TODO:
+ }
+
+ def getDestination() = destination
+
+ def shutdown = {}
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.filter.{FilterException, BooleanExpression}
+import path.{PathFilter}
+import _root_.scala.collection.JavaConversions._
+
+trait BrokerSubscription {
+
+ def connect(consumer:ConsumerContext)
+
+ def disconnect(consumer:ConsumerContext)
+
+ def getDestination():Destination
+
+}
+
+
+class CompositeSubscription(val destination:Destination, val subscriptions:List[BrokerSubscription] ) extends BrokerSubscription {
+
+
+ def connect(consumer:ConsumerContext) = {
+ for (sub <- subscriptions) {
+ sub.connect(consumer);
+ }
+ }
+
+ def disconnect(consumer:ConsumerContext) = {
+ for (sub <- subscriptions) {
+ sub.disconnect(consumer);
+ }
+ }
+
+ def getDestination() = destination
+
+}
+
+object WildcardQueueSubscription extends Log {
+
+}
+class WildcardQueueSubscription(val host:VirtualHost, val destination:Destination, val consumer:ConsumerContext) extends BrokerSubscription with QueueLifecyleListener with Logging {
+
+ protected def log = WildcardQueueSubscription
+
+ var filter = PathFilter.parseFilter(destination.getName());
+ val childSubs = new ArrayList[BrokerSubscription]();
+
+
+ ///////////////////////////////////////////////////////////////////
+ // BrokerSubscription interface implementation
+ ///////////////////////////////////////////////////////////////////
+ def connect(cc:ConsumerContext) = {
+ assert(cc == consumer)
+// TODO:
+// val domain = host.router.getDomain(Broker.QUEUE_DOMAIN);
+// val matches = domain.route(destination.getName(), null);
+// for (target <- matches) {
+// val queue = target.asInstanceOf[Queue]
+// var childSub = host.createSubscription(consumer, queue.destination);
+// childSubs.add(childSub);
+// childSub.connect(consumer);
+// }
+ host.addDestinationLifecyleListener(this);
+ }
+
+ def disconnect(cc:ConsumerContext) = {
+ assert(cc == consumer)
+ host.removeDestinationLifecyleListener(this);
+ for (childSub <- childSubs) {
+ childSub.disconnect(cc);
+ }
+ childSubs.clear();
+ }
+
+ def getDestination() : Destination = destination
+
+ ///////////////////////////////////////////////////////////////////
+ // QueueLifecyleListener interface implementation
+ ///////////////////////////////////////////////////////////////////
+ def onCreate(queue:Queue) = {
+ if( filter.matches(queue.destination.getName()) ) {
+ try {
+ var childSub = host.createSubscription(consumer, queue.destination);
+ childSubs.add(childSub);
+ childSub.connect(consumer);
+ } catch {
+ case e:Exception=>
+ warn("Could not create dynamic subscription to "+queue.destination+": "+e);
+ debug("Could not create dynamic subscription to "+queue.destination+": ", e);
+ }
+ }
+ }
+
+ def onDestroy(queue:Queue ) = {
+ }
+
+}
+
+class TopicSubscription { // extends BrokerSubscription with DeliveryTarget {
+ def matches(message:MessageDelivery) = false
+ def deliver(message:MessageDelivery) = {}
+ def connect(consumer:ConsumerContext) = {}
+ def disconnect(consumer:ConsumerContext) = {}
+ def getDestination():Destination = null
+
+// static final boolean USE_PERSISTENT_QUEUES = true;
+//
+// protected final BooleanExpression selector;
+// protected final Destination destination;
+// protected Subscription<MessageDelivery> connectedSub;
+// private final VirtualHost host;
+//
+// //TODO: replace this with a base interface for queue which also support non persistent use case.
+// private IFlowQueue<MessageDelivery> queue;
+//
+// TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+// this.host = host;
+// this.selector = selector;
+// this.destination = destination;
+// }
+//
+// @Override
+// public String toString() {
+// return IntrospectionSupport.toString(this);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+// */
+// public final void deliver(MessageDelivery message, ISourceController<?> source) {
+// if (matches(message)) {
+// queue.add(message, source);
+// }
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
+// */
+// public boolean hasSelector() {
+// return selector != null;
+// }
+//
+// public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
+// if (this.connectedSub == null) {
+// if( subscription.isPersistent() ) {
+// queue = createPersistentQueue(subscription);
+// } else {
+// queue = createNonPersistentQueue(subscription);
+// }
+// queue.start();
+//
+// this.connectedSub = subscription;
+// this.queue.addSubscription(connectedSub);
+// this.host.getRouter().bind(destination, this);
+// } else if (connectedSub != subscription) {
+// throw new UserAlreadyConnectedException();
+// }
+// }
+//
+// private IFlowQueue<MessageDelivery> createNonPersistentQueue(final ConsumerContext subscription) {
+// Flow flow = new Flow(subscription.getResourceName(), false);
+// String name = subscription.getResourceName();
+// IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
+// ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
+// queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
+// public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
+// subscription.add(elem, controller);
+// }
+// });
+// return queue;
+// }
+//
+// private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
+// ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
+// return queue;
+// }
+//
+// @SuppressWarnings("unchecked")
+// private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
+// ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
+// host.getQueueStore().deleteQueue(pq.getDescriptor());
+// }
+//
+// public synchronized void disconnect(final ConsumerContext subscription) {
+// if (connectedSub != null && connectedSub == subscription) {
+// this.host.getRouter().unbind(destination, this);
+// this.queue.removeSubscription(connectedSub);
+// this.connectedSub = null;
+//
+// queue.stop();
+// if( USE_PERSISTENT_QUEUES ) {
+// destroyPersistentQueue(queue);
+// }
+// queue=null;
+// }
+// }
+//
+//
+//
+// public boolean matches(MessageDelivery message) {
+// if (selector == null) {
+// return true;
+// }
+//
+// MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
+// selectorContext.setDestination(destination);
+// try {
+// return (selector.matches(selectorContext));
+// } catch (FilterException e) {
+// e.printStackTrace();
+// return false;
+// }
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+// */
+// public Destination getDestination() {
+// return destination;
+// }
+
+
+}
+
+class DurableSubscription(val host:VirtualHost, val destination:Destination, val selector:BooleanExpression) { // extends BrokerSubscription with DeliveryTarget {
+
+// private final IQueue<Long, MessageDelivery> queue;
+// private Subscription<MessageDelivery> connectedSub;
+ var started = false;
+// TODO:
+// this.host.router.bind(destination, this);
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+ */
+ def getDestination() = destination
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+ */
+ def deliver(message:MessageDelivery ) = {
+// TODO:
+// queue.add(message, source);
+ }
+
+ def connect(subscription:ConsumerContext) = {
+// TODO:
+// if (this.connectedSub == null) {
+// this.connectedSub = subscription;
+// queue.addSubscription(connectedSub);
+// } else if (connectedSub != subscription) {
+// throw new UserAlreadyConnectedException();
+// }
+ }
+
+ def disconnect(subscription:ConsumerContext) = {
+// TODO:
+// if (connectedSub != null && connectedSub == subscription) {
+// queue.removeSubscription(connectedSub);
+// connectedSub = null;
+// }
+ }
+
+ def matches(message:MessageDelivery) = {
+ if (selector != null) {
+ var selectorContext = message.message.messageEvaluationContext
+ selectorContext.setDestination(destination);
+ try {
+ (selector.matches(selectorContext));
+ } catch {
+ case e:FilterException=>
+ e.printStackTrace();
+ false;
+ }
+ } else {
+ true;
+ }
+
+ }
+
+}