You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:57:25 UTC

svn commit: r961106 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Wed Jul  7 03:57:24 2010
New Revision: 961106

URL: http://svn.apache.org/viewvc?rev=961106&view=rev
Log:
renamed queue field to dispatchQueue for consistency

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:57:24 2010
@@ -260,7 +260,7 @@ trait QueueLifecyleListener {
 
 
 
-object Queue {
+object Queue extends Log {
   val maxOutboundSize = 1024*1204*5
 }
 
@@ -268,18 +268,22 @@ object Queue {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer {
+class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+  override protected def log = Queue
 
-  override val queue:DispatchQueue = createQueue("queue:"+destination);
-  queue.setTargetQueue(getRandomThreadQueue)
+  override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
+  dispatchQueue.setTargetQueue(getRandomThreadQueue)
+  dispatchQueue {
+    debug("created queue for: "+destination)
+  }
 
   val delivery_buffer  = new DeliveryBuffer
   delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
 
-  val session_manager = new DeliverySessionManager(delivery_buffer, queue)
+  val session_manager = new DeliverySessionManager(delivery_buffer, dispatchQueue)
 
   setDisposer(^{
-    queue.release
+    dispatchQueue.release
     session_manager.release
   })
 
@@ -289,7 +293,7 @@ class Queue(val destination:Destination)
     def deliver(value:Delivery):Unit = {
       val delivery = Delivery(value)
       delivery.setDisposer(^{
-        ^{ completed(value) } >>:queue
+        ^{ completed(value) } >>:dispatchQueue
       })
       consumer.deliver(delivery);
       delivery.release
@@ -310,12 +314,12 @@ class Queue(val destination:Destination)
   def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
   def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
       for ( consumer <- consumers ) {
-        val cs = new ConsumerState(consumer.open_session(queue))
+        val cs = new ConsumerState(consumer.open_session(dispatchQueue))
         allConsumers += consumer->cs
         readyConsumers.addLast(cs)
       }
       drain_delivery_buffer
-    } >>: queue
+    } >>: dispatchQueue
 
   def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
       for ( consumer <- consumers ) {
@@ -328,14 +332,14 @@ class Queue(val destination:Destination)
           case None=>
         }
       }
-    } >>: queue
+    } >>: dispatchQueue
 
   def disconnected() = throw new RuntimeException("unsupported")
 
   def collocate(value:DispatchQueue):Unit = {
-    if( value.getTargetQueue ne queue.getTargetQueue ) {
-      println(queue.getLabel+" co-locating with: "+value.getLabel);
-      this.queue.setTargetQueue(value.getTargetQueue)
+    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
+      println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:57:24 2010
@@ -46,24 +46,21 @@ abstract class Connection() extends Tran
   import Connection._
   val id = next_id
   val dispatchQueue = createQueue(id)
-  
-  def stopped = serviceState match {
-    case STOPPED => true
-    case x:STOPPING => true
-    case _ => false
-  }
+  var stopped = true
 
   var transport:Transport = null
 
   override def toString = id
 
   override protected def _start(onCompleted:Runnable) = {
+    stopped = false
     transport.setDispatchQueue(dispatchQueue);
     transport.setTransportListener(Connection.this);
     transport.start(onCompleted)
   }
 
   override protected def _stop(onCompleted:Runnable) = {
+    stopped = true
     transport.stop(onCompleted)
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:57:24 2010
@@ -45,7 +45,7 @@ trait DeliverySession {
  */
 trait DeliveryConsumer extends Retained {
   def matches(message:Delivery)
-  val queue:DispatchQueue;
+  val dispatchQueue:DispatchQueue;
   def open_session(producer_queue:DispatchQueue):DeliverySession
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:57:24 2010
@@ -198,7 +198,7 @@ class Router(var queue:DispatchQueue) ex
 trait Route extends Retained {
 
   val destination:Destination
-  val queue:DispatchQueue
+  val dispatchQueue:DispatchQueue
   val metric = new AtomicLong();
 
   def connected(targets:List[DeliveryConsumer]):Unit
@@ -211,15 +211,14 @@ trait Route extends Retained {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
+class DeliveryProducerRoute(val destination:Destination, override val dispatchQueue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
 
   override protected def log = Router
-  protected def dispatchQueue:DispatchQueue = queue
 
   // Retain the queue while we are retained.
-  queue.retain
+  dispatchQueue.retain
   setDisposer(^{
-    queue.release
+    dispatchQueue.release
   })
 
   var targets = List[DeliverySession]()
@@ -227,16 +226,16 @@ class DeliveryProducerRoute(val destinat
   def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
     on_connected
-  } >>: queue
+  } >>: dispatchQueue
 
   def bind(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
-  } >>: queue
+  } >>: dispatchQueue
 
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
       debug("producer route attaching to conusmer.")
-      targets = x.open_session(queue) :: targets
+      targets = x.open_session(dispatchQueue) :: targets
     }
   }
 
@@ -249,7 +248,7 @@ class DeliveryProducerRoute(val destinat
       }
       rc
     }
-  } >>: queue
+  } >>: dispatchQueue
 
   def disconnected() = ^ {
     this.targets.foreach { x=>
@@ -257,7 +256,7 @@ class DeliveryProducerRoute(val destinat
       x.close
       x.consumer.release
     }    
-  } >>: queue
+  } >>: dispatchQueue
 
   protected def on_connected = {}
   protected def on_disconnected = {}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 03:57:24 2010
@@ -37,7 +37,7 @@ import org.apache.activemq.util.{IOHelpe
 import scala.util.matching.Regex
 
 object BaseBrokerPerfSupport {
-  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "2"))
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "5"))
   var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "3000"))
 
   // Set to use tcp IO

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961106&r1=961105&r2=961106&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:57:24 2010
@@ -59,13 +59,13 @@ class StompProtocolHandler extends Proto
 
   class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer {
 
-    val queue = StompProtocolHandler.this.dispatchQueue
-    val session_manager = new DeliverySessionManager(outboundChannel, queue)
+    val dispatchQueue = StompProtocolHandler.this.dispatchQueue
+    val session_manager = new DeliverySessionManager(outboundChannel, dispatchQueue)
 
-    queue.retain
+    dispatchQueue.retain
     setDisposer(^{
       session_manager.release
-      queue.release
+      dispatchQueue.release
     })
 
     def matches(message:Delivery) = true