You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:57:03 UTC
svn commit: r961103 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/
activemq-broker/src/main/scala/org/apache/activem...
Author: chirino
Date: Wed Jul 7 03:57:02 2010
New Revision: 961103
URL: http://svn.apache.org/viewvc?rev=961103&view=rev
Log:
broker now implements a cleaner config and lifecycle
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul 7 03:57:02 2010
@@ -20,6 +20,8 @@ import org.apache.activemq.Service
import org.fusesource.hawtdispatch.DispatchQueue
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+object BaseService extends Log
+
/**
* <p>
* The BaseService provides helpers for dealing async service state.
@@ -27,7 +29,9 @@ import _root_.org.fusesource.hawtdispatc
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait BaseService extends Service {
+trait BaseService extends Service with Logging {
+
+ override protected def log:Log = BaseService
sealed class State {
override def toString = getClass.getSimpleName
@@ -52,7 +56,7 @@ trait BaseService extends Service {
protected class STOPPING extends State with CallbackSupport { override def isStopping = true }
protected object STOPPED extends State { override def isStopped = true }
- val dispatchQueue:DispatchQueue
+ protected val dispatchQueue:DispatchQueue
final def start() = start(null)
final def stop() = stop(null)
@@ -61,14 +65,9 @@ trait BaseService extends Service {
protected var _serviceState:State = CREATED
def serviceState = _serviceState
- private def error(msg:String) {
- try {
- throw new AssertionError(msg)
- } catch {
- case e:Exception =>
- e.printStackTrace
- }
- }
+ @volatile
+ protected var _serviceFailure:Exception = null
+ def serviceFailure = _serviceFailure
final def start(onCompleted:Runnable) = ^{
def do_start = {
@@ -83,6 +82,8 @@ trait BaseService extends Service {
}
catch {
case e:Exception =>
+ error(e, "Start failed due to %s", e)
+ _serviceFailure = e
_serviceState = FAILED
state.done
}
@@ -103,7 +104,7 @@ trait BaseService extends Service {
done
case state =>
done
- error("start should not be called from state: "+state);
+ error("Start should not be called from state: %s", state);
}
} |>>: dispatchQueue
@@ -126,6 +127,8 @@ trait BaseService extends Service {
}
catch {
case e:Exception =>
+ error(e, "Stop failed due to: %s", e)
+ _serviceFailure = e
_serviceState = FAILED
state.done
}
@@ -135,7 +138,7 @@ trait BaseService extends Service {
done
case state =>
done
- error("stop should not be called from state: "+state);
+ error("Stop should not be called from state: %s", state);
}
} |>>: dispatchQueue
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:57:02 2010
@@ -17,17 +17,16 @@
package org.apache.activemq.apollo.broker
import _root_.java.io.{File}
-import _root_.org.apache.activemq.transport._
-import _root_.org.apache.activemq.Service
import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
+import _root_.org.apache.activemq.util.{FactoryFinder}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import _root_.scala.collection.JavaConversions._
-import _root_.scala.reflect.BeanProperty
import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
-import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
-import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.fusesource.hawtbuf._
+import ReporterLevel._
+import AsciiBuffer._
+import org.apache.activemq.apollo.dto.{VirtualHostDTO, BrokerDTO}
+import collection.{JavaConversions, SortedMap}
+import java.util.LinkedList
/**
* <p>
@@ -85,19 +84,50 @@ object BufferConversions {
implicit def toUTF8Buffer(value:Buffer) = value.utf8
}
-import BufferConversions._
-object BrokerConstants extends Log {
- val CONFIGURATION = "CONFIGURATION"
- val STOPPED = "STOPPED"
- val STARTING = "STARTING"
- val STOPPING = "STOPPING"
- val RUNNING = "RUNNING"
- val UNKNOWN = "UNKNOWN"
-
- val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
+
+
+object Broker extends Log {
val STICK_ON_THREAD_QUEUES = true
+
+ /**
+ * Creates a default a configuration object.
+ */
+ def default() = {
+ val rc = new BrokerDTO
+ rc.id = "default"
+ rc.enabled = true
+ rc.virtualHosts.add(VirtualHost.default)
+ rc.connectors.add(Connector.default)
+ rc.basedir = "./activemq-data/default"
+ rc
+ }
+
+ /**
+ * Validates a configuration object.
+ */
+ def validate(config: BrokerDTO, reporter:Reporter):ReporterLevel = {
+ new Reporting(reporter) {
+ if( empty(config.id) ) {
+ error("Broker id must be specified.")
+ }
+ if( config.virtualHosts.isEmpty ) {
+ error("Broker must define at least one virtual host.")
+ }
+ if( empty(config.basedir) ) {
+ error("Broker basedir must be defined.")
+ }
+
+ import JavaConversions._
+ for (host <- config.virtualHosts ) {
+ result |= VirtualHost.validate(host, reporter)
+ }
+ for (connector <- config.connectors ) {
+ result |= Connector.validate(connector, reporter)
+ }
+ }.result
+ }
}
/**
@@ -109,230 +139,100 @@ object BrokerConstants extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Broker() extends Service with DispatchLogging {
+class Broker() extends BaseService with DispatchLogging with LoggingReporter {
- import BrokerConstants._
- override protected def log = BrokerConstants
+ import Broker._
+ override protected def log = Broker
+
+ var config: BrokerDTO = default
- // The configuration state of the broker... It can be modified directly until the broker
- // is started.
- @BeanProperty
- val connectUris: ArrayList[String] = new ArrayList[String]
- @BeanProperty
- val virtualHosts: LinkedHashMap[AsciiBuffer, VirtualHost] = new LinkedHashMap[AsciiBuffer, VirtualHost]
- @BeanProperty
- val transportServers: ArrayList[TransportServer] = new ArrayList[TransportServer]
- @BeanProperty
var dataDirectory: File = null
- @BeanProperty
- var name = "broker";
- @BeanProperty
var defaultVirtualHost: VirtualHost = null
-
- def start = runtime.start(null)
- def start(onComplete:Runnable) = runtime.start(onComplete)
-
- def stop = runtime.stop(null)
- def stop(onComplete:Runnable) = runtime.stop(onComplete)
+ var virtualHosts: Map[AsciiBuffer, VirtualHost] = Map()
+ var connectors: List[Connector] = Nil
val dispatchQueue = createQueue("broker");
-
if( STICK_ON_THREAD_QUEUES ) {
dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
}
- def addVirtualHost(host: VirtualHost) = {
- if (host.names.isEmpty) {
- throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
- }
- for (name <- host.names) {
- if (virtualHosts.containsKey(name)) {
- throw new IllegalArgumentException("Virtual host with host name " + name + " already exists.")
- }
- }
- for (name <- host.names) {
- virtualHosts.put(name, host)
- }
- if (defaultVirtualHost == null) {
- defaultVirtualHost = host
- }
- }
+ def id = config.id
- // Holds the runtime state of the broker all access should be serialized
- // via a the dispatch queue and therefore all requests are setup to return
- // results via callbacks.
- object runtime {
-
- class BrokerAcceptListener extends TransportAcceptListener {
- def onAcceptError(error: Exception): Unit = {
- error.printStackTrace
- warn("Accept error: " + error)
- debug("Accept error details: ", error)
- }
-
- def onAccept(transport: Transport): Unit = {
- debug("Accepted connection from: %s", transport.getRemoteAddress)
- var connection = new BrokerConnection(Broker.this)
- connection.transport = transport
- connection.dispatchQueue.retain
- if( STICK_ON_THREAD_QUEUES ) {
- connection.dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
- }
+ override def toString() = "broker: "+id
- clientConnections.add(connection)
- try {
- connection.start()
- }
- catch {
- case e1: Exception => {
- onAcceptError(e1)
- }
- }
- }
- }
+ /**
+ * Validates and then applies the configuration.
+ */
+ def configure(config: BrokerDTO, reporter:Reporter) = ^{
+ if ( validate(config, reporter) < ERROR ) {
+ this.config = config
- var state = CONFIGURATION
- val clientConnections: HashSet[Connection] = new HashSet[Connection]
+ if( serviceState.isStarted ) {
+ // TODO: apply changes while he broker is running.
+ reporter.report(WARN, "Updating broker configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
- def stopped(connection:Connection) = ^{
- if( clientConnections.remove(connection) ) {
- connection.dispatchQueue.release
}
- } >>: dispatchQueue
-
- def removeConnectUri(uri: String): Unit = ^ {
- connectUris.remove(uri)
- } >>: dispatchQueue
-
- def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
- virtualHosts.get(name)
- } >>: dispatchQueue
-
- def getConnectUris(cb: (ArrayList[String]) => Unit) = callback(cb) {
- new ArrayList(connectUris)
- } >>: dispatchQueue
-
-
- def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
- defaultVirtualHost
- } >>: dispatchQueue
-
- def addVirtualHost(host: VirtualHost) = ^ {
- Broker.this.addVirtualHost(host)
- } >>: dispatchQueue
-
- def getState(cb: (String) => Unit) = callback(cb) {state} >>: dispatchQueue
-
- def addConnectUri(uri: String) = ^ {
- connectUris.add(uri)
- } >>: dispatchQueue
-
- def getName(cb: (String) => Unit) = callback(cb) {
- name;
- } >>: dispatchQueue
-
- def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
- new ArrayList[VirtualHost](virtualHosts.values)
- } >>: dispatchQueue
-
- def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
- new ArrayList[TransportServer](transportServers)
- } >>: dispatchQueue
-
- def start(onCompleted:Runnable) = ^ {
- _start(onCompleted)
- } >>: dispatchQueue
-
- def _start(onCompleted:Runnable) = {
- if (state == CONFIGURATION) {
- // We can apply defaults now
- if (dataDirectory == null) {
- dataDirectory = new File(IOHelper.getDefaultDataDirectory)
- }
+ }
+ } >>: dispatchQueue
- if (defaultVirtualHost == null) {
- defaultVirtualHost = new VirtualHost()
- defaultVirtualHost.broker = Broker.this
- defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
- virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost)
- }
- state = STARTING
+ override def _start(onCompleted:Runnable) = {
- val tracker = new LoggingTracker("broker startup", dispatchQueue)
- for (virtualHost <- virtualHosts.values) {
- virtualHost.start(tracker.task("virtual host: "+virtualHost))
+ // create the runtime objects from the config
+ {
+ import JavaConversions._
+ dataDirectory = new File(config.basedir)
+ defaultVirtualHost = null
+ for (c <- config.virtualHosts) {
+ val host = new VirtualHost(this)
+ host.configure(c, this)
+ virtualHosts += ascii(c.id)-> host
+ // first defined host is the default virtual host
+ if( defaultVirtualHost == null ) {
+ defaultVirtualHost = host
}
- for (server <- transportServers) {
- server.setDispatchQueue(dispatchQueue)
- server.setAcceptListener(new BrokerAcceptListener)
- server.start(tracker.task("transport server: "+server))
- }
- tracker.callback {
- state = RUNNING
- if( onCompleted!=null ) {
- onCompleted.run
- }
- }
-
- } else {
- warn("Can only start a broker that is in the " + CONFIGURATION + " state. Broker was " + state)
+ }
+ for (c <- config.connectors) {
+ val connector = new Connector(this)
+ connector.configure(c, this)
+ connectors ::= connector
}
}
+ // Start them up..
+ val tracker = new LoggingTracker("broker startup", dispatchQueue)
+ virtualHosts.valuesIterator.foreach( x=>
+ tracker.start(x)
+ )
+ connectors.foreach( x=>
+ tracker.start(x)
+ )
- def stop(onCompleted:Runnable): Unit = ^ {
- if (state == RUNNING) {
- state = STOPPING
- val tracker = new LoggingTracker("broker shutdown", dispatchQueue)
-
- // Stop accepting connections..
- for (server <- transportServers) {
- stopService(server,tracker)
- }
-
- // Kill client connections..
- for (connection <- clientConnections) {
- stopService(connection, tracker)
- }
-
- // Shutdown the virtual host services
- for (virtualHost <- virtualHosts.values) {
- stopService(virtualHost, tracker)
- }
-
- def stopped = {
- state = STOPPED;
+ tracker.callback(onCompleted)
+ }
- }
- tracker.callback {
- stopped
- if( onCompleted!=null ) {
- onCompleted.run
- }
- }
-
- }
- } >>: dispatchQueue
-
+ def _stop(onCompleted:Runnable): Unit = {
+ val tracker = new LoggingTracker("broker shutdown", dispatchQueue)
+ // Stop accepting connections..
+ connectors.foreach( x=>
+ tracker.stop(x)
+ )
+ // Shutdown the virtual host services
+ virtualHosts.valuesIterator.foreach( x=>
+ tracker.stop(x)
+ )
+ tracker.callback(onCompleted)
}
+ def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = reply(cb) {
+ virtualHosts.getOrElse(name, null)
+ } >>: dispatchQueue
+
+ def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = reply(cb) {
+ defaultVirtualHost
+ } >>: dispatchQueue
-
- /**
- * Helper method to help stop broker services and log error if they fail to start.
- * @param server
- */
- private def stopService(service: Service, tracker:LoggingTracker): Unit = {
- try {
- service.stop(tracker.task(service.toString))
- } catch {
- case e: Exception => {
- warn(e, "Could not stop " + service + ": " + e)
- }
- }
- }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:57:02 2010
@@ -39,7 +39,7 @@ object Connection extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-abstract class Connection() extends TransportListener with BaseService with DispatchLogging {
+abstract class Connection() extends TransportListener with BaseService {
override protected def log = Connection
@@ -90,21 +90,21 @@ abstract class Connection() extends Tran
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class BrokerConnection(val broker: Broker) extends Connection {
+class BrokerConnection(val connector: Connector) extends Connection {
var protocol = "stomp"
var protocolHandler: ProtocolHandler = null;
override protected def _start(onCompleted:Runnable) = {
- broker.dispatchQueue.retain
+ connector.dispatchQueue.retain
protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
protocolHandler.setConnection(this);
super._start(onCompleted)
}
override protected def _stop(onCompleted:Runnable) = {
- broker.runtime.stopped(this)
- broker.dispatchQueue.release
+ connector.stopped(this)
+ connector.dispatchQueue.release
super._stop(onCompleted)
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=961103&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Jul 7 03:57:02 2010
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.io.{File}
+import _root_.org.apache.activemq.transport._
+import _root_.org.apache.activemq.Service
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.scala.reflect.BeanProperty
+import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
+import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
+import org.fusesource.hawtbuf._
+import collection.JavaConversions
+import org.apache.activemq.apollo.dto.{ConnectorDTO, BrokerDTO}
+import JavaConversions._
+import org.apache.activemq.wireformat.WireFormatFactory
+import ReporterLevel._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object Connector extends Log {
+
+ val STICK_ON_THREAD_QUEUES = Broker.STICK_ON_THREAD_QUEUES
+
+ /**
+ * Creates a default a configuration object.
+ */
+ def default() = {
+ val rc = new ConnectorDTO
+ rc.id = "default"
+ rc.enabled = true
+ rc.advertise = "tcp://localhost:61616"
+ rc.bind = "tcp://0.0.0.0:61616"
+ rc.protocol = "multi"
+ rc
+ }
+
+ /**
+ * Validates a configuration object.
+ */
+ def validate(config: ConnectorDTO, reporter:Reporter):ReporterLevel = {
+ new Reporting(reporter) {
+ if( empty(config.id) ) {
+ error("Connector id must be specified")
+ }
+ }.result
+ }
+
+
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Connector(val broker:Broker) extends BaseService with DispatchLogging {
+ import Connector._
+
+ override protected def log = Connector
+ override val dispatchQueue = broker.dispatchQueue
+
+ var config:ConnectorDTO = default
+ var transportServer:TransportServer = _
+ var wireFormatFactory:WireFormatFactory = _
+
+ val connections: HashSet[Connection] = new HashSet[Connection]
+
+ override def toString = "connector: "+config.id
+
+ object BrokerAcceptListener extends TransportAcceptListener {
+ def onAcceptError(error: Exception): Unit = {
+ error.printStackTrace
+ warn("Accept error: " + error)
+ debug("Accept error details: ", error)
+ }
+
+ def onAccept(transport: Transport): Unit = {
+ debug("Accepted connection from: %s", transport.getRemoteAddress)
+
+ if( wireFormatFactory!=null ) {
+ transport.setWireformat(wireFormatFactory.createWireFormat)
+ }
+
+ var connection = new BrokerConnection(Connector.this)
+ connection.transport = transport
+
+ if( STICK_ON_THREAD_QUEUES ) {
+ connection.dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+ }
+
+ // We release when it gets removed form the connections list.
+ connection.dispatchQueue.retain
+ connections.add(connection)
+
+ try {
+ connection.start()
+ } catch {
+ case e1: Exception => {
+ onAcceptError(e1)
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Validates and then applies the configuration.
+ */
+ def configure(config: ConnectorDTO, reporter:Reporter) = ^{
+ if ( validate(config, reporter) < ERROR ) {
+ this.config = config
+
+ if( serviceState.isStarted ) {
+ // TODO: apply changes while running
+ reporter.report(WARN, "Updating connector configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
+
+ }
+ }
+ } |>>: dispatchQueue
+
+
+ override def _start(onCompleted:Runnable) = {
+ assert(config!=null, "Connector must be configured before it is started.")
+ wireFormatFactory = TransportFactorySupport.createWireFormatFactory(config.protocol)
+
+ transportServer = TransportFactory.bind( config.bind )
+ transportServer.setDispatchQueue(dispatchQueue)
+ transportServer.setAcceptListener(BrokerAcceptListener)
+ transportServer.start(onCompleted)
+ }
+
+
+ override def _stop(onCompleted:Runnable): Unit = {
+ val tracker = new LoggingTracker(toString, dispatchQueue)
+
+ // This odd usage of tracker is because we don't want
+ // to kill client connections until the server is
+ // stopped. Since the connections list could change between
+ // now and when the server actually stops.
+ val task = tracker.task(transportServer.toString)
+ transportServer.stop(^{
+ for (connection <- connections) {
+ tracker.stop(connection)
+ }
+ task.run
+ })
+ tracker.callback(onCompleted)
+ }
+
+ /**
+ * Connections callback into the connector when they are stopped so that we can
+ * stop tracking them.
+ */
+ def stopped(connection:Connection) = ^{
+ if( connections.remove(connection) ) {
+ connection.dispatchQueue.release
+ }
+ } |>>: dispatchQueue
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala Wed Jul 7 03:57:02 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.{CountDownLa
import java.util.HashSet
import org.fusesource.hawtdispatch.ScalaDispatch._
import org.fusesource.hawtdispatch.{TaskTracker, DispatchQueue}
+import org.apache.activemq.Service
/**
* <p>
@@ -40,6 +41,14 @@ class LoggingTracker(name:String, parent
timeout
}
+ def start(service:Service) = {
+ service.start(task(service.toString))
+ }
+
+ def stop(service:Service) = {
+ service.stop(task(service.toString))
+ }
+
}
object LoggingTracker extends Log {
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala?rev=961103&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala Wed Jul 7 03:57:02 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.lang.{String}
+import java.util.{LinkedHashMap}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ReporterLevel extends Enumeration {
+ type ReporterLevel = Value
+ val INFO, WARN, ERROR = Value
+
+ class RichReporterLevel(self:ReporterLevel) {
+ def | (other:ReporterLevel):ReporterLevel = {
+ if( other > self ) {
+ other
+ } else {
+ self
+ }
+ }
+ }
+
+ implicit def toRichReporterLevel(level:ReporterLevel) = new RichReporterLevel(level)
+}
+import ReporterLevel._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Reporter {
+ def report(level:ReporterLevel, message:String) = {}
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait LoggingReporter extends Logging with Reporter {
+ override def report(level:ReporterLevel, message:String) = {
+ level match {
+ case INFO=>
+ info(message)
+ case WARN=>
+ warn(message)
+ case ERROR=>
+ error(message)
+ }
+ }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Reporting(reporter:Reporter) {
+ var result = INFO
+
+ protected def warn(msg:String) = {
+ reporter.report(WARN, msg)
+ result |= WARN
+ }
+
+ protected def error(msg:String) = {
+ reporter.report(ERROR, msg)
+ result |= ERROR
+ }
+
+ protected def info(msg:String) = {
+ reporter.report(INFO, msg)
+ result |= INFO
+ }
+
+ protected def empty(value:String) = value==null || value.isEmpty
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:57:02 2010
@@ -26,48 +26,82 @@ import _root_.scala.collection.JavaConve
import _root_.scala.reflect.BeanProperty
import path.PathFilter
import org.fusesource.hawtbuf.AsciiBuffer
+import org.apache.activemq.apollo.dto.VirtualHostDTO
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+import ReporterLevel._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object VirtualHost extends Log
+object VirtualHost extends Log {
+
+ /**
+ * Creates a default a configuration object.
+ */
+ def default() = {
+ val rc = new VirtualHostDTO
+ rc.id = "default"
+ rc.enabled = true
+ rc.hostNames.add("localhost")
+ rc
+ }
+
+ /**
+ * Validates a configuration object.
+ */
+ def validate(config: VirtualHostDTO, reporter:Reporter):ReporterLevel = {
+ new Reporting(reporter) {
+ if( config.hostNames.isEmpty ) {
+ error("Virtual host must be configured with at least one host name.")
+ }
+ }.result
+ }
+
+}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class VirtualHost() extends Service with Logging {
-
+class VirtualHost(val broker: Broker) extends BaseService with DispatchLogging {
+ import VirtualHost._
+
override protected def log = VirtualHost
+ override protected val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
+ var config:VirtualHostDTO = _
private val queueStore = new BrokerQueueStore()
private val queues = new HashMap[AsciiBuffer, Queue]()
private val durableSubs = new HashMap[String, DurableSubscription]()
- private val q:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
- val router = new Router(q)
-
- private var started = false;
+ val router = new Router(dispatchQueue)
- @BeanProperty
- var broker: Broker = null
- @BeanProperty
var names:List[String] = Nil;
def setNamesArray( names:ArrayList[String]) = {
this.names = names.toList
}
- @BeanProperty
var database:BrokerDatabase = new BrokerDatabase
- @BeanProperty
var transactionManager:TransactionManager = new TransactionManager
- override def toString = names.head
+ override def toString = "virtual-host: "+config.id
+ /**
+ * Validates and then applies the configuration.
+ */
+ def configure(config: VirtualHostDTO, reporter:Reporter) = ^{
+ if ( validate(config, reporter) < ERROR ) {
+ this.config = config
+
+ if( serviceState.isStarted ) {
+ // TODO: apply changes while he broker is running.
+ reporter.report(WARN, "Updating virtual host configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
- def start() = start(null)
- def start(onCompleted:Runnable):Unit = {
- if (started) {
- return;
+ }
}
+ } |>>: dispatchQueue
+
+
+ override protected def _start(onCompleted:Runnable):Unit = {
database.virtualHost = this
database.start();
@@ -76,7 +110,7 @@ class VirtualHost() extends Service with
//Recover queues:
queueStore.setDatabase(database);
- queueStore.setDispatchQueue(q);
+ queueStore.setDispatchQueue(dispatchQueue);
queueStore.loadQueues();
// Create Queue instances
@@ -96,20 +130,11 @@ class VirtualHost() extends Service with
//Recover transactions:
transactionManager.virtualHost = this
transactionManager.loadTransactions();
- started = true;
-
- if( onCompleted!=null ) {
- onCompleted.run
- }
+ onCompleted.run
}
- def stop() = start(null)
- def stop(onCompleted:Runnable):Unit = {
-
- if (!started) {
- return;
- }
+ override protected def _stop(onCompleted:Runnable):Unit = {
// TODO:
// val tmp = new ArrayList[Queue](queues.values())
@@ -125,19 +150,15 @@ class VirtualHost() extends Service with
// }
// done.await();
- database.stop();
- started = false;
- if( onCompleted!=null ) {
- onCompleted.run
- }
-
+ database.stop();
+ onCompleted.run
}
def createQueue(dest:Destination) :Queue = {
- if (!started) {
- //Queues from the store must be loaded before we can create new ones:
- throw new IllegalStateException("Can't create queue on unstarted host");
- }
+// if (!serviceState.isStarted) {
+// //Queues from the store must be loaded before we can create new ones:
+// throw new IllegalStateException("Can't create queue on unstarted host");
+// }
val queue = queues.get(dest);
// TODO:
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala Wed Jul 7 03:57:02 2010
@@ -21,14 +21,13 @@ import javax.xml.bind.JAXBContext
import javax.xml.stream.XMLInputFactory
import org.apache.activemq.util.URISupport
import java.net.{URL, URI}
-import collection.JavaConversions._
import org.apache.activemq.apollo.broker._
import jaxb.PropertiesReader
import org.apache.activemq.apollo.dto._
-import org.apache.activemq.transport.TransportFactory
+import java.lang.String
class XmlBrokerFactory extends BrokerFactory.Handler {
-
+
def createBroker(value: String): Broker = {
try {
var brokerURI = new URI(value)
@@ -60,37 +59,22 @@ class XmlBrokerFactory extends BrokerFac
}
}
- def createMessageBroker(brokerModel: BrokerDTO): Broker = {
- val rc = new Broker()
- for (virtualHostModel <- brokerModel.virtualHosts) {
- rc.addVirtualHost(createVirtualHost(virtualHostModel))
- }
- for (connector <- brokerModel.connectors) {
- try {
- val server = TransportFactory.bind(connector.transport)
- rc.transportServers.add(server)
- } catch {
- case e:Exception=>
- throw new Exception("Unable to bind transport server '" + connector + " due to: " + e.getMessage(), e)
- }
- }
- for (connector <- brokerModel.connectors) {
- rc.connectUris.add(connector.advertise)
- }
- return rc
- }
-
+ def createMessageBroker(config: BrokerDTO): Broker = {
+ import ReporterLevel._
+ val broker = new Broker()
- def createVirtualHost(virtualHostModel: VirtualHostDTO): VirtualHost = {
- val rc = new VirtualHost()
- rc.setNamesArray(virtualHostModel.hostNames)
- if (virtualHostModel.store != null) {
- val database = new BrokerDatabase()
- database.setVirtualHost(rc)
-// TODO:
-// database.setStore( )
- rc.setDatabase(database)
+ var errorMessage = "";
+ if( broker.configure(config, new Reporter(){
+ override def report(level: ReporterLevel, message: String) = {
+ level match {
+ case ERROR=> errorMessage+=message+"\n"
+ case _=>
+ }
+ }
+ }) == ERROR ) {
+ throw new Exception("Invalid Broker Configuration:\n"+ERROR)
}
- return rc
+
+ broker
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul 7 03:57:02 2010
@@ -34,12 +34,13 @@ import _root_.org.apache.activemq.transp
import _root_.org.apache.activemq.transport.TransportFactorySupport.verify
import _root_.scala.collection.JavaConversions._
+import org.apache.activemq.apollo.dto.ConnectorDTO
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object VMTransportFactory extends Log {
- val DEFAULT_PIPE_NAME = BrokerConstants.DEFAULT_VIRTUAL_HOST_NAME.toString();
+ val DEFAULT_PIPE_NAME = "default"
}
/**
@@ -49,119 +50,119 @@ object VMTransportFactory extends Log {
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class VMTransportFactory extends PipeTransportFactory with Logging {
-
import PipeTransportFactory._
import VMTransportFactory._
override protected def log = VMTransportFactory
- /**
- * This extension of the PipeTransportServer shuts down the broker
- * when all the connections are disconnected.
- *
- * @author chirino
- */
- class VmTransportServer extends PipeTransportServer {
-
- val refs = new AtomicInteger()
- var broker:Broker = null
-
- override def createClientTransport():PipeTransport = {
- refs.incrementAndGet();
- new PipeTransport(this) {
-
- val stopped = new AtomicBoolean()
-
- override def stop() = {
- if( stopped.compareAndSet(false, true) ) {
- super.stop();
- if( refs.decrementAndGet() == 0 ) {
- stopBroker();
- }
- }
- }
- };
- }
-
- def setBroker(broker:Broker) = {
- this.broker = broker;
- }
-
- def stopBroker() = {
- try {
- this.broker.stop();
- unbind(this);
- } catch {
- case e:Exception=>
- error("Failed to stop the broker gracefully: "+e);
- debug("Failed to stop the broker gracefully: ", e);
- }
- }
- }
-
- override def bind(uri:String):TransportServer = {
- new VmTransportServer();
+ /**
+ * This extension of the PipeTransportServer shuts down the broker
+ * when all the connections are disconnected.
+ *
+ * @author chirino
+ */
+ class VmTransportServer extends PipeTransportServer {
+ val refs = new AtomicInteger()
+ var broker: Broker = null
+
+ override def createClientTransport(): PipeTransport = {
+ refs.incrementAndGet();
+ new PipeTransport(this) {
+ val stopped = new AtomicBoolean()
+
+ override def stop() = {
+ if (stopped.compareAndSet(false, true)) {
+ super.stop();
+ if (refs.decrementAndGet() == 0) {
+ stopBroker();
+ }
+ }
+ }
+ };
+ }
+
+ def setBroker(broker: Broker) = {
+ this.broker = broker;
+ }
+
+ def stopBroker() = {
+ try {
+ this.broker.stop();
+ unbind(this);
+ } catch {
+ case e: Exception =>
+ error("Failed to stop the broker gracefully: " + e);
+ debug("Failed to stop the broker gracefully: ", e);
+ }
+ }
}
- override def connect(location:String):Transport = {
- try {
+ override def connect(location: String): Transport = {
+ try {
var uri = new URI(location)
- var brokerURI:String = null;
- var create = true;
- var name = uri.getHost();
- if (name == null) {
- name = DEFAULT_PIPE_NAME;
- }
-
- var options = URISupport.parseParamters(uri);
- var config = options.remove("broker").asInstanceOf[String]
- if (config != null) {
- brokerURI = config;
- }
- if ("false".equals(options.remove("create"))) {
- create = false;
- }
-
-
- var server = servers.get(name);
- if (server == null && create) {
-
- // Create the broker on demand.
- var broker = if( brokerURI == null ) {
- new Broker()
- } else {
- BrokerFactory.createBroker(brokerURI);
- }
-
- // Remove the existing pipe severs if the broker is configured with one... we want to make sure it
- // uses the one we explicitly configure here.
- for (s <- broker.transportServers ) {
- if (s.isInstanceOf[PipeTransportServer] && name == s.asInstanceOf[PipeTransportServer].getName()) {
- broker.transportServers.remove(s);
- }
- }
-
- // We want to use a vm transport server impl.
- var vmTransportServer = TransportFactory.bind("vm://" + name+"?wireFormat=null").asInstanceOf[VmTransportServer]
- vmTransportServer.setBroker(broker);
- broker.transportServers.add(vmTransportServer);
- broker.start();
-
- server = servers.get(name);
- }
-
- if (server == null) {
- throw new IOException("Server is not bound: " + name);
- }
-
- var transport = server.connect();
- verify( configure(transport, options), options);
-
- } catch {
-// case e:URISyntaxException=>
-// throw IOExceptionSupport.create(e);
- case e:Exception=>
- throw IOExceptionSupport.create(e);
- }
- }
+ var brokerURI: String = null;
+ var create = true;
+ var name = uri.getHost();
+ if (name == null) {
+ name = DEFAULT_PIPE_NAME;
+ }
+
+ var options = URISupport.parseParamters(uri);
+ var config = options.remove("broker").asInstanceOf[String]
+ if (config != null) {
+ brokerURI = config;
+ }
+ if ("false".equals(options.remove("create"))) {
+ create = false;
+ }
+
+
+ var server = servers.get(name);
+ if (server == null && create) {
+
+ // This is the connector that the broker needs.
+ val connector = Connector.default
+ connector.id = "vm"
+ connector.bind = "vm://" + name
+ connector.advertise = connector.bind
+
+ // Create the broker on demand.
+ var broker: Broker = null
+ if (brokerURI == null) {
+ // Lets create and configure it...
+ broker = new Broker()
+ broker.config = Broker.default
+ broker.config.connectors.clear
+ broker.config.connectors.add(connector)
+ } else {
+ // Use the user specified config
+ broker = BrokerFactory.createBroker(brokerURI);
+ // we need to add in the connector if it was not in the config...
+ if (broker.config.connectors.toList.filter(_.bind == connector.bind).isEmpty) {
+ broker.config.connectors.add(connector)
+ }
+ }
+
+ // TODO: get rid of this blocking wait.
+ val tracker = new LoggingTracker("vm broker startup")
+ tracker.start(broker)
+ tracker.await
+
+ server = servers.get(name)
+ }
+
+ if (server == null) {
+ throw new IOException("Server is not bound: " + name)
+ }
+
+ var transport = server.connect()
+ verify(configure(transport, options), options)
+
+ } catch {
+ // case e:URISyntaxException=>
+ // throw IOExceptionSupport.create(e)
+ case e: Exception =>
+ throw IOExceptionSupport.create(e)
+ }
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala Wed Jul 7 03:57:02 2010
@@ -35,18 +35,19 @@ class XMLBrokerFactoryTest extends FunSu
// assertEquals("test dispatcher", p.getName())
expect(1) {
- broker.transportServers.size()
+ broker.config.connectors.size()
}
- val expected = new ArrayList[String]()
- expected.add("pipe://test1")
- expected.add("tcp://127.0.0.1:61616")
- expect(expected) {
- broker.connectUris
+ expect("pipe://test1") {
+ broker.config.connectors.get(0).bind
+ }
+
+ expect("tcp://127.0.0.1:61616") {
+ broker.config.connectors.get(1).bind
}
expect(2) {
- broker.virtualHosts.size()
+ broker.config.virtualHosts.size()
}
// Assert.assertNotNull(broker.defaultVirtualHost().getDatabase())
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 03:57:02 2010
@@ -42,8 +42,6 @@ object BaseBrokerPerfSupport {
// Set to use tcp IO
protected var TCP = true;
- // set to force marshalling even in the NON tcp case.
- protected var FORCE_MARSHALLING = true;
var USE_KAHA_DB = true;
var PURGE_STORE = true;
@@ -99,21 +97,17 @@ abstract class BaseBrokerPerfSupport ext
override protected def beforeAll(configMap: Map[String, Any]) = {
super.beforeAll(configMap)
if (TCP) {
- sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
- receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
+ sendBrokerBindURI = "tcp://localhost:10000";
+ receiveBrokerBindURI = "tcp://localhost:20000";
sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
} else {
sendBrokerConnectURI = "pipe://SendBroker";
receiveBrokerConnectURI = "pipe://ReceiveBroker";
- if (FORCE_MARSHALLING) {
- sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
- receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
- } else {
- sendBrokerBindURI = sendBrokerConnectURI;
- receiveBrokerBindURI = receiveBrokerConnectURI;
- }
+
+ sendBrokerBindURI = sendBrokerConnectURI;
+ receiveBrokerBindURI = receiveBrokerConnectURI;
}
}
@@ -418,18 +412,22 @@ abstract class BaseBrokerPerfSupport ext
var consumer = createConsumer();
consumer.brokerPerfTest = this
- consumer.uri = rcvBroker.connectUris.head
+ consumer.uri = connectUri(rcvBroker)
consumer.destination = destination
consumer.name = "consumer" + (i + 1)
consumer.totalConsumerRate = totalConsumerRate
return consumer;
}
+ def connectUri(broker:Broker) = {
+ broker.config.connectors.get(0).advertise
+ }
+
def _createProducer(id: Int, destination: Destination): RemoteProducer = {
var producer = createProducer();
producer.brokerPerfTest = this
- producer.uri = sendBroker.connectUris.head
+ producer.uri = connectUri(sendBroker)
producer.producerId = id + 1
producer.name = "producer" + (id + 1)
producer.destination = destination
@@ -518,23 +516,27 @@ abstract class BaseBrokerPerfSupport ext
def getBrokerWireFormat() = "multi"
def getRemoteWireFormat(): String
+
def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
val broker = new Broker()
- broker.transportServers.add(TransportFactory.bind(bindURI))
- broker.connectUris.add(connectUri)
+ broker.config = Broker.default
+ val connector = broker.config.connectors.get(0)
+ connector.bind = bindURI
+ connector.advertise = connectUri
+ connector.protocol = getBrokerWireFormat
broker
}
- def createStore(broker: Broker): Store = {
- val store = if (USE_KAHA_DB) {
- StoreFactory.createStore("hawtdb");
- } else {
- StoreFactory.createStore("memory");
- }
- store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.name));
- store.setDeleteAllMessages(PURGE_STORE);
- store
- }
+// def createStore(broker: Broker): Store = {
+// val store = if (USE_KAHA_DB) {
+// StoreFactory.createStore("hawtdb");
+// } else {
+// StoreFactory.createStore("memory");
+// }
+// store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.id));
+// store.setDeleteAllMessages(PURGE_STORE);
+// store
+// }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Wed Jul 7 03:57:02 2010
@@ -45,6 +45,13 @@ public class BrokerDTO {
public int rev;
/**
+ * Should this broker be running?
+ */
+ @XmlAttribute(name="enabled")
+ public boolean enabled;
+
+
+ /**
* Used to track who last modified the configuration.
*/
@XmlAttribute(name="modified-by")
@@ -68,5 +75,11 @@ public class BrokerDTO {
@XmlElement(name="connectors")
public List<ConnectorDTO> connectors = new ArrayList<ConnectorDTO>();
+ /**
+ * The base data directory of the broker. It will store
+ * persistent data under it.
+ */
+ @XmlAttribute(name="basedir")
+ public String basedir;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java Wed Jul 7 03:57:02 2010
@@ -35,10 +35,16 @@ public class ConnectorDTO {
public String id;
/**
+ * Should this connector be running?
+ */
+ @XmlAttribute(name="enabled")
+ public boolean enabled;
+
+ /**
* The transport uri which it will accept connections on.
*/
@XmlAttribute
- public String transport;
+ public String bind;
/**
* The protocol that the transport will use.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul 7 03:57:02 2010
@@ -33,6 +33,12 @@ public class VirtualHostDTO {
@XmlAttribute(name="id")
public String id;
+ /**
+ * Should this virtual host be running?
+ */
+ @XmlAttribute(name="enabled")
+ public boolean enabled;
+
@XmlElement(name="host-names", required=true)
public ArrayList<String> hostNames = new ArrayList<String>();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul 7 03:57:02 2010
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.apollo.stomp
-import org.apache.activemq.apollo.broker.Broker
import org.apache.activemq.transport.TransportFactory
+import org.apache.activemq.apollo.broker.{LoggingTracker, Broker}
/**
*/
@@ -27,17 +27,21 @@ object StompBroker {
var port = 61613
def main(args:Array[String]) = {
- val uri = "tcp://"+address+":"+port+"?wireFormat=multi"
+ val uri = "tcp://"+address+":"+port
println("Starting stomp broker: "+uri)
val broker = new Broker()
-
- val server = TransportFactory.bind(uri)
- broker.transportServers.add(server)
- broker.start
-
+ val connector = broker.config.connectors.get(0)
+ connector.bind = uri
+ connector.protocol = "stomp"
+ connector.advertise = uri
+
+ val tracker = new LoggingTracker("broker startup")
+ tracker.start(broker)
+ tracker.await
println("Startup complete.")
+
System.in.read
println("Shutting down...")
broker.stop
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:57:02 2010
@@ -105,7 +105,7 @@ class StompProtocolHandler extends Proto
}
}
}
- connection.broker.runtime.getDefaultVirtualHost(
+ connection.connector.broker.getDefaultVirtualHost(
queue.wrap { (host)=>
this.host=host
connection.transport.resumeRead
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jul 7 03:57:02 2010
@@ -52,7 +52,6 @@ public class TcpTransportFactory impleme
URI uri = new URI(location);
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
TcpTransportServer server = createTcpTransportServer(uri);
- server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
server.setTransportOption(transportOptions);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul 7 03:57:02 2010
@@ -42,7 +42,7 @@ import java.util.Map;
public class TcpTransportServer implements TransportServer {
- protected WireFormatFactory wireFormatFactory;
+// protected WireFormatFactory wireFormatFactory;
private ServerSocketChannel channel;
private TransportAcceptListener listener;
private URI bindURI;
@@ -173,14 +173,6 @@ public class TcpTransportServer implemen
acceptSource.release();
}
- public WireFormatFactory getWireFormatFactory() {
- return wireFormatFactory;
- }
-
- public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
- this.wireFormatFactory = wireFormatFactory;
- }
-
public URI getBindURI() {
return bindURI;
}
@@ -210,7 +202,6 @@ public class TcpTransportServer implemen
// options.put("startLogging", Boolean.valueOf(startLogging));
Transport transport = createTransport(socket, options);
- transport.setWireformat(wireFormatFactory.createWireFormat());
listener.onAccept(transport);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java Wed Jul 7 03:57:02 2010
@@ -86,6 +86,23 @@ public class TransportFactorySupport{
throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
}
}
+ static public WireFormatFactory createWireFormatFactory(String location) throws IOException, URISyntaxException {
+ URI uri = new URI(location);
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+
+ String wireFormat = uri.getPath();
+ if( "null".equals(wireFormat) ) {
+ return null;
+ }
+
+ try {
+ WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+ IntrospectionSupport.setProperties(wff, options);
+ return wff;
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+ }
+ }
static protected String getDefaultWireFormatType() {
return "default";
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java Wed Jul 7 03:57:02 2010
@@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.activemq.Service;
+import org.apache.activemq.wireformat.WireFormatFactory;
import org.fusesource.hawtdispatch.DispatchQueue;
/**
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala Wed Jul 7 03:57:02 2010
@@ -36,7 +36,7 @@ object BrokerRegistry {
}
def add(broker:Broker) = _brokers.synchronized {
- _brokers.put(broker.name, broker)
+ _brokers.put(broker.config.id, broker)
}
def remove(id:String) = _brokers.synchronized {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala Wed Jul 7 03:57:02 2010
@@ -38,13 +38,15 @@ import java.io.{OutputStreamWriter, File
*/
trait ConfigStore extends Service {
- def listBrokerModels(cb: (List[String]) => Unit):Unit
+ def listBrokers(cb: (List[String]) => Unit):Unit
- def getBrokerModel(id:String, cb: (Option[BrokerDTO]) => Unit):Unit
+ def getBroker(id:String, cb: (Option[BrokerDTO]) => Unit):Unit
- def putBrokerModel(config:BrokerDTO, cb: (Boolean) => Unit):Unit
+ def putBroker(config:BrokerDTO, cb: (Boolean) => Unit):Unit
- def removeBrokerModel(id:String, rev:Int, cb: (Boolean) => Unit):Unit
+ def removeBroker(id:String, rev:Int, cb: (Boolean) => Unit):Unit
+
+ def forBroker(cb: (BrokerDTO)=> Unit):Unit
}
@@ -144,11 +146,17 @@ class FileConfigStore extends ConfigStor
ioWorker.shutdown
}
- def listBrokerModels(cb: (List[String]) => Unit) = callback(cb) {
+ def listBrokers(cb: (List[String]) => Unit) = reply(cb) {
List(latest.id)
} >>: dispatchQueue
- def getBrokerModel(id:String, cb: (Option[BrokerDTO]) => Unit) = callback(cb) {
+
+ def forBroker(cb: (BrokerDTO)=> Unit) = using(cb) {
+
+ }
+
+
+ def getBroker(id:String, cb: (Option[BrokerDTO]) => Unit) = reply(cb) {
if( latest.id == id ) {
Some(unmarshall(latest.data))
} else {
@@ -156,7 +164,7 @@ class FileConfigStore extends ConfigStor
}
} >>: dispatchQueue
- def putBrokerModel(config:BrokerDTO, cb: (Boolean) => Unit) = callback(cb) {
+ def putBroker(config:BrokerDTO, cb: (Boolean) => Unit) = reply(cb) {
debug("storing broker model: %s ver %d", config.id, config.rev)
if( latest.id != config.id ) {
debug("this store can only update broker: "+latest.id)
@@ -170,7 +178,7 @@ class FileConfigStore extends ConfigStor
}
} >>: dispatchQueue
- def removeBrokerModel(id:String, rev:Int, cb: (Boolean) => Unit) = callback(cb) {
+ def removeBroker(id:String, rev:Int, cb: (Boolean) => Unit) = reply(cb) {
// not supported.
false
} >>: dispatchQueue
@@ -187,7 +195,7 @@ class FileConfigStore extends ConfigStor
// has a chance to update the runtime too.
val c = unmarshall(config.data)
c.rev = config.rev
- putBrokerModel(c, null)
+ putBroker(c, null)
}
schedualNextUpdateCheck
}
@@ -213,7 +221,7 @@ class FileConfigStore extends ConfigStor
config.virtualHosts.add(host)
var connector = new ConnectorDTO
- connector.transport = "tcp://0.0.0.0:61613"
+ connector.bind = "tcp://0.0.0.0:61613"
connector.advertise = "tcp://0.0.0.0:61613"
connector.protocol = "multi"
config.connectors.add( connector )
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala Wed Jul 7 03:57:02 2010
@@ -24,6 +24,7 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.{FileConfigStore, ConfigStore, BrokerRegistry}
import java.io.File
import com.google.inject.{Inject, Provides, Guice, Singleton}
+import org.fusesource.hawtdispatch.ScalaDispatch._
/**
* A servlet context listener which registers
@@ -37,17 +38,28 @@ class ServletContextListener extends Gui
override def contextInitialized(servletContextEvent: ServletContextEvent) = {
-
- // todo: replace this with info accessed from a configuration store.
- // register/start brokers we are managing.
try {
BrokerRegistry.configStore = createConfigStore
- broker = createBroker();
- BrokerRegistry.add(broker);
- LoggingTracker("broker startup") { tracker=>
- broker.start(tracker.task())
+ // Brokers startup async.
+ BrokerRegistry.configStore.listBrokers { ids =>
+ ids.foreach { id=>
+ BrokerRegistry.configStore.getBroker(id, { x=>
+ x match {
+ case Some(config)=>
+ // Only start the broker up if it's enabled..
+ if( config.enabled ) {
+ val broker = new Broker()
+ broker.config = config
+ BrokerRegistry.add(broker)
+ broker.start()
+ }
+ case None =>
+ }
+ })
+ }
}
+
}
catch {
case e:Exception =>
@@ -63,9 +75,9 @@ class ServletContextListener extends Gui
// un-register/stop brokers we are managing.
if( broker!=null ) {
- BrokerRegistry.remove(broker.name);
+ BrokerRegistry.remove(broker.id);
LoggingTracker("broker shutdown") { tracker =>
- broker.stop(tracker.task(broker.name))
+ broker.stop(tracker.task(broker.id))
BrokerRegistry.configStore.stop(tracker.task("config store"))
}
}
@@ -97,11 +109,5 @@ class ServletContextListener extends Gui
store
}
- def createBroker(): Broker = {
- val broker = new Broker()
- broker.name = "default"
- broker.transportServers.add(TransportFactory.bind("tcp://localhost:10000?wireFormat=multi"))
- broker.connectUris.add("tcp://localhost:10000")
- broker
- }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala Wed Jul 7 03:57:02 2010
@@ -60,7 +60,7 @@ class Root() extends Resource {
def get() = {
val rc = new IdListDTO
val ids = Future[List[String]] { cb=>
- configStore.listBrokerModels(cb)
+ configStore.listBrokers(cb)
}.toArray[String]
rc.ids.addAll(Arrays.asList(ids: _*))
rc
@@ -94,7 +94,7 @@ case class Broker(parent:Root, @BeanProp
private def config() = {
Future[Option[BrokerDTO]] { cb=>
- configStore.getBrokerModel(id, cb)
+ configStore.getBroker(id, cb)
}.getOrElse(result(NOT_FOUND))
}
@@ -111,14 +111,14 @@ case class Broker(parent:Root, @BeanProp
config.id = id;
config.rev = rev
Future[Boolean] { cb=>
- configStore.putBrokerModel(config, cb)
+ configStore.putBroker(config, cb)
} || result(NOT_FOUND)
}
@DELETE @Path("config/{rev}")
def delete(@PathParam("rev") rev:Int) = {
Future[Boolean] { cb=>
- configStore.removeBrokerModel(id, rev, cb)
+ configStore.removeBroker(id, rev, cb)
} || result(NOT_FOUND)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala Wed Jul 7 03:57:02 2010
@@ -41,7 +41,7 @@ class FileConfigStoreTest extends FunSui
expect(List("default")) {
Future[List[String]]{ x=>
- store.listBrokerModels(x)
+ store.listBrokers(x)
}
}