You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/02/11 04:21:48 UTC

svn commit: r1069669 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/

Author: chirino
Date: Fri Feb 11 03:21:46 2011
New Revision: 1069669

URL: http://svn.apache.org/viewvc?rev=1069669&view=rev
Log:
Updates needed due to changes in the latest HawtDispatch interface.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Feb 11 03:21:46 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo.broke
 import _root_.java.io.File
 import _root_.java.lang.String
 import org.fusesource.hawtdispatch._
-import org.fusesource.hawtdispatch.Dispatch
 import org.fusesource.hawtbuf._
 import AsciiBuffer._
 import collection.JavaConversions
@@ -32,6 +31,7 @@ import security.{AclAuthorizer, Authoriz
 import java.net.InetSocketAddress
 import org.apache.activemq.apollo.broker.web._
 import collection.mutable.{HashSet, LinkedHashMap}
+import scala.util.Random
 
 /**
  * <p>
@@ -143,8 +143,6 @@ object Broker extends Log {
 
   val broker_id_counter = new AtomicLong()
 
-  val STICK_ON_THREAD_QUEUES = true
-
   def class_loader:ClassLoader = ClassFinder.class_loader
 
   /**
@@ -204,9 +202,6 @@ class Broker() extends BaseService {
   var connectors: List[Connector] = Nil
 
   val dispatch_queue = createQueue("broker") // getGlobalQueue(DispatchPriority.HIGH).createQueue("broker")
-  if( STICK_ON_THREAD_QUEUES ) {
-    dispatch_queue.setTargetQueue(Dispatch.getRandomThreadQueue)
-  }
 
   val id = broker_id_counter.incrementAndGet
   
@@ -240,15 +235,27 @@ class Broker() extends BaseService {
   var authenticator:Authenticator = _
   var authorizer:Authorizer = _
 
+  def init_dispatch_queue(dispatch_queue:DispatchQueue) = {
+    import OptionSupport._
+    if( config.sticky_dispatching.getOrElse(true) ) {
+      val queues = getThreadQueues()
+      val queue = queues(Random.nextInt(queues.length));
+      dispatch_queue.setTargetQueue(queue)
+    }
+  }
+
   override def _start(on_completed:Runnable) = {
 
     // create the runtime objects from the config
     {
+      init_dispatch_queue(dispatch_queue)
+
       if( config.key_storage!=null ) {
         key_storage = new KeyStorage
         key_storage.config = config.key_storage
       }
 
+
       import OptionSupport._
       if( config.authentication != null && config.authentication.enabled.getOrElse(true) ) {
         authenticator = new JaasAuthenticator(config.authentication)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Fri Feb 11 03:21:46 2011
@@ -35,8 +35,6 @@ import org.apache.activemq.apollo.util.O
  */
 object Connector extends Log {
 
-  val STICK_ON_THREAD_QUEUES = Broker.STICK_ON_THREAD_QUEUES
-
   /**
    * Creates a default a configuration object.
    */
@@ -99,9 +97,7 @@ class Connector(val broker:Broker, val i
       connection.protocol_handler = protocol.createProtocolHandler
       connection.transport = transport
 
-      if( STICK_ON_THREAD_QUEUES ) {
-        connection.dispatch_queue.setTargetQueue(Dispatch.getRandomThreadQueue)
-      }
+      broker.init_dispatch_queue(connection.dispatch_queue)
 
       // We release when it gets removed form the connections list.
       connection.dispatch_queue.retain

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Feb 11 03:21:46 2011
@@ -55,7 +55,8 @@ class Queue(val router: LocalRouter, val
   val filter = binding.message_filter
 
   override val dispatch_queue: DispatchQueue = createQueue(binding.label);
-  dispatch_queue.setTargetQueue(getRandomThreadQueue)
+  host.broker.init_dispatch_queue(dispatch_queue)
+
   dispatch_queue {
     debug("created queue for: " + binding.label)
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala Fri Feb 11 03:21:46 2011
@@ -504,25 +504,25 @@ class FileZeroCopyBufferAllocator(val di
 
   def start() = {
     directory.mkdirs
-    val config = new DispatcherConfig()
-    for( i <- 0 until config.getThreads ) {
+    var i=0;
+    for( queue <- getThreadQueues()) {
       val ctx = new AllocatorContext(i)
+      ctx.queue = queue
       contexts += i->ctx
-      getThreadQueue(i) {
-        ctx.queue = getCurrentThreadQueue
+      queue {
         _current_allocator_context.set(ctx)
       }
+      i += 1
     }
   }
 
   def stop() = {
-    val config = new DispatcherConfig()
-    for( i <- 0 until config.getThreads ) {
-      contexts = Map()
-      getThreadQueue(i) {
+    for( queue <- getThreadQueues() ) {
+      queue {
         _current_allocator_context.remove
       }
     }
+    contexts = Map()
   }
 
   def sync(file: Int) = {
@@ -546,7 +546,7 @@ class FileZeroCopyBufferAllocator(val di
   }
 
   def context(i:Int)(func: (AllocatorContext)=>Unit):Unit= {
-    getThreadQueue(i) {
+    getThreadQueues()(i) {
       func(current_allocator_context)
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Fri Feb 11 03:21:46 2011
@@ -63,4 +63,7 @@ public class BrokerDTO {
 
     @XmlElement(name="authentication")
     public AuthenticationDTO authentication;
+
+
+    public Boolean sticky_dispatching;
 }