You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:57:03 UTC

svn commit: r961103 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/ activemq-broker/src/main/scala/org/apache/activem...

Author: chirino
Date: Wed Jul  7 03:57:02 2010
New Revision: 961103

URL: http://svn.apache.org/viewvc?rev=961103&view=rev
Log:
broker now implements a cleaner config and lifecycle

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul  7 03:57:02 2010
@@ -20,6 +20,8 @@ import org.apache.activemq.Service
 import org.fusesource.hawtdispatch.DispatchQueue
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
+object BaseService extends Log
+
 /**
  * <p>
  * The BaseService provides helpers for dealing async service state.
@@ -27,7 +29,9 @@ import _root_.org.fusesource.hawtdispatc
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait BaseService extends Service {
+trait BaseService extends Service with Logging {
+
+  override protected def log:Log = BaseService
 
   sealed class State {
     override def toString = getClass.getSimpleName
@@ -52,7 +56,7 @@ trait BaseService extends Service {
   protected class  STOPPING extends State with CallbackSupport { override def isStopping = true  }
   protected object STOPPED extends State { override def isStopped = true  }
 
-  val dispatchQueue:DispatchQueue
+  protected val dispatchQueue:DispatchQueue
 
   final def start() = start(null)
   final def stop() = stop(null)
@@ -61,14 +65,9 @@ trait BaseService extends Service {
   protected var _serviceState:State = CREATED
   def serviceState = _serviceState
 
-  private def error(msg:String) {
-    try {
-      throw new AssertionError(msg)
-    } catch {
-      case e:Exception =>
-      e.printStackTrace
-    }
-  }
+  @volatile
+  protected var _serviceFailure:Exception = null
+  def serviceFailure = _serviceFailure
 
   final def start(onCompleted:Runnable) = ^{
     def do_start = {
@@ -83,6 +82,8 @@ trait BaseService extends Service {
       }
       catch {
         case e:Exception =>
+          error(e, "Start failed due to %s", e)
+          _serviceFailure = e
           _serviceState = FAILED
           state.done
       }
@@ -103,7 +104,7 @@ trait BaseService extends Service {
         done
       case state =>
         done
-        error("start should not be called from state: "+state);
+        error("Start should not be called from state: %s", state);
     }
   } |>>: dispatchQueue
 
@@ -126,6 +127,8 @@ trait BaseService extends Service {
         }
         catch {
           case e:Exception =>
+            error(e, "Stop failed due to: %s", e)
+            _serviceFailure = e
             _serviceState = FAILED
             state.done
         }
@@ -135,7 +138,7 @@ trait BaseService extends Service {
         done
       case state =>
         done
-        error("stop should not be called from state: "+state);
+        error("Stop should not be called from state: %s", state);
     }
   } |>>: dispatchQueue
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:57:02 2010
@@ -17,17 +17,16 @@
 package org.apache.activemq.apollo.broker
 
 import _root_.java.io.{File}
-import _root_.org.apache.activemq.transport._
-import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
+import _root_.org.apache.activemq.util.{FactoryFinder}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import _root_.scala.collection.JavaConversions._
-import _root_.scala.reflect.BeanProperty
 import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
-import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
-import java.util.concurrent.{TimeUnit, CountDownLatch}
 import org.fusesource.hawtbuf._
+import ReporterLevel._
+import AsciiBuffer._
+import org.apache.activemq.apollo.dto.{VirtualHostDTO, BrokerDTO}
+import collection.{JavaConversions, SortedMap}
+import java.util.LinkedList
 
 /**
  * <p>
@@ -85,19 +84,50 @@ object BufferConversions {
   implicit def toUTF8Buffer(value:Buffer) = value.utf8
 }
 
-import BufferConversions._
 
-object BrokerConstants extends Log {
-  val CONFIGURATION = "CONFIGURATION"
-  val STOPPED = "STOPPED"
-  val STARTING = "STARTING"
-  val STOPPING = "STOPPING"
-  val RUNNING = "RUNNING"
-  val UNKNOWN = "UNKNOWN"
-  
-  val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
+
+
+object Broker extends Log {
 
   val STICK_ON_THREAD_QUEUES = true
+
+  /**
+   * Creates a default a configuration object.
+   */
+  def default() = {
+    val rc = new BrokerDTO
+    rc.id = "default"
+    rc.enabled = true
+    rc.virtualHosts.add(VirtualHost.default)
+    rc.connectors.add(Connector.default)
+    rc.basedir = "./activemq-data/default"
+    rc
+  }
+
+  /**
+   * Validates a configuration object.
+   */
+  def validate(config: BrokerDTO, reporter:Reporter):ReporterLevel = {
+    new Reporting(reporter) {
+      if( empty(config.id) ) {
+        error("Broker id must be specified.")
+      }
+      if( config.virtualHosts.isEmpty ) {
+        error("Broker must define at least one virtual host.")
+      }
+      if( empty(config.basedir) ) {
+        error("Broker basedir must be defined.")
+      }
+
+      import JavaConversions._
+      for (host <- config.virtualHosts ) {
+        result |= VirtualHost.validate(host, reporter)
+      }
+      for (connector <- config.connectors ) {
+        result |= Connector.validate(connector, reporter)
+      }
+    }.result
+  }
 }
 
 /**
@@ -109,230 +139,100 @@ object BrokerConstants extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Broker() extends Service with DispatchLogging {
+class Broker() extends BaseService with DispatchLogging with LoggingReporter {
   
-  import BrokerConstants._
-  override protected def log = BrokerConstants
+  import Broker._
+  override protected def log = Broker
+
+  var config: BrokerDTO = default
 
-  // The configuration state of the broker... It can be modified directly until the broker
-  // is started.
-  @BeanProperty
-  val connectUris: ArrayList[String] = new ArrayList[String]
-  @BeanProperty
-  val virtualHosts: LinkedHashMap[AsciiBuffer, VirtualHost] = new LinkedHashMap[AsciiBuffer, VirtualHost]
-  @BeanProperty
-  val transportServers: ArrayList[TransportServer] = new ArrayList[TransportServer]
-  @BeanProperty
   var dataDirectory: File = null
-  @BeanProperty
-  var name = "broker";
-  @BeanProperty
   var defaultVirtualHost: VirtualHost = null
-
-  def start = runtime.start(null)
-  def start(onComplete:Runnable) = runtime.start(onComplete)
-
-  def stop = runtime.stop(null)
-  def stop(onComplete:Runnable) = runtime.stop(onComplete)
+  var virtualHosts: Map[AsciiBuffer, VirtualHost] = Map()
+  var connectors: List[Connector] = Nil
 
   val dispatchQueue = createQueue("broker");
-
   if( STICK_ON_THREAD_QUEUES ) {
     dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
   }
 
-  def addVirtualHost(host: VirtualHost) = {
-    if (host.names.isEmpty) {
-      throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
-    }
-    for (name <- host.names) {
-      if (virtualHosts.containsKey(name)) {
-        throw new IllegalArgumentException("Virtual host with host name " + name + " already exists.")
-      }
-    }
-    for (name <- host.names) {
-      virtualHosts.put(name, host)
-    }
-    if (defaultVirtualHost == null) {
-      defaultVirtualHost = host
-    }
-  }
+  def id = config.id
 
-  // Holds the runtime state of the broker all access should be serialized
-  // via a the dispatch queue and therefore all requests are setup to return
-  // results via callbacks.
-  object runtime {
-
-    class BrokerAcceptListener extends TransportAcceptListener {
-      def onAcceptError(error: Exception): Unit = {
-        error.printStackTrace
-        warn("Accept error: " + error)
-        debug("Accept error details: ", error)
-      }
-
-      def onAccept(transport: Transport): Unit = {
-        debug("Accepted connection from: %s", transport.getRemoteAddress)
-        var connection = new BrokerConnection(Broker.this)
-        connection.transport = transport
-        connection.dispatchQueue.retain
-        if( STICK_ON_THREAD_QUEUES ) {
-          connection.dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
-        }
+  override def toString() = "broker: "+id 
 
-        clientConnections.add(connection)
-        try {
-          connection.start()
-        }
-        catch {
-          case e1: Exception => {
-            onAcceptError(e1)
-          }
-        }
-      }
-    }
+  /**
+   * Validates and then applies the configuration.
+   */
+  def configure(config: BrokerDTO, reporter:Reporter) = ^{
+    if ( validate(config, reporter) < ERROR ) {
+      this.config = config
 
-    var state = CONFIGURATION
-    val clientConnections: HashSet[Connection] = new HashSet[Connection]
+      if( serviceState.isStarted ) {
+        // TODO: apply changes while he broker is running.
+        reporter.report(WARN, "Updating broker configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
 
-    def stopped(connection:Connection) = ^{
-      if( clientConnections.remove(connection) ) {
-        connection.dispatchQueue.release
       }
-    } >>: dispatchQueue
-
-    def removeConnectUri(uri: String): Unit = ^ {
-      connectUris.remove(uri)
-    } >>: dispatchQueue
-
-    def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
-      virtualHosts.get(name)
-    } >>: dispatchQueue
-
-    def getConnectUris(cb: (ArrayList[String]) => Unit) = callback(cb) {
-      new ArrayList(connectUris)
-    } >>: dispatchQueue
-
-
-    def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
-      defaultVirtualHost
-    } >>: dispatchQueue
-
-    def addVirtualHost(host: VirtualHost) = ^ {
-      Broker.this.addVirtualHost(host)
-    } >>: dispatchQueue
-
-    def getState(cb: (String) => Unit) = callback(cb) {state} >>: dispatchQueue
-
-    def addConnectUri(uri: String) = ^ {
-      connectUris.add(uri)
-    } >>: dispatchQueue
-
-    def getName(cb: (String) => Unit) = callback(cb) {
-      name;
-    } >>: dispatchQueue
-
-    def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
-      new ArrayList[VirtualHost](virtualHosts.values)
-    } >>: dispatchQueue
-
-    def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
-      new ArrayList[TransportServer](transportServers)
-    } >>: dispatchQueue
-
-    def start(onCompleted:Runnable) = ^ {
-      _start(onCompleted)
-    } >>: dispatchQueue
-
-    def _start(onCompleted:Runnable) = {
-      if (state == CONFIGURATION) {
-        // We can apply defaults now
-        if (dataDirectory == null) {
-          dataDirectory = new File(IOHelper.getDefaultDataDirectory)
-        }
+    }
+  } >>: dispatchQueue
 
-        if (defaultVirtualHost == null) {
-          defaultVirtualHost = new VirtualHost()
-          defaultVirtualHost.broker = Broker.this
-          defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
-          virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost)
-        }
 
-        state = STARTING
+  override def _start(onCompleted:Runnable) = {
 
-        val tracker = new LoggingTracker("broker startup", dispatchQueue)
-        for (virtualHost <- virtualHosts.values) {
-          virtualHost.start(tracker.task("virtual host: "+virtualHost))
+    // create the runtime objects from the config
+    {
+      import JavaConversions._
+      dataDirectory = new File(config.basedir)
+      defaultVirtualHost = null
+      for (c <- config.virtualHosts) {
+        val host = new VirtualHost(this)
+        host.configure(c, this)
+        virtualHosts += ascii(c.id)-> host
+        // first defined host is the default virtual host
+        if( defaultVirtualHost == null ) {
+          defaultVirtualHost = host
         }
-        for (server <- transportServers) {
-          server.setDispatchQueue(dispatchQueue)
-          server.setAcceptListener(new BrokerAcceptListener)
-          server.start(tracker.task("transport server: "+server))
-        }
-        tracker.callback {
-          state = RUNNING
-          if( onCompleted!=null ) {
-            onCompleted.run
-          }
-        }
-
-      } else {
-        warn("Can only start a broker that is in the " + CONFIGURATION + " state.  Broker was " + state)
+      }
+      for (c <- config.connectors) {
+        val connector = new Connector(this)
+        connector.configure(c, this)
+        connectors ::= connector
       }
     }
 
+    // Start them up..
+    val tracker = new LoggingTracker("broker startup", dispatchQueue)
+    virtualHosts.valuesIterator.foreach( x=>
+      tracker.start(x)
+    )
+    connectors.foreach( x=>
+      tracker.start(x)
+    )
 
-    def stop(onCompleted:Runnable): Unit = ^ {
-      if (state == RUNNING) {
-        state = STOPPING
-        val tracker = new LoggingTracker("broker shutdown", dispatchQueue)
-
-        // Stop accepting connections..
-        for (server <- transportServers) {
-          stopService(server,tracker)
-        }
-
-        // Kill client connections..
-        for (connection <- clientConnections) {
-          stopService(connection, tracker)
-        }
-
-        // Shutdown the virtual host services
-        for (virtualHost <- virtualHosts.values) {
-          stopService(virtualHost, tracker)
-        }
-
-        def stopped = {
-          state = STOPPED;
+    tracker.callback(onCompleted)
+  }
 
-        }
 
-        tracker.callback {
-          stopped
-          if( onCompleted!=null ) {
-            onCompleted.run
-          }
-        }
-
-      }
-    } >>: dispatchQueue
-    
+  def _stop(onCompleted:Runnable): Unit = {
+    val tracker = new LoggingTracker("broker shutdown", dispatchQueue)
+    // Stop accepting connections..
+    connectors.foreach( x=>
+      tracker.stop(x)
+    )
+    // Shutdown the virtual host services
+    virtualHosts.valuesIterator.foreach( x=>
+      tracker.stop(x)
+    )
+    tracker.callback(onCompleted)
   }
 
+  def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = reply(cb) {
+    virtualHosts.getOrElse(name, null)
+  } >>: dispatchQueue
+
+  def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = reply(cb) {
+    defaultVirtualHost
+  } >>: dispatchQueue
 
-
-  /**
-   * Helper method to help stop broker services and log error if they fail to start.
-   * @param server
-   */
-  private def stopService(service: Service, tracker:LoggingTracker): Unit = {
-    try {
-      service.stop(tracker.task(service.toString))
-    } catch {
-      case e: Exception => {
-        warn(e, "Could not stop " + service + ": " + e)
-      }
-    }
-  }
 }
 
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:57:02 2010
@@ -39,7 +39,7 @@ object Connection extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class Connection() extends TransportListener with BaseService  with DispatchLogging {
+abstract class Connection() extends TransportListener with BaseService  {
 
   override protected def log = Connection
 
@@ -90,21 +90,21 @@ abstract class Connection() extends Tran
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class BrokerConnection(val broker: Broker) extends Connection {
+class BrokerConnection(val connector: Connector) extends Connection {
 
   var protocol = "stomp"
   var protocolHandler: ProtocolHandler = null;
 
   override protected  def _start(onCompleted:Runnable) = {
-    broker.dispatchQueue.retain
+    connector.dispatchQueue.retain
     protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
     protocolHandler.setConnection(this);
     super._start(onCompleted)
   }
 
   override protected def _stop(onCompleted:Runnable) = {
-    broker.runtime.stopped(this)
-    broker.dispatchQueue.release
+    connector.stopped(this)
+    connector.dispatchQueue.release
     super._stop(onCompleted)
   }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=961103&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Jul  7 03:57:02 2010
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.io.{File}
+import _root_.org.apache.activemq.transport._
+import _root_.org.apache.activemq.Service
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import _root_.scala.reflect.BeanProperty
+import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
+import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
+import org.fusesource.hawtbuf._
+import collection.JavaConversions
+import org.apache.activemq.apollo.dto.{ConnectorDTO, BrokerDTO}
+import JavaConversions._
+import org.apache.activemq.wireformat.WireFormatFactory
+import ReporterLevel._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object Connector extends Log {
+
+  val STICK_ON_THREAD_QUEUES = Broker.STICK_ON_THREAD_QUEUES
+
+  /**
+   * Creates a default a configuration object.
+   */
+  def default() = {
+    val rc = new ConnectorDTO
+    rc.id = "default"
+    rc.enabled = true
+    rc.advertise = "tcp://localhost:61616"
+    rc.bind = "tcp://0.0.0.0:61616"
+    rc.protocol = "multi"
+    rc
+  }
+
+  /**
+   * Validates a configuration object.
+   */
+  def validate(config: ConnectorDTO, reporter:Reporter):ReporterLevel = {
+    new Reporting(reporter) {
+      if( empty(config.id) ) {
+        error("Connector id must be specified")
+      }
+    }.result
+  }
+
+
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Connector(val broker:Broker) extends BaseService with DispatchLogging {
+  import Connector._
+
+  override protected def log = Connector
+  override val dispatchQueue = broker.dispatchQueue
+
+  var config:ConnectorDTO = default
+  var transportServer:TransportServer = _
+  var wireFormatFactory:WireFormatFactory = _
+
+  val connections: HashSet[Connection] = new HashSet[Connection]
+
+  override def toString = "connector: "+config.id
+
+  object BrokerAcceptListener extends TransportAcceptListener {
+    def onAcceptError(error: Exception): Unit = {
+      error.printStackTrace
+      warn("Accept error: " + error)
+      debug("Accept error details: ", error)
+    }
+
+    def onAccept(transport: Transport): Unit = {
+      debug("Accepted connection from: %s", transport.getRemoteAddress)
+
+      if( wireFormatFactory!=null ) {
+        transport.setWireformat(wireFormatFactory.createWireFormat)
+      }
+
+      var connection = new BrokerConnection(Connector.this)
+      connection.transport = transport
+
+      if( STICK_ON_THREAD_QUEUES ) {
+        connection.dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+      }
+
+      // We release when it gets removed form the connections list.
+      connection.dispatchQueue.retain
+      connections.add(connection)
+
+      try {
+        connection.start()
+      } catch {
+        case e1: Exception => {
+          onAcceptError(e1)
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Validates and then applies the configuration.
+   */
+  def configure(config: ConnectorDTO, reporter:Reporter) = ^{
+    if ( validate(config, reporter) < ERROR ) {
+      this.config = config
+
+      if( serviceState.isStarted ) {
+        // TODO: apply changes while running
+        reporter.report(WARN, "Updating connector configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
+
+      }
+    }
+  } |>>: dispatchQueue
+
+
+  override def _start(onCompleted:Runnable) = {
+    assert(config!=null, "Connector must be configured before it is started.")
+    wireFormatFactory = TransportFactorySupport.createWireFormatFactory(config.protocol)
+
+    transportServer = TransportFactory.bind( config.bind )
+    transportServer.setDispatchQueue(dispatchQueue)
+    transportServer.setAcceptListener(BrokerAcceptListener)
+    transportServer.start(onCompleted)
+  }
+
+
+  override def _stop(onCompleted:Runnable): Unit = {
+    val tracker = new LoggingTracker(toString, dispatchQueue)
+
+    // This odd usage of tracker is because we don't want
+    // to kill client connections until the server is
+    // stopped. Since the connections list could change between
+    // now and when the server actually stops.
+    val task = tracker.task(transportServer.toString)
+    transportServer.stop(^{
+      for (connection <- connections) {
+        tracker.stop(connection)
+      }
+      task.run
+    })
+    tracker.callback(onCompleted)
+  }
+
+  /**
+   * Connections callback into the connector when they are stopped so that we can
+   * stop tracking them.
+   */
+  def stopped(connection:Connection) = ^{
+    if( connections.remove(connection) ) {
+      connection.dispatchQueue.release
+    }
+  } |>>: dispatchQueue
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala Wed Jul  7 03:57:02 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.{CountDownLa
 import java.util.HashSet
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import org.fusesource.hawtdispatch.{TaskTracker, DispatchQueue}
+import org.apache.activemq.Service
 
 /**
  * <p>
@@ -40,6 +41,14 @@ class LoggingTracker(name:String, parent
     timeout
   }
 
+  def start(service:Service) = {
+    service.start(task(service.toString))
+  }
+
+  def stop(service:Service) = {
+    service.stop(task(service.toString))
+  }
+
 }
 
 object LoggingTracker extends Log {

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala?rev=961103&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Reporter.scala Wed Jul  7 03:57:02 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.lang.{String}
+import java.util.{LinkedHashMap}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ReporterLevel extends Enumeration {
+  type ReporterLevel = Value
+  val INFO, WARN, ERROR = Value
+
+  class RichReporterLevel(self:ReporterLevel) {
+    def | (other:ReporterLevel):ReporterLevel = {
+      if( other > self  ) {
+        other
+      } else {
+        self
+      }
+    }
+  }
+
+  implicit def toRichReporterLevel(level:ReporterLevel) = new RichReporterLevel(level)
+}
+import ReporterLevel._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Reporter {
+  def report(level:ReporterLevel, message:String) = {}
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait LoggingReporter extends Logging with Reporter {
+  override def report(level:ReporterLevel, message:String) = {
+    level match {
+      case INFO=>
+        info(message)
+      case WARN=>
+        warn(message)
+      case ERROR=>
+        error(message)
+    }
+  }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Reporting(reporter:Reporter) {
+  var result = INFO
+
+  protected def warn(msg:String) = {
+    reporter.report(WARN, msg)
+    result |= WARN
+  }
+
+  protected def error(msg:String) = {
+    reporter.report(ERROR, msg)
+    result |= ERROR
+  }
+
+  protected def info(msg:String) = {
+    reporter.report(INFO, msg)
+    result |= INFO
+  }
+
+  protected def empty(value:String) = value==null || value.isEmpty
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:57:02 2010
@@ -26,48 +26,82 @@ import _root_.scala.collection.JavaConve
 import _root_.scala.reflect.BeanProperty
 import path.PathFilter
 import org.fusesource.hawtbuf.AsciiBuffer
+import org.apache.activemq.apollo.dto.VirtualHostDTO
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+import ReporterLevel._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object VirtualHost extends Log
+object VirtualHost extends Log {
+
+  /**
+   * Creates a default a configuration object.
+   */
+  def default() = {
+    val rc = new VirtualHostDTO
+    rc.id = "default"
+    rc.enabled = true
+    rc.hostNames.add("localhost")
+    rc
+  }
+
+  /**
+   * Validates a configuration object.
+   */
+  def validate(config: VirtualHostDTO, reporter:Reporter):ReporterLevel = {
+     new Reporting(reporter) {
+      if( config.hostNames.isEmpty ) {
+        error("Virtual host must be configured with at least one host name.")
+      }
+    }.result
+  }
+  
+}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class VirtualHost() extends Service with Logging {
-
+class VirtualHost(val broker: Broker) extends BaseService with DispatchLogging {
+  import VirtualHost._
+  
   override protected def log = VirtualHost
+  override protected val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
 
+  var config:VirtualHostDTO = _
   private val queueStore = new BrokerQueueStore()
   private val queues = new HashMap[AsciiBuffer, Queue]()
   private val durableSubs = new HashMap[String, DurableSubscription]()
-  private val q:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
-  val router = new Router(q)
-
-  private var started = false;
+  val router = new Router(dispatchQueue)
 
-  @BeanProperty
-  var broker: Broker = null
-  @BeanProperty
   var names:List[String] = Nil;
   def setNamesArray( names:ArrayList[String]) = {
     this.names = names.toList
   }
 
-  @BeanProperty
   var database:BrokerDatabase = new BrokerDatabase
-  @BeanProperty
   var transactionManager:TransactionManager = new TransactionManager
 
-  override def toString = names.head
+  override def toString = "virtual-host: "+config.id
 
+  /**
+   * Validates and then applies the configuration.
+   */
+  def configure(config: VirtualHostDTO, reporter:Reporter) = ^{
+    if ( validate(config, reporter) < ERROR ) {
+      this.config = config
+
+      if( serviceState.isStarted ) {
+        // TODO: apply changes while he broker is running.
+        reporter.report(WARN, "Updating virtual host configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
 
-  def start() = start(null)
-  def start(onCompleted:Runnable):Unit = {
-    if (started) {
-        return;
+      }
     }
+  } |>>: dispatchQueue
+
+
+  override protected def _start(onCompleted:Runnable):Unit = {
 
     database.virtualHost = this
     database.start();
@@ -76,7 +110,7 @@ class VirtualHost() extends Service with
 
     //Recover queues:
     queueStore.setDatabase(database);
-    queueStore.setDispatchQueue(q);
+    queueStore.setDispatchQueue(dispatchQueue);
     queueStore.loadQueues();
 
     // Create Queue instances
@@ -96,20 +130,11 @@ class VirtualHost() extends Service with
     //Recover transactions:
     transactionManager.virtualHost = this
     transactionManager.loadTransactions();
-    started = true;
-    
-    if( onCompleted!=null ) {
-      onCompleted.run
-    }
+    onCompleted.run
   }
 
 
-  def stop() = start(null)
-  def stop(onCompleted:Runnable):Unit = {
-
-      if (!started) {
-          return;
-      }
+  override protected def _stop(onCompleted:Runnable):Unit = {
 
 //    TODO:
 //      val tmp = new ArrayList[Queue](queues.values())
@@ -125,19 +150,15 @@ class VirtualHost() extends Service with
 //        }
 //        done.await();
 
-      database.stop();
-      started = false;
-    if( onCompleted!=null ) {
-      onCompleted.run
-    }
-    
+    database.stop();
+    onCompleted.run
   }
 
   def createQueue(dest:Destination) :Queue = {
-      if (!started) {
-          //Queues from the store must be loaded before we can create new ones:
-          throw new IllegalStateException("Can't create queue on unstarted host");
-      }
+//      if (!serviceState.isStarted) {
+//          //Queues from the store must be loaded before we can create new ones:
+//          throw new IllegalStateException("Can't create queue on unstarted host");
+//      }
 
       val queue = queues.get(dest);
 //        TODO:

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/jaxb/XmlBrokerFactory.scala Wed Jul  7 03:57:02 2010
@@ -21,14 +21,13 @@ import javax.xml.bind.JAXBContext
 import javax.xml.stream.XMLInputFactory
 import org.apache.activemq.util.URISupport
 import java.net.{URL, URI}
-import collection.JavaConversions._
 import org.apache.activemq.apollo.broker._
 import jaxb.PropertiesReader
 import org.apache.activemq.apollo.dto._
-import org.apache.activemq.transport.TransportFactory
+import java.lang.String
 
 class XmlBrokerFactory extends BrokerFactory.Handler {
-  
+
   def createBroker(value: String): Broker = {
     try {
       var brokerURI = new URI(value)
@@ -60,37 +59,22 @@ class XmlBrokerFactory extends BrokerFac
     }
   }
 
-  def createMessageBroker(brokerModel: BrokerDTO): Broker = {
-    val rc = new Broker()
-    for (virtualHostModel <- brokerModel.virtualHosts) {
-      rc.addVirtualHost(createVirtualHost(virtualHostModel))
-    }
-    for (connector <- brokerModel.connectors) {
-      try {
-        val server = TransportFactory.bind(connector.transport)
-        rc.transportServers.add(server)
-      } catch {
-        case e:Exception=>
-          throw new Exception("Unable to bind transport server '" + connector + " due to: " + e.getMessage(), e)
-      }
-    }
-    for (connector <- brokerModel.connectors) {
-      rc.connectUris.add(connector.advertise)
-    }
-    return rc
-  }
-
+  def createMessageBroker(config: BrokerDTO): Broker = {
+    import ReporterLevel._
+    val broker = new Broker()
 
-  def createVirtualHost(virtualHostModel: VirtualHostDTO): VirtualHost = {
-    val rc = new VirtualHost()
-    rc.setNamesArray(virtualHostModel.hostNames)
-    if (virtualHostModel.store != null) {
-      val database = new BrokerDatabase()
-      database.setVirtualHost(rc)
-//      TODO:
-//      database.setStore( )
-      rc.setDatabase(database)
+    var errorMessage = "";
+    if( broker.configure(config, new Reporter(){
+      override def report(level: ReporterLevel, message: String) = {
+        level match {
+          case ERROR=> errorMessage+=message+"\n"
+          case _=>
+        }
+      }
+    }) == ERROR ) {
+      throw new Exception("Invalid Broker Configuration:\n"+ERROR)
     }
-    return rc
+    
+    broker
   }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul  7 03:57:02 2010
@@ -34,12 +34,13 @@ import _root_.org.apache.activemq.transp
 import _root_.org.apache.activemq.transport.TransportFactorySupport.verify
 
 import _root_.scala.collection.JavaConversions._
+import org.apache.activemq.apollo.dto.ConnectorDTO
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object VMTransportFactory extends Log {
-  val DEFAULT_PIPE_NAME = BrokerConstants.DEFAULT_VIRTUAL_HOST_NAME.toString();
+  val DEFAULT_PIPE_NAME = "default"
 }
 
 /**
@@ -49,119 +50,119 @@ object VMTransportFactory extends Log {
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class VMTransportFactory extends PipeTransportFactory with Logging {
-
   import PipeTransportFactory._
   import VMTransportFactory._
   override protected def log = VMTransportFactory
 
-	/**
-	 * This extension of the PipeTransportServer shuts down the broker
-	 * when all the connections are disconnected.
-	 *
-	 * @author chirino
-	 */
-	class VmTransportServer extends PipeTransportServer {
-
-		val refs = new AtomicInteger()
-		var broker:Broker = null
-
-		override def createClientTransport():PipeTransport = {
-			refs.incrementAndGet();
-			new PipeTransport(this) {
-
-				val stopped = new AtomicBoolean()
-
-				override def stop() = {
-					if( stopped.compareAndSet(false, true) ) {
-						super.stop();
-						if( refs.decrementAndGet() == 0 ) {
-							stopBroker();
-						}
-					}
-				}
-			};
-		}
-
-		def setBroker(broker:Broker) = {
-			this.broker = broker;
-		}
-
-		def stopBroker() = {
-			try {
-				this.broker.stop();
-				unbind(this);
-			} catch {
-        case e:Exception=>
-				error("Failed to stop the broker gracefully: "+e);
-				debug("Failed to stop the broker gracefully: ", e);
-			}
-		}
-	}
-
-  override def bind(uri:String):TransportServer = {
-    new VmTransportServer();
+  /**
+   * This extension of the PipeTransportServer shuts down the broker
+   * when all the connections are disconnected.
+   *
+   * @author chirino
+   */
+  class VmTransportServer extends PipeTransportServer {
+    val refs = new AtomicInteger()
+    var broker: Broker = null
+
+    override def createClientTransport(): PipeTransport = {
+      refs.incrementAndGet();
+      new PipeTransport(this) {
+        val stopped = new AtomicBoolean()
+
+        override def stop() = {
+          if (stopped.compareAndSet(false, true)) {
+            super.stop();
+            if (refs.decrementAndGet() == 0) {
+              stopBroker();
+            }
+          }
+        }
+      };
+    }
+
+    def setBroker(broker: Broker) = {
+      this.broker = broker;
+    }
+
+    def stopBroker() = {
+      try {
+        this.broker.stop();
+        unbind(this);
+      } catch {
+        case e: Exception =>
+          error("Failed to stop the broker gracefully: " + e);
+          debug("Failed to stop the broker gracefully: ", e);
+      }
+    }
   }
 
-  override def connect(location:String):Transport = {
-		try {
+  override def connect(location: String): Transport = {
+    try {
       var uri = new URI(location)
-			var brokerURI:String = null;
-			var create = true;
-			var name = uri.getHost();
-			if (name == null) {
-				name = DEFAULT_PIPE_NAME;
-			}
-
-			var options = URISupport.parseParamters(uri);
-			var config = options.remove("broker").asInstanceOf[String]
-			if (config != null) {
-				brokerURI = config;
-			}
-			if ("false".equals(options.remove("create"))) {
-				create = false;
-			}
-
-
-			var server = servers.get(name);
-			if (server == null && create) {
-
-				// Create the broker on demand.
-				var broker = if( brokerURI == null ) {
-					new Broker()
-				} else {
-					BrokerFactory.createBroker(brokerURI);
-				}
-
-				// Remove the existing pipe severs if the broker is configured with one...  we want to make sure it
-				// uses the one we explicitly configure here.
-				for (s <- broker.transportServers ) {
-					if (s.isInstanceOf[PipeTransportServer] && name == s.asInstanceOf[PipeTransportServer].getName()) {
-						broker.transportServers.remove(s);
-					}
-				}
-
-				// We want to use a vm transport server impl.
-				var vmTransportServer = TransportFactory.bind("vm://" + name+"?wireFormat=null").asInstanceOf[VmTransportServer]
-				vmTransportServer.setBroker(broker);
-				broker.transportServers.add(vmTransportServer);
-				broker.start();
-
-				server = servers.get(name);
-			}
-
-			if (server == null) {
-				throw new IOException("Server is not bound: " + name);
-			}
-
-      var transport = server.connect();
-      verify( configure(transport, options), options);
-
-		} catch {
-//      case e:URISyntaxException=>
-//  			throw IOExceptionSupport.create(e);
-      case e:Exception=>
-  			throw IOExceptionSupport.create(e);
-		}
-	}
+      var brokerURI: String = null;
+      var create = true;
+      var name = uri.getHost();
+      if (name == null) {
+        name = DEFAULT_PIPE_NAME;
+      }
+
+      var options = URISupport.parseParamters(uri);
+      var config = options.remove("broker").asInstanceOf[String]
+      if (config != null) {
+        brokerURI = config;
+      }
+      if ("false".equals(options.remove("create"))) {
+        create = false;
+      }
+
+
+      var server = servers.get(name);
+      if (server == null && create) {
+
+        // This is the connector that the broker needs.
+        val connector = Connector.default
+        connector.id = "vm"
+        connector.bind = "vm://" + name
+        connector.advertise = connector.bind
+
+        // Create the broker on demand.
+        var broker: Broker = null
+        if (brokerURI == null) {
+          // Lets create and configure it...
+          broker = new Broker()
+          broker.config = Broker.default
+          broker.config.connectors.clear
+          broker.config.connectors.add(connector)
+        } else {
+          // Use the user specified config
+          broker = BrokerFactory.createBroker(brokerURI);
+          // we need to add in the connector if it was not in the config...
+          if (broker.config.connectors.toList.filter(_.bind == connector.bind).isEmpty) {
+            broker.config.connectors.add(connector)
+          }
+        }
+
+        // TODO: get rid of this blocking wait.
+        val tracker = new LoggingTracker("vm broker startup")
+        tracker.start(broker)
+        tracker.await
+
+        server = servers.get(name)
+      }
+
+      if (server == null) {
+        throw new IOException("Server is not bound: " + name)
+      }
+
+      var transport = server.connect()
+      verify(configure(transport, options), options)
+
+    } catch {
+      //      case e:URISyntaxException=>
+      //  			throw IOExceptionSupport.create(e)
+      case e: Exception =>
+        throw IOExceptionSupport.create(e)
+    }
+  }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/XMLBrokerFactoryTest.scala Wed Jul  7 03:57:02 2010
@@ -35,18 +35,19 @@ class XMLBrokerFactoryTest extends FunSu
     //		assertEquals("test dispatcher", p.getName())
 
     expect(1) {
-      broker.transportServers.size()
+      broker.config.connectors.size()
     }
-    val expected = new ArrayList[String]()
-    expected.add("pipe://test1")
-    expected.add("tcp://127.0.0.1:61616")
 
-    expect(expected) {
-      broker.connectUris
+    expect("pipe://test1") {
+      broker.config.connectors.get(0).bind
+    }
+
+    expect("tcp://127.0.0.1:61616") {
+      broker.config.connectors.get(1).bind
     }
 
     expect(2) {
-      broker.virtualHosts.size()
+      broker.config.virtualHosts.size()
     }
 
     //		Assert.assertNotNull(broker.defaultVirtualHost().getDatabase())

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 03:57:02 2010
@@ -42,8 +42,6 @@ object BaseBrokerPerfSupport {
 
   // Set to use tcp IO
   protected var TCP = true;
-  // set to force marshalling even in the NON tcp case.
-  protected var FORCE_MARSHALLING = true;
 
   var USE_KAHA_DB = true;
   var PURGE_STORE = true;
@@ -99,21 +97,17 @@ abstract class BaseBrokerPerfSupport ext
   override protected def beforeAll(configMap: Map[String, Any]) = {
     super.beforeAll(configMap)
     if (TCP) {
-      sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
-      receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
+      sendBrokerBindURI = "tcp://localhost:10000";
+      receiveBrokerBindURI = "tcp://localhost:20000";
 
       sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
       receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
     } else {
       sendBrokerConnectURI = "pipe://SendBroker";
       receiveBrokerConnectURI = "pipe://ReceiveBroker";
-      if (FORCE_MARSHALLING) {
-        sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
-        receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
-      } else {
-        sendBrokerBindURI = sendBrokerConnectURI;
-        receiveBrokerBindURI = receiveBrokerConnectURI;
-      }
+      
+      sendBrokerBindURI = sendBrokerConnectURI;
+      receiveBrokerBindURI = receiveBrokerConnectURI;
     }
   }
 
@@ -418,18 +412,22 @@ abstract class BaseBrokerPerfSupport ext
       var consumer = createConsumer();
       consumer.brokerPerfTest = this
 
-      consumer.uri = rcvBroker.connectUris.head
+      consumer.uri = connectUri(rcvBroker)
       consumer.destination = destination
       consumer.name = "consumer" + (i + 1)
       consumer.totalConsumerRate = totalConsumerRate
       return consumer;
     }
 
+    def connectUri(broker:Broker) = {
+      broker.config.connectors.get(0).advertise
+    }
+
 
     def _createProducer(id: Int, destination: Destination): RemoteProducer = {
       var producer = createProducer();
       producer.brokerPerfTest = this
-      producer.uri = sendBroker.connectUris.head
+      producer.uri = connectUri(sendBroker)
       producer.producerId = id + 1
       producer.name = "producer" + (id + 1)
       producer.destination = destination
@@ -518,23 +516,27 @@ abstract class BaseBrokerPerfSupport ext
 
   def getBrokerWireFormat() = "multi"
   def getRemoteWireFormat(): String
+
   def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
     val broker = new Broker()
-    broker.transportServers.add(TransportFactory.bind(bindURI))
-    broker.connectUris.add(connectUri)
+    broker.config = Broker.default
+    val connector = broker.config.connectors.get(0)
+    connector.bind = bindURI
+    connector.advertise = connectUri
+    connector.protocol = getBrokerWireFormat
     broker
   }
 
-  def createStore(broker: Broker): Store = {
-    val store = if (USE_KAHA_DB) {
-      StoreFactory.createStore("hawtdb");
-    } else {
-      StoreFactory.createStore("memory");
-    }
-    store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.name));
-    store.setDeleteAllMessages(PURGE_STORE);
-    store
-  }
+//  def createStore(broker: Broker): Store = {
+//    val store = if (USE_KAHA_DB) {
+//      StoreFactory.createStore("hawtdb");
+//    } else {
+//      StoreFactory.createStore("memory");
+//    }
+//    store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.id));
+//    store.setDeleteAllMessages(PURGE_STORE);
+//    store
+//  }
 
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Wed Jul  7 03:57:02 2010
@@ -45,6 +45,13 @@ public class BrokerDTO {
     public int rev;
 
     /**
+     * Should this broker be running?
+     */
+    @XmlAttribute(name="enabled")
+    public boolean enabled;
+
+
+    /**
      * Used to track who last modified the configuration.
      */
     @XmlAttribute(name="modified-by")
@@ -68,5 +75,11 @@ public class BrokerDTO {
     @XmlElement(name="connectors")
     public List<ConnectorDTO> connectors = new ArrayList<ConnectorDTO>();
 
+    /**
+     * The base data directory of the broker.  It will store
+     * persistent data under it. 
+     */
+    @XmlAttribute(name="basedir")
+    public String basedir;
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java Wed Jul  7 03:57:02 2010
@@ -35,10 +35,16 @@ public class ConnectorDTO {
 	public String id;
 
     /**
+     * Should this connector be running?
+     */
+    @XmlAttribute(name="enabled")
+    public boolean enabled;
+    
+    /**
      * The transport uri which it will accept connections on.
      */
     @XmlAttribute
-    public String transport;
+    public String bind;
 
     /**
      * The protocol that the transport will use.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul  7 03:57:02 2010
@@ -33,6 +33,12 @@ public class VirtualHostDTO {
 	@XmlAttribute(name="id")
 	public String id;
 
+    /**
+     * Should this virtual host be running?
+     */
+    @XmlAttribute(name="enabled")
+    public boolean enabled;
+
     @XmlElement(name="host-names", required=true)
     public ArrayList<String> hostNames = new ArrayList<String>();
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul  7 03:57:02 2010
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import org.apache.activemq.apollo.broker.Broker
 import org.apache.activemq.transport.TransportFactory
+import org.apache.activemq.apollo.broker.{LoggingTracker, Broker}
 
 /**
  */
@@ -27,17 +27,21 @@ object StompBroker {
   var port = 61613
 
   def main(args:Array[String]) = {
-    val uri = "tcp://"+address+":"+port+"?wireFormat=multi"
+    val uri = "tcp://"+address+":"+port
 
     println("Starting stomp broker: "+uri)
 
     val broker = new Broker()
-
-    val server = TransportFactory.bind(uri)
-    broker.transportServers.add(server)
-    broker.start
-
+    val connector = broker.config.connectors.get(0)
+    connector.bind = uri
+    connector.protocol = "stomp"
+    connector.advertise = uri
+
+    val tracker = new LoggingTracker("broker startup")
+    tracker.start(broker)
+    tracker.await
     println("Startup complete.")
+
     System.in.read
     println("Shutting down...")
     broker.stop

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:57:02 2010
@@ -105,7 +105,7 @@ class StompProtocolHandler extends Proto
         }
       }
     }
-    connection.broker.runtime.getDefaultVirtualHost(
+    connection.connector.broker.getDefaultVirtualHost(
       queue.wrap { (host)=>
         this.host=host
         connection.transport.resumeRead

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jul  7 03:57:02 2010
@@ -52,7 +52,6 @@ public class TcpTransportFactory impleme
         URI uri = new URI(location);
         Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
         TcpTransportServer server = createTcpTransportServer(uri);
-        server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
         IntrospectionSupport.setProperties(server, options);
         Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
         server.setTransportOption(transportOptions);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul  7 03:57:02 2010
@@ -42,7 +42,7 @@ import java.util.Map;
 
 public class TcpTransportServer implements TransportServer {
 
-    protected WireFormatFactory wireFormatFactory;
+//    protected WireFormatFactory wireFormatFactory;
     private ServerSocketChannel channel;
     private TransportAcceptListener listener;
     private URI bindURI;
@@ -173,14 +173,6 @@ public class TcpTransportServer implemen
         acceptSource.release();
     }
 
-    public WireFormatFactory getWireFormatFactory() {
-        return wireFormatFactory;
-    }
-
-    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
-        this.wireFormatFactory = wireFormatFactory;
-    }
-
     public URI getBindURI() {
         return bindURI;
     }
@@ -210,7 +202,6 @@ public class TcpTransportServer implemen
 //      options.put("startLogging", Boolean.valueOf(startLogging));
 
         Transport transport = createTransport(socket, options);
-        transport.setWireformat(wireFormatFactory.createWireFormat());
         listener.onAccept(transport);
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactorySupport.java Wed Jul  7 03:57:02 2010
@@ -86,6 +86,23 @@ public class  TransportFactorySupport{
             throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
         }
     }
+    static public WireFormatFactory createWireFormatFactory(String location) throws IOException, URISyntaxException {
+        URI uri = new URI(location);
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+
+        String wireFormat = uri.getPath();
+        if( "null".equals(wireFormat) ) {
+            return null;
+        }
+
+        try {
+            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
+            IntrospectionSupport.setProperties(wff, options);
+            return wff;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
+        }
+    }
 
     static protected String getDefaultWireFormatType() {
         return "default";

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServer.java Wed Jul  7 03:57:02 2010
@@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.wireformat.WireFormatFactory;
 import org.fusesource.hawtdispatch.DispatchQueue;
 
 /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala Wed Jul  7 03:57:02 2010
@@ -36,7 +36,7 @@ object BrokerRegistry {
   }
 
   def add(broker:Broker) = _brokers.synchronized {
-    _brokers.put(broker.name, broker)
+    _brokers.put(broker.config.id, broker)
   }
 
   def remove(id:String) = _brokers.synchronized {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala Wed Jul  7 03:57:02 2010
@@ -38,13 +38,15 @@ import java.io.{OutputStreamWriter, File
  */
 trait ConfigStore extends Service {
 
-  def listBrokerModels(cb: (List[String]) => Unit):Unit
+  def listBrokers(cb: (List[String]) => Unit):Unit
 
-  def getBrokerModel(id:String, cb: (Option[BrokerDTO]) => Unit):Unit
+  def getBroker(id:String, cb: (Option[BrokerDTO]) => Unit):Unit
 
-  def putBrokerModel(config:BrokerDTO, cb: (Boolean) => Unit):Unit
+  def putBroker(config:BrokerDTO, cb: (Boolean) => Unit):Unit
 
-  def removeBrokerModel(id:String, rev:Int, cb: (Boolean) => Unit):Unit
+  def removeBroker(id:String, rev:Int, cb: (Boolean) => Unit):Unit
+
+  def forBroker(cb: (BrokerDTO)=> Unit):Unit
 
 }
 
@@ -144,11 +146,17 @@ class FileConfigStore extends ConfigStor
     ioWorker.shutdown
   }
 
-  def listBrokerModels(cb: (List[String]) => Unit) = callback(cb) {
+  def listBrokers(cb: (List[String]) => Unit) = reply(cb) {
     List(latest.id)
   } >>: dispatchQueue
 
-  def getBrokerModel(id:String, cb: (Option[BrokerDTO]) => Unit) = callback(cb) {
+
+  def forBroker(cb: (BrokerDTO)=> Unit) = using(cb) {
+
+  }
+
+
+  def getBroker(id:String, cb: (Option[BrokerDTO]) => Unit) = reply(cb) {
     if( latest.id == id ) {
       Some(unmarshall(latest.data))
     } else {
@@ -156,7 +164,7 @@ class FileConfigStore extends ConfigStor
     }
   } >>: dispatchQueue
 
-  def putBrokerModel(config:BrokerDTO, cb: (Boolean) => Unit) = callback(cb) {
+  def putBroker(config:BrokerDTO, cb: (Boolean) => Unit) = reply(cb) {
     debug("storing broker model: %s ver %d", config.id, config.rev)
     if( latest.id != config.id ) {
       debug("this store can only update broker: "+latest.id)
@@ -170,7 +178,7 @@ class FileConfigStore extends ConfigStor
     }
   } >>: dispatchQueue
 
-  def removeBrokerModel(id:String, rev:Int, cb: (Boolean) => Unit) = callback(cb) {
+  def removeBroker(id:String, rev:Int, cb: (Boolean) => Unit) = reply(cb) {
     // not supported.
     false
   } >>: dispatchQueue
@@ -187,7 +195,7 @@ class FileConfigStore extends ConfigStor
             // has a chance to update the runtime too.
             val c = unmarshall(config.data)
             c.rev = config.rev
-            putBrokerModel(c, null)
+            putBroker(c, null)
           }
           schedualNextUpdateCheck
         }
@@ -213,7 +221,7 @@ class FileConfigStore extends ConfigStor
     config.virtualHosts.add(host)
 
     var connector = new ConnectorDTO
-    connector.transport = "tcp://0.0.0.0:61613"
+    connector.bind = "tcp://0.0.0.0:61613"
     connector.advertise = "tcp://0.0.0.0:61613"
     connector.protocol = "multi"
     config.connectors.add( connector )

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala Wed Jul  7 03:57:02 2010
@@ -24,6 +24,7 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.{FileConfigStore, ConfigStore, BrokerRegistry}
 import java.io.File
 import com.google.inject.{Inject, Provides, Guice, Singleton}
+import org.fusesource.hawtdispatch.ScalaDispatch._
 
 /**
  * A servlet context listener which registers
@@ -37,17 +38,28 @@ class ServletContextListener extends Gui
 
   override def contextInitialized(servletContextEvent: ServletContextEvent) = {
 
-
-    // todo: replace this with info accessed from a configuration store.
-    // register/start brokers we are managing.
     try {
       BrokerRegistry.configStore = createConfigStore
 
-      broker = createBroker();
-      BrokerRegistry.add(broker);
-      LoggingTracker("broker startup") { tracker=>
-        broker.start(tracker.task())
+      // Brokers startup async.
+      BrokerRegistry.configStore.listBrokers { ids =>
+        ids.foreach { id=>
+          BrokerRegistry.configStore.getBroker(id, { x=>
+            x match {
+              case Some(config)=>
+                // Only start the broker up if it's enabled..
+                if( config.enabled ) {
+                  val broker = new Broker()
+                  broker.config = config
+                  BrokerRegistry.add(broker)
+                  broker.start()
+                }
+              case None =>
+            }
+          })
+        }
       }
+
     }
     catch {
       case e:Exception =>
@@ -63,9 +75,9 @@ class ServletContextListener extends Gui
 
     // un-register/stop brokers we are managing.
     if( broker!=null ) {
-      BrokerRegistry.remove(broker.name);
+      BrokerRegistry.remove(broker.id);
       LoggingTracker("broker shutdown") { tracker =>
-        broker.stop(tracker.task(broker.name))
+        broker.stop(tracker.task(broker.id))
         BrokerRegistry.configStore.stop(tracker.task("config store"))
       }
     }
@@ -97,11 +109,5 @@ class ServletContextListener extends Gui
     store
   }
 
-  def createBroker(): Broker = {
-    val broker = new Broker()
-    broker.name = "default"
-    broker.transportServers.add(TransportFactory.bind("tcp://localhost:10000?wireFormat=multi"))
-    broker.connectUris.add("tcp://localhost:10000")
-    broker
-  }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala Wed Jul  7 03:57:02 2010
@@ -60,7 +60,7 @@ class Root() extends Resource {
   def get() = {
     val rc = new IdListDTO
     val ids = Future[List[String]] { cb=>
-      configStore.listBrokerModels(cb)
+      configStore.listBrokers(cb)
     }.toArray[String]
     rc.ids.addAll(Arrays.asList(ids: _*))
     rc
@@ -94,7 +94,7 @@ case class Broker(parent:Root, @BeanProp
 
   private def config() = {
     Future[Option[BrokerDTO]] { cb=>
-      configStore.getBrokerModel(id, cb)
+      configStore.getBroker(id, cb)
     }.getOrElse(result(NOT_FOUND))
   }
 
@@ -111,14 +111,14 @@ case class Broker(parent:Root, @BeanProp
     config.id = id;
     config.rev = rev
     Future[Boolean] { cb=>
-      configStore.putBrokerModel(config, cb)
+      configStore.putBroker(config, cb)
     } || result(NOT_FOUND)
   }
 
   @DELETE @Path("config/{rev}")
   def delete(@PathParam("rev") rev:Int) = {
     Future[Boolean] { cb=>
-      configStore.removeBrokerModel(id, rev, cb)
+      configStore.removeBroker(id, rev, cb)
     } || result(NOT_FOUND)
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala?rev=961103&r1=961102&r2=961103&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/test/scala/org/apache/activemq/apollo/FileConfigStoreTest.scala Wed Jul  7 03:57:02 2010
@@ -41,7 +41,7 @@ class FileConfigStoreTest extends FunSui
 
     expect(List("default")) {
       Future[List[String]]{ x=>
-        store.listBrokerModels(x)
+        store.listBrokers(x)
       }
     }