You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/02/11 04:21:48 UTC
svn commit: r1069669 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
Author: chirino
Date: Fri Feb 11 03:21:46 2011
New Revision: 1069669
URL: http://svn.apache.org/viewvc?rev=1069669&view=rev
Log:
Updates needed due to changes in the latest HawtDispatch interface.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Feb 11 03:21:46 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo.broke
import _root_.java.io.File
import _root_.java.lang.String
import org.fusesource.hawtdispatch._
-import org.fusesource.hawtdispatch.Dispatch
import org.fusesource.hawtbuf._
import AsciiBuffer._
import collection.JavaConversions
@@ -32,6 +31,7 @@ import security.{AclAuthorizer, Authoriz
import java.net.InetSocketAddress
import org.apache.activemq.apollo.broker.web._
import collection.mutable.{HashSet, LinkedHashMap}
+import scala.util.Random
/**
* <p>
@@ -143,8 +143,6 @@ object Broker extends Log {
val broker_id_counter = new AtomicLong()
- val STICK_ON_THREAD_QUEUES = true
-
def class_loader:ClassLoader = ClassFinder.class_loader
/**
@@ -204,9 +202,6 @@ class Broker() extends BaseService {
var connectors: List[Connector] = Nil
val dispatch_queue = createQueue("broker") // getGlobalQueue(DispatchPriority.HIGH).createQueue("broker")
- if( STICK_ON_THREAD_QUEUES ) {
- dispatch_queue.setTargetQueue(Dispatch.getRandomThreadQueue)
- }
val id = broker_id_counter.incrementAndGet
@@ -240,15 +235,27 @@ class Broker() extends BaseService {
var authenticator:Authenticator = _
var authorizer:Authorizer = _
+ def init_dispatch_queue(dispatch_queue:DispatchQueue) = {
+ import OptionSupport._
+ if( config.sticky_dispatching.getOrElse(true) ) {
+ val queues = getThreadQueues()
+ val queue = queues(Random.nextInt(queues.length));
+ dispatch_queue.setTargetQueue(queue)
+ }
+ }
+
override def _start(on_completed:Runnable) = {
// create the runtime objects from the config
{
+ init_dispatch_queue(dispatch_queue)
+
if( config.key_storage!=null ) {
key_storage = new KeyStorage
key_storage.config = config.key_storage
}
+
import OptionSupport._
if( config.authentication != null && config.authentication.enabled.getOrElse(true) ) {
authenticator = new JaasAuthenticator(config.authentication)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Fri Feb 11 03:21:46 2011
@@ -35,8 +35,6 @@ import org.apache.activemq.apollo.util.O
*/
object Connector extends Log {
- val STICK_ON_THREAD_QUEUES = Broker.STICK_ON_THREAD_QUEUES
-
/**
* Creates a default configuration object.
*/
@@ -99,9 +97,7 @@ class Connector(val broker:Broker, val i
connection.protocol_handler = protocol.createProtocolHandler
connection.transport = transport
- if( STICK_ON_THREAD_QUEUES ) {
- connection.dispatch_queue.setTargetQueue(Dispatch.getRandomThreadQueue)
- }
+ broker.init_dispatch_queue(connection.dispatch_queue)
// We release when it gets removed from the connections list.
connection.dispatch_queue.retain
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Feb 11 03:21:46 2011
@@ -55,7 +55,8 @@ class Queue(val router: LocalRouter, val
val filter = binding.message_filter
override val dispatch_queue: DispatchQueue = createQueue(binding.label);
- dispatch_queue.setTargetQueue(getRandomThreadQueue)
+ host.broker.init_dispatch_queue(dispatch_queue)
+
dispatch_queue {
debug("created queue for: " + binding.label)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala Fri Feb 11 03:21:46 2011
@@ -504,25 +504,25 @@ class FileZeroCopyBufferAllocator(val di
def start() = {
directory.mkdirs
- val config = new DispatcherConfig()
- for( i <- 0 until config.getThreads ) {
+ var i=0;
+ for( queue <- getThreadQueues()) {
val ctx = new AllocatorContext(i)
+ ctx.queue = queue
contexts += i->ctx
- getThreadQueue(i) {
- ctx.queue = getCurrentThreadQueue
+ queue {
_current_allocator_context.set(ctx)
}
+ i += 1
}
}
def stop() = {
- val config = new DispatcherConfig()
- for( i <- 0 until config.getThreads ) {
- contexts = Map()
- getThreadQueue(i) {
+ for( queue <- getThreadQueues() ) {
+ queue {
_current_allocator_context.remove
}
}
+ contexts = Map()
}
def sync(file: Int) = {
@@ -546,7 +546,7 @@ class FileZeroCopyBufferAllocator(val di
}
def context(i:Int)(func: (AllocatorContext)=>Unit):Unit= {
- getThreadQueue(i) {
+ getThreadQueues()(i) {
func(current_allocator_context)
}
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=1069669&r1=1069668&r2=1069669&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Fri Feb 11 03:21:46 2011
@@ -63,4 +63,7 @@ public class BrokerDTO {
@XmlElement(name="authentication")
public AuthenticationDTO authentication;
+
+
+ public Boolean sticky_dispatching;
}