You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:57:25 UTC
svn commit: r961106 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq-stomp/src/main/scala/org/apache/activemq...
Author: chirino
Date: Wed Jul 7 03:57:24 2010
New Revision: 961106
URL: http://svn.apache.org/viewvc?rev=961106&view=rev
Log:
renamed queue field to dispatchQueue for consistency
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:57:24 2010
@@ -260,7 +260,7 @@ trait QueueLifecyleListener {
-object Queue {
+object Queue extends Log {
val maxOutboundSize = 1024*1204*5
}
@@ -268,18 +268,22 @@ object Queue {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer {
+class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+ override protected def log = Queue
- override val queue:DispatchQueue = createQueue("queue:"+destination);
- queue.setTargetQueue(getRandomThreadQueue)
+ override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
+ dispatchQueue.setTargetQueue(getRandomThreadQueue)
+ dispatchQueue {
+ debug("created queue for: "+destination)
+ }
val delivery_buffer = new DeliveryBuffer
delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
- val session_manager = new DeliverySessionManager(delivery_buffer, queue)
+ val session_manager = new DeliverySessionManager(delivery_buffer, dispatchQueue)
setDisposer(^{
- queue.release
+ dispatchQueue.release
session_manager.release
})
@@ -289,7 +293,7 @@ class Queue(val destination:Destination)
def deliver(value:Delivery):Unit = {
val delivery = Delivery(value)
delivery.setDisposer(^{
- ^{ completed(value) } >>:queue
+ ^{ completed(value) } >>:dispatchQueue
})
consumer.deliver(delivery);
delivery.release
@@ -310,12 +314,12 @@ class Queue(val destination:Destination)
def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
for ( consumer <- consumers ) {
- val cs = new ConsumerState(consumer.open_session(queue))
+ val cs = new ConsumerState(consumer.open_session(dispatchQueue))
allConsumers += consumer->cs
readyConsumers.addLast(cs)
}
drain_delivery_buffer
- } >>: queue
+ } >>: dispatchQueue
def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
for ( consumer <- consumers ) {
@@ -328,14 +332,14 @@ class Queue(val destination:Destination)
case None=>
}
}
- } >>: queue
+ } >>: dispatchQueue
def disconnected() = throw new RuntimeException("unsupported")
def collocate(value:DispatchQueue):Unit = {
- if( value.getTargetQueue ne queue.getTargetQueue ) {
- println(queue.getLabel+" co-locating with: "+value.getLabel);
- this.queue.setTargetQueue(value.getTargetQueue)
+ if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
+ println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+ this.dispatchQueue.setTargetQueue(value.getTargetQueue)
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:57:24 2010
@@ -46,24 +46,21 @@ abstract class Connection() extends Tran
import Connection._
val id = next_id
val dispatchQueue = createQueue(id)
-
- def stopped = serviceState match {
- case STOPPED => true
- case x:STOPPING => true
- case _ => false
- }
+ var stopped = true
var transport:Transport = null
override def toString = id
override protected def _start(onCompleted:Runnable) = {
+ stopped = false
transport.setDispatchQueue(dispatchQueue);
transport.setTransportListener(Connection.this);
transport.start(onCompleted)
}
override protected def _stop(onCompleted:Runnable) = {
+ stopped = true
transport.stop(onCompleted)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:57:24 2010
@@ -45,7 +45,7 @@ trait DeliverySession {
*/
trait DeliveryConsumer extends Retained {
def matches(message:Delivery)
- val queue:DispatchQueue;
+ val dispatchQueue:DispatchQueue;
def open_session(producer_queue:DispatchQueue):DeliverySession
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:57:24 2010
@@ -198,7 +198,7 @@ class Router(var queue:DispatchQueue) ex
trait Route extends Retained {
val destination:Destination
- val queue:DispatchQueue
+ val dispatchQueue:DispatchQueue
val metric = new AtomicLong();
def connected(targets:List[DeliveryConsumer]):Unit
@@ -211,15 +211,14 @@ trait Route extends Retained {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
+class DeliveryProducerRoute(val destination:Destination, override val dispatchQueue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
override protected def log = Router
- protected def dispatchQueue:DispatchQueue = queue
// Retain the queue while we are retained.
- queue.retain
+ dispatchQueue.retain
setDisposer(^{
- queue.release
+ dispatchQueue.release
})
var targets = List[DeliverySession]()
@@ -227,16 +226,16 @@ class DeliveryProducerRoute(val destinat
def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
internal_bind(targets)
on_connected
- } >>: queue
+ } >>: dispatchQueue
def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
internal_bind(targets)
- } >>: queue
+ } >>: dispatchQueue
private def internal_bind(values:List[DeliveryConsumer]) = {
values.foreach{ x=>
debug("producer route attaching to conusmer.")
- targets = x.open_session(queue) :: targets
+ targets = x.open_session(dispatchQueue) :: targets
}
}
@@ -249,7 +248,7 @@ class DeliveryProducerRoute(val destinat
}
rc
}
- } >>: queue
+ } >>: dispatchQueue
def disconnected() = ^ {
this.targets.foreach { x=>
@@ -257,7 +256,7 @@ class DeliveryProducerRoute(val destinat
x.close
x.consumer.release
}
- } >>: queue
+ } >>: dispatchQueue
protected def on_connected = {}
protected def on_disconnected = {}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 03:57:24 2010
@@ -37,7 +37,7 @@ import org.apache.activemq.util.{IOHelpe
import scala.util.matching.Regex
object BaseBrokerPerfSupport {
- var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "2"))
+ var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "5"))
var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "3000"))
// Set to use tcp IO
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:57:24 2010
@@ -59,13 +59,13 @@ class StompProtocolHandler extends Proto
class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer {
- val queue = StompProtocolHandler.this.dispatchQueue
- val session_manager = new DeliverySessionManager(outboundChannel, queue)
+ val dispatchQueue = StompProtocolHandler.this.dispatchQueue
+ val session_manager = new DeliverySessionManager(outboundChannel, dispatchQueue)
- queue.retain
+ dispatchQueue.retain
setDisposer(^{
session_manager.release
- queue.release
+ dispatchQueue.release
})
def matches(message:Delivery) = true