You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:15:33 UTC
svn commit: r961187 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-dto/src/main/java/org/apache/acti...
Author: chirino
Date: Wed Jul 7 04:15:31 2010
New Revision: 961187
URL: http://svn.apache.org/viewvc?rev=961187&view=rev
Log:
starting to expose broker stats via the web module.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
- copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java
- copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
- copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java
- copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
- copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java
- copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala
activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 04:15:31 2010
@@ -24,9 +24,11 @@ import org.fusesource.hawtdispatch.{Disp
import org.fusesource.hawtbuf._
import ReporterLevel._
import AsciiBuffer._
-import org.apache.activemq.apollo.dto.{BrokerDTO}
import collection.{JavaConversions, SortedMap}
-import JavaConversions._
+import JavaConversions._
+import org.apache.activemq.apollo.dto.{VirtualHostStatusDTO, ConnectorStatusDTO, BrokerStatusDTO, BrokerDTO}
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.activemq.apollo.util.LongCounter
/**
* <p>
@@ -89,6 +91,8 @@ object BufferConversions {
object Broker extends Log {
+ val broker_id_counter = new AtomicLong()
+
val STICK_ON_THREAD_QUEUES = true
/**
@@ -156,9 +160,14 @@ class Broker() extends BaseService with
dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
}
- def id = config.id
+ val id = broker_id_counter.incrementAndGet
+
+ val virtual_host_id_counter = new LongCounter
+ val connector_id_counter = new LongCounter
+ val connection_id_counter = new LongCounter
+
+ override def toString() = "broker: "+id
- override def toString() = "broker: "+id
/**
* Validates and then applies the configuration.
@@ -183,7 +192,7 @@ class Broker() extends BaseService with
dataDirectory = new File(config.basedir)
defaultVirtualHost = null
for (c <- config.virtualHosts) {
- val host = new VirtualHost(this)
+ val host = new VirtualHost(this, virtual_host_id_counter.incrementAndGet)
host.configure(c, this)
virtualHosts += ascii(c.id)-> host
// first defined host is the default virtual host
@@ -192,7 +201,7 @@ class Broker() extends BaseService with
}
}
for (c <- config.connectors) {
- val connector = new Connector(this)
+ val connector = new Connector(this, connector_id_counter.incrementAndGet)
connector.configure(c, this)
connectors ::= connector
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 04:15:31 2010
@@ -33,8 +33,6 @@ import protocol.{ProtocolFactory, Protoc
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object Connection extends Log {
- val id_generator = new AtomicLong()
- def next_id = "connection:"+id_generator.incrementAndGet
}
/**
@@ -45,14 +43,11 @@ abstract class Connection() extends Defa
override protected def log = Connection
import Connection._
- val id = next_id
- val dispatchQueue = createQueue(id)
+ val dispatchQueue = createQueue()
var stopped = true
var transport:Transport = null
var transportSink:TransportSink = null
- override def toString = id
-
override protected def _start(onCompleted:Runnable) = {
stopped = false
transportSink = new TransportSink(transport)
@@ -89,11 +84,13 @@ abstract class Connection() extends Defa
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class BrokerConnection(val connector: Connector) extends Connection {
+class BrokerConnection(val connector: Connector, val id:Long) extends Connection {
var protocol = "stomp"
var protocolHandler: ProtocolHandler = null;
+ override def toString = "id: "+id.toString
+
override protected def _start(onCompleted:Runnable) = {
connector.dispatchQueue.retain
protocolHandler = ProtocolFactory.get(protocol).createProtocolHandler
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Jul 7 04:15:31 2010
@@ -16,21 +16,14 @@
*/
package org.apache.activemq.apollo.broker
-import _root_.java.io.{File}
import _root_.org.apache.activemq.transport._
-import _root_.org.apache.activemq.Service
-import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import _root_.scala.reflect.BeanProperty
-import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
-import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
-import org.fusesource.hawtbuf._
-import collection.JavaConversions
-import org.apache.activemq.apollo.dto.{ConnectorDTO, BrokerDTO}
-import JavaConversions._
+import org.fusesource.hawtdispatch.{Dispatch}
+import org.apache.activemq.apollo.dto.{ConnectorDTO}
import org.apache.activemq.wireformat.WireFormatFactory
import ReporterLevel._
+import org.apache.activemq.apollo.util.LongCounter
+import collection.mutable.HashMap
/**
* <p>
@@ -75,7 +68,7 @@ object Connector extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Connector(val broker:Broker) extends BaseService with DispatchLogging {
+class Connector(val broker:Broker, val id:Long) extends BaseService with DispatchLogging {
import Connector._
override protected def log = Connector
@@ -85,9 +78,9 @@ class Connector(val broker:Broker) exten
var transportServer:TransportServer = _
var wireFormatFactory:WireFormatFactory = _
- val connections: HashSet[Connection] = new HashSet[Connection]
-
+ val connections = HashMap[Long, BrokerConnection]()
override def toString = "connector: "+config.id
+ val accept_counter = new LongCounter
object BrokerAcceptListener extends TransportAcceptListener {
def onAcceptError(error: Exception): Unit = {
@@ -103,7 +96,8 @@ class Connector(val broker:Broker) exten
transport.setWireformat(wireFormatFactory.createWireFormat)
}
- var connection = new BrokerConnection(Connector.this)
+ accept_counter.incrementAndGet
+ var connection = new BrokerConnection(Connector.this, broker.connection_id_counter.incrementAndGet)
connection.transport = transport
if( STICK_ON_THREAD_QUEUES ) {
@@ -112,7 +106,7 @@ class Connector(val broker:Broker) exten
// We release when it gets removed from the connections list.
connection.dispatchQueue.retain
- connections.add(connection)
+ connections.put(connection.id, connection)
try {
connection.start()
@@ -155,7 +149,7 @@ class Connector(val broker:Broker) exten
override def _stop(onCompleted:Runnable): Unit = {
transportServer.stop(^{
val tracker = new LoggingTracker(toString, dispatchQueue)
- for (connection <- connections) {
+ connections.valuesIterator.foreach { connection=>
tracker.stop(connection)
}
tracker.callback(onCompleted)
@@ -166,8 +160,8 @@ class Connector(val broker:Broker) exten
* Connections callback into the connector when they are stopped so that we can
* stop tracking them.
*/
- def stopped(connection:Connection) = ^{
- if( connections.remove(connection) ) {
+ def stopped(connection:BrokerConnection) = ^{
+ if( connections.remove(connection.id).isDefined ) {
connection.dispatchQueue.release
}
} |>>: dispatchQueue
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:15:31 2010
@@ -60,7 +60,7 @@ object Queue extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val host: VirtualHost, val destination: Destination, val queueKey: Long = -1L) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination, val id: Long) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
override protected def log = Queue
import Queue._
@@ -112,12 +112,12 @@ class Queue(val host: VirtualHost, val d
* Subscribers that consume slower than this rate per seconds will be considered
* slow.
*/
- var tune_slow_subscription_rate = 1000*1024
+ var tune_slow_subscription_rate = 500*1024
/**
* The number of milliseconds between slow consumer checks.
*/
- var tune_slow_check_interval = 200L
+ var tune_slow_check_interval = 500L
/**
* Should this queue persistently store its entries?
@@ -158,8 +158,8 @@ class Queue(val host: VirtualHost, val d
var flushing_size = 0
var flushed_items = 0
- private var capacity = tune_producer_buffer
- var size = 0
+ var capacity = 0
+ var capacity_used = 0
protected def _start(onCompleted: Runnable) = {
@@ -179,7 +179,7 @@ class Queue(val host: VirtualHost, val d
}
if( tune_persistent ) {
- host.store.listQueueEntryRanges(queueKey, tune_flush_range_size) { ranges=>
+ host.store.listQueueEntryRanges(id, tune_flush_range_size) { ranges=>
dispatchQueue {
if( !ranges.isEmpty ) {
@@ -214,7 +214,7 @@ class Queue(val host: VirtualHost, val d
var refiller: Runnable = null
- def full = (size >= capacity) || !serviceState.isStarted
+ def full = (capacity_used >= capacity) || !serviceState.isStarted
def offer(delivery: Delivery): Boolean = {
if (full) {
@@ -265,7 +265,7 @@ class Queue(val host: VirtualHost, val d
var check_counter = 0
def display_stats: Unit = {
- info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, size, capacity)
+ info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, capacity_used, capacity)
info("total messages enqueued %d, dequeues %d ", enqueue_item_counter, dequeue_item_counter)
}
@@ -309,9 +309,9 @@ class Queue(val host: VirtualHost, val d
if( (check_counter%25)==0 ) {
display_stats
- if (!all_subscriptions.isEmpty) {
- display_active_entries
- }
+// if (!all_subscriptions.isEmpty) {
+// display_active_entries
+// }
}
// target tune_min_subscription_rate / sec
@@ -327,8 +327,8 @@ class Queue(val host: VirtualHost, val d
// Skip over new consumers...
if( sub.advanced_size != 0 ) {
- val cursor_delta = sub.advanced_size - sub.last_cursored_size
- sub.last_cursored_size = sub.advanced_size
+ val cursor_delta = sub.advanced_size - sub.last_advanced_size
+ sub.last_advanced_size = sub.advanced_size
// If the subscription is NOT slow if it's been tail parked or
// it's been parking and cursoring through the data at the tune_slow_subscription_rate
@@ -634,7 +634,6 @@ class QueueEntry(val queue:Queue, val se
def init(delivery:Delivery):QueueEntry = {
state = new Loaded(delivery, false)
- queue.size += size
this
}
@@ -689,7 +688,7 @@ class QueueEntry(val queue:Queue, val se
def toQueueEntryRecord = {
val qer = new QueueEntryRecord
- qer.queueKey = queue.queueKey
+ qer.queueKey = queue.id
qer.queueSeq = seq
qer.messageKey = state.messageKey
qer.size = state.size
@@ -714,6 +713,8 @@ class QueueEntry(val queue:Queue, val se
def as_flushed_range = state.as_flushed_range
def as_loaded = state.as_loaded
+ def label = state.label
+
def is_tail = this == queue.tail_entry
def is_head = this == queue.head_entry
@@ -763,6 +764,11 @@ class QueueEntry(val queue:Queue, val se
def count = 0
/**
+ * Returns a string label used to describe this state.
+ */
+ def label:String
+
+ /**
* Gets the message key for the entry.
* @returns -1 if it is not known.
*/
@@ -868,6 +874,7 @@ class QueueEntry(val queue:Queue, val se
*/
class Head extends EntryState {
+ def label = "head"
override def toString = "head"
override def as_head = this
@@ -899,6 +906,7 @@ class QueueEntry(val queue:Queue, val se
*/
class Tail extends EntryState {
+ def label = "tail"
override def toString = "tail"
override def as_tail:Tail = this
@@ -919,6 +927,7 @@ class QueueEntry(val queue:Queue, val se
var acquired = false
var flushing = false
+ def label = "loaded"
override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
override def count = 1
@@ -989,7 +998,7 @@ class QueueEntry(val queue:Queue, val se
delivery.uow = null
if( flushing ) {
queue.flushing_size-=size
- queue.size -= size
+ queue.capacity_used -= size
state = new Flushed(delivery.storeKey, size)
if( can_combine_with_prev ) {
@@ -1010,7 +1019,7 @@ class QueueEntry(val queue:Queue, val se
flushing = false
queue.flushing_size-=size
}
- queue.size -= size
+ queue.capacity_used -= size
super.remove
}
@@ -1111,12 +1120,14 @@ class QueueEntry(val queue:Queue, val se
var loading = false
+
override def count = 1
override def as_flushed = this
override def is_flushed_or_flushing = true
+ def label = "flushed"
override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
override def load() = {
@@ -1157,7 +1168,7 @@ class QueueEntry(val queue:Queue, val se
delivery.size = messageRecord.size
delivery.storeKey = messageRecord.key
- queue.size += size
+ queue.capacity_used += size
queue.flushed_items -= 1
state = new Loaded(delivery, true)
} else {
@@ -1212,12 +1223,13 @@ class QueueEntry(val queue:Queue, val se
override def is_flushed_or_flushing = true
- override def toString = { "flushed_group:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
+ def label = "flushed_range"
+ override def toString = { "flushed_range:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
override def load() = {
if( !loading ) {
loading = true
- queue.host.store.listQueueEntries(queue.queueKey, seq, last) { records =>
+ queue.host.store.listQueueEntries(queue.id, seq, last) { records =>
queue.dispatchQueue {
var item_count=0
@@ -1307,7 +1319,7 @@ class Subscription(queue:Queue) extends
var advanced_size = 0L
// Vars used to detect slow consumers.
- var last_cursored_size = 0L
+ var last_advanced_size = 0L
var tail_parkings = 0
var slow_intervals = 0
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 04:15:31 2010
@@ -21,9 +21,10 @@ import _root_.org.fusesource.hawtbuf._
import _root_.org.fusesource.hawtdispatch._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.util.HashMap
import path.PathMap
import collection.JavaConversions
+import org.apache.activemq.apollo.util.LongCounter
+import collection.mutable.HashMap
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -62,7 +63,6 @@ class Domain {
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object Router extends Log {
-
}
/**
@@ -80,10 +80,14 @@ object Router extends Log {
*/
class Router(val host:VirtualHost) extends DispatchLogging {
+ val destination_id_counter = new LongCounter
+
override protected def log = Router
protected def dispatchQueue:DispatchQueue = host.dispatchQueue
trait DestinationNode {
+ val destination:Destination
+ val id = destination_id_counter.incrementAndGet
var targets = List[DeliveryConsumer]()
var routes = List[DeliveryProducerRoute]()
@@ -97,7 +101,7 @@ class Router(val host:VirtualHost) exten
}
}
- class TopicDestinationNode extends DestinationNode {
+ class TopicDestinationNode(val destination:Destination) extends DestinationNode {
def on_bind(x:List[DeliveryConsumer]) = {
targets = x ::: targets
routes.foreach({r=>
@@ -119,7 +123,7 @@ class Router(val host:VirtualHost) exten
}
}
- class QueueDestinationNode(destination:Destination) extends DestinationNode {
+ class QueueDestinationNode(val destination:Destination) extends DestinationNode {
var queue:Queue = null
// once the queue is created.. connect it up with the producers and targets.
@@ -159,16 +163,13 @@ class Router(val host:VirtualHost) exten
var destinations = new HashMap[Destination, DestinationNode]()
private def get(destination:Destination):DestinationNode = {
- var result = destinations.get(destination)
- if( result ==null ) {
+ destinations.getOrElseUpdate(destination,
if( isTopic(destination) ) {
- result = new TopicDestinationNode
+ new TopicDestinationNode(destination)
} else {
- result = new QueueDestinationNode(destination)
+ new QueueDestinationNode(destination)
}
- destinations.put(destination, result)
- }
- result
+ )
}
def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:15:31 2010
@@ -33,6 +33,7 @@ import org.apache.activemq.apollo.store.
import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
import java.io.File
import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.util.LongCounter
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -84,15 +85,15 @@ object VirtualHost extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class VirtualHost(val broker: Broker) extends BaseService with DispatchLogging with LoggingReporter {
+class VirtualHost(val broker: Broker, val id:Long) extends BaseService with DispatchLogging with LoggingReporter {
import VirtualHost._
override protected def log = VirtualHost
override val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
var config:VirtualHostDTO = _
- private val queues = new HashMap[AsciiBuffer, Queue]()
- private val durableSubs = new HashMap[String, DurableSubscription]()
+ val queues = new HashMap[AsciiBuffer, Queue]()
+ val durableSubs = new HashMap[String, DurableSubscription]()
val router = new Router(this)
var names:List[String] = Nil;
@@ -102,8 +103,8 @@ class VirtualHost(val broker: Broker) ex
var store:Store = null
var transactionManager:TransactionManagerX = new TransactionManagerX
-
var protocols = Map[AsciiBuffer, WireFormat]()
+ val queue_id_counter = new LongCounter
override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
@@ -128,15 +129,27 @@ class VirtualHost(val broker: Broker) ex
store = StoreFactory.create(config.store)
if( store!=null ) {
store.configure(config.store, this)
- val task = tracker.task("store startup")
+ val storeStartupDone = tracker.task("store startup")
store.start {
+
+ val getKeyDone = tracker.task("store get last queue key")
+ store.getLastQueueKey{ key=>
+ key match {
+ case Some(x)=>
+ queue_id_counter.set(key.get)
+ case None =>
+ warn("Could not get last queue key")
+ }
+ getKeyDone.run
+ }
+
if( config.purgeOnStartup ) {
- task.name = "store purge"
+ storeStartupDone.name = "store purge"
store.purge {
- task.run
+ storeStartupDone.run
}
} else {
- task.name = "store recover queues"
+ storeStartupDone.name = "store recover queues"
store.listQueues { queueKeys =>
for( queueKey <- queueKeys) {
val task = tracker.task("store load queue key: "+queueKey)
@@ -160,7 +173,7 @@ class VirtualHost(val broker: Broker) ex
}
}
}
- task.run
+ storeStartupDone.run
}
}
}
@@ -263,21 +276,23 @@ class VirtualHost(val broker: Broker) ex
if( store!=null ) {
val record = new QueueRecord
record.name = name
+ record.key = queue_id_counter.incrementAndGet
+
store.addQueue(record) { rc =>
rc match {
- case Some(queueKey) =>
+ case true =>
dispatchQueue {
- val queue = new Queue(this, dest, queueKey)
+ val queue = new Queue(this, dest, record.key)
queue.start()
queues.put(dest.getName, queue)
cb(queue)
}
- case None => // store could not create
+ case false => // store could not create
cb(null)
}
}
} else {
- val queue = new Queue(this, dest)
+ val queue = new Queue(this, dest, queue_id_counter.incrementAndGet)
queue.start()
queues.put(dest.getName, queue)
cb(queue)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:15:31 2010
@@ -71,7 +71,6 @@ class CassandraStore extends Store with
/////////////////////////////////////////////////////////////////////
val dispatchQueue = createQueue("cassandra store")
- var next_queue_key = new AtomicLong(1)
var next_msg_key = new AtomicLong(1)
val client = new CassandraClient()
@@ -80,6 +79,9 @@ class CassandraStore extends Store with
def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
+
+ def storeType = "cassandra"
+
def configure(config: CassandraStoreDTO, reporter: Reporter) = {
if ( CassandraStore.validate(config, reporter) < ERROR ) {
if( serviceState.isStarted ) {
@@ -155,18 +157,23 @@ class CassandraStore extends Store with
def purge(callback: =>Unit) = {
blocking {
client.purge
- next_queue_key.set(1)
next_msg_key.set(1)
callback
}
}
- def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
- val key = next_queue_key.getAndIncrement
- record.key = key
+ /**
+ * Gets the last queue key identifier.
+ */
+ def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+ // TODO:
+ callback( Some(1L) )
+ }
+
+ def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
blocking {
client.addQueue(record)
- callback(Some(key))
+ callback(true)
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Wed Jul 7 04:15:31 2010
@@ -30,7 +30,7 @@ import javax.xml.bind.annotation.XmlRoot
*/
@XmlRootElement(name="broker")
@XmlAccessorType(XmlAccessType.FIELD)
-public class BrokerDTO extends ServiceDTO {
+public class BrokerDTO extends ServiceDTO<String> {
/**
* Used to track config revisions.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java Wed Jul 7 04:15:31 2010
@@ -16,10 +16,9 @@
*/
package org.apache.activemq.apollo.dto;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* <p>
@@ -32,17 +31,27 @@ import javax.xml.bind.annotation.XmlRoot
public class BrokerStatusDTO extends ServiceStatusDTO {
/**
- * The current time on the broker machine.
+ * The current time on the broker machine. In milliseconds since the epoch.
*/
@XmlAttribute(name="current-time")
- public String currentTime;
+ public long currentTime;
/**
- * Since when has the broker in in this state?
+ * Ids of all the virtual hosts running on the broker
*/
- @XmlAttribute(name="in-state-since")
- public String inStateSince;
+ @XmlElement(name="virtual-host")
+ public List<Long> virtualHosts = new ArrayList<Long>();
+ /**
+ * Ids of all the connectors running on the broker
+ */
+ @XmlElement(name="connectors")
+ public List<Long> connectors = new ArrayList<Long>();
+ /**
+ * The current running configuration of the object
+ */
+ @XmlElement(name="config")
+ public BrokerDTO config = null;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java Wed Jul 7 04:15:31 2010
@@ -29,7 +29,7 @@ import javax.xml.bind.annotation.XmlRoot
*/
@XmlRootElement(name="broker-summary")
@XmlAccessorType(XmlAccessType.FIELD)
-public class BrokerSummaryDTO extends IdDTO {
+public class BrokerSummaryDTO extends StringIdDTO {
/**
* The latest revision of the broker config.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java Wed Jul 7 04:15:31 2010
@@ -26,7 +26,7 @@ import javax.xml.bind.annotation.*;
*/
@XmlRootElement(name = "connector")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectorDTO extends ServiceDTO {
+public class ConnectorDTO extends ServiceDTO<String> {
/**
* The transport uri which it will accept connections on.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java Wed Jul 7 04:15:31 2010
@@ -16,10 +16,9 @@
*/
package org.apache.activemq.apollo.dto;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* <p>
@@ -29,25 +28,24 @@ import javax.xml.bind.annotation.XmlRoot
*/
@XmlRootElement(name="connector-status")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectorStatusDTO {
+public class ConnectorStatusDTO extends ServiceStatusDTO {
/**
- * A unique id of the connector.
+ * The number of connections that this connector has accepted.
*/
- @XmlAttribute(name="id")
- public String id;
+ @XmlAttribute(name="accepted")
+ public Long accepted;
/**
- * The state of the object.
+ * Ids of all open connections that the connector is managing.
*/
- @XmlAttribute(name="state")
- public String state;
+ @XmlElement(name="connection")
+ public List<Long> connections = new ArrayList<Long>();
/**
- * The number of connections that this connector has accepted.
+ * The current running configuration of the object
*/
- @XmlAttribute(name="accepted")
- public Long accepted;
+ @XmlElement(name="config")
+ public ConnectorDTO config = null;
-
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java Wed Jul 7 04:15:31 2010
@@ -17,21 +17,26 @@
package org.apache.activemq.apollo.dto;
import javax.xml.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="id")
+@XmlRootElement(name="destination-summary")
@XmlAccessorType(XmlAccessType.FIELD)
-public class IdDTO {
+public class DestinationStatusDTO extends IdDTO<Long> {
/**
- * A unique id of the object within it's container
+ * The destination name
*/
- @XmlAttribute(name="id")
- public String id;
-
+ @XmlElement(name="name")
+ public String name;
+ /**
+ * The routing domain
+ */
+ @XmlElement(name="domain")
+ public String domain;
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java Wed Jul 7 04:15:31 2010
@@ -21,17 +21,25 @@ import java.util.ArrayList;
import java.util.List;
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="id")
+@XmlRootElement(name="virtual-host-status")
@XmlAccessorType(XmlAccessType.FIELD)
-public class IdDTO {
+public class DestinationSummaryDTO extends ServiceStatusDTO {
/**
- * A unique id of the object within it's container
+ * The type of store the virtual host is using.
*/
- @XmlAttribute(name="id")
- public String id;
+ @XmlAttribute(name="store-type")
+ public String storeType;
+ /**
+ * Ids of all the destinations running on the broker
+ */
+ @XmlElement(name="destination")
+ public List<Long> destinations = new ArrayList<Long>();
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java Wed Jul 7 04:15:31 2010
@@ -27,22 +27,42 @@ import javax.xml.bind.annotation.XmlRoot
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="broker-status")
+@XmlRootElement(name="destination-status")
@XmlAccessorType(XmlAccessType.FIELD)
-public class BrokerStatusDTO extends ServiceStatusDTO {
+public class EntryStatusDTO extends IdDTO<Long> {
/**
- * The current time on the broker machine.
+ * A unique id of the object within its container
*/
- @XmlAttribute(name="current-time")
- public String currentTime;
+ @XmlAttribute(name="enqueue-item-counter")
+ public long id;
- /**
- * Since when has the broker in in this state?
- */
- @XmlAttribute(name="in-state-since")
- public String inStateSince;
+ @XmlAttribute(name="enqueue-item-counter")
+ public long enqueueItemCounter;
+ @XmlAttribute(name="dequeue-item-counter")
+ public long dequeueItemCounter;
+ @XmlAttribute(name="enqueue-size-counter")
+ public long enqueueSizeCounter;
+ @XmlAttribute(name="dequeue-size-counter")
+ public long dequeueSizeCounter;
+ @XmlAttribute(name="nack-item-counter")
+ public long nackItemCounter;
+ @XmlAttribute(name="nack-size-counter")
+ public long nackSizeCounter;
+
+ @XmlAttribute(name="queue-size")
+ public long queueSize;
+ @XmlAttribute(name="queue-items")
+ public long queueItems;
+ @XmlAttribute(name="loading-size")
+ public int loadingSize;
+ @XmlAttribute(name="flushing-size")
+ public int flushingSize;
+ @XmlAttribute(name="flushed-items")
+ public int flushedItems;
+ @XmlAttribute(name="capacity")
+ public int capacity;
-}
+}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java Wed Jul 7 04:15:31 2010
@@ -26,13 +26,13 @@ import javax.xml.bind.annotation.XmlRoot
*/
@XmlRootElement(name="id")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ServiceDTO extends IdDTO {
+public class LongIdDTO {
/**
- * Should this service be running?
+ * A unique id of the object within its container
*/
- @XmlAttribute(name="enabled")
- public boolean enabled = true;
+ @XmlAttribute(name="id")
+ public long id;
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Jul 7 04:15:31 2010
@@ -18,8 +18,10 @@ package org.apache.activemq.apollo.dto;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
/**
* <p>
@@ -27,27 +29,25 @@ import javax.xml.bind.annotation.XmlRoot
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="connector-status")
+@XmlRootElement(name="destination-status")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectorStatusDTO {
+public class QueueStatusDTO extends DestinationSummaryDTO {
/**
- * A unique id of the connector.
+ * Ids of all connections that are producing to the destination
*/
- @XmlAttribute(name="id")
- public String id;
+ @XmlElement(name="producer")
+ public List<Long> producers = new ArrayList<Long>();
/**
- * The state of the object.
+ * Ids of all connections that are consuming from the destination
*/
- @XmlAttribute(name="state")
- public String state;
+ @XmlElement(name="consumer")
+ public List<Long> consumers = new ArrayList<Long>();
/**
- * The number of connections that this connector has accepted.
+ * Ids of all queues that are associated with the destination
*/
- @XmlAttribute(name="accepted")
- public Long accepted;
-
-
-}
+ @XmlElement(name="queue")
+ public List<Long> queues = new ArrayList<Long>();
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java Wed Jul 7 04:15:31 2010
@@ -26,7 +26,7 @@ import javax.xml.bind.annotation.XmlRoot
*/
@XmlRootElement(name="id")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ServiceDTO extends IdDTO {
+public class ServiceDTO<ID> extends StringIdDTO {
/**
* Should this service be running?
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java Wed Jul 7 04:15:31 2010
@@ -26,7 +26,7 @@ import javax.xml.bind.annotation.XmlRoot
*/
@XmlRootElement(name="service-status")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ServiceStatusDTO extends IdDTO {
+public class ServiceStatusDTO extends LongIdDTO {
/**
* The state of the service.
@@ -34,5 +34,10 @@ public class ServiceStatusDTO extends Id
@XmlAttribute(name="state")
public String state;
+ /**
+ * Since when has the service been in this state? In milliseconds since the epoch.
+ */
+ @XmlAttribute(name="state-since")
+ public long stateSince;
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java Wed Jul 7 04:15:31 2010
@@ -25,7 +25,7 @@ import java.util.List;
*/
@XmlRootElement(name="id")
@XmlAccessorType(XmlAccessType.FIELD)
-public class IdDTO {
+public class StringIdDTO {
/**
* A unique id of the object within it's container
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul 7 04:15:31 2010
@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.*;
*/
@XmlRootElement(name = "virtual-host")
@XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostDTO extends ServiceDTO {
+public class VirtualHostDTO extends ServiceDTO<String> {
@XmlElement(name="host-name", required=true)
public ArrayList<String> hostNames = new ArrayList<String>();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java Wed Jul 7 04:15:31 2010
@@ -16,10 +16,9 @@
*/
package org.apache.activemq.apollo.dto;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
/**
* <p>
@@ -30,5 +29,24 @@ import javax.xml.bind.annotation.XmlRoot
@XmlRootElement(name="virtual-host-status")
@XmlAccessorType(XmlAccessType.FIELD)
public class VirtualHostStatusDTO extends ServiceStatusDTO {
-
+
+ /**
+ * The type of store the virtual host is using.
+ */
+ @XmlAttribute(name="store-type")
+ public String storeType;
+
+ /**
+ * Ids of all the destinations running on the broker
+ */
+ @XmlElement(name="destination")
+ public List<DestinationSummaryDTO> destinations = new ArrayList<DestinationSummaryDTO>();
+
+
+ /**
+ * The current running configuration of the object
+ */
+ @XmlElement(name="config")
+ public VirtualHostDTO config = null;
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:15:31 2010
@@ -79,6 +79,8 @@ class HawtDBStore extends Store with Bas
def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
+ def storeType = "hawtdb"
+
def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
if ( HawtDBStore.validate(config, reporter) < ERROR ) {
if( serviceState.isStarted ) {
@@ -139,10 +141,15 @@ class HawtDBStore extends Store with Bas
}
- def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
- val key = next_queue_key.getAndIncrement
- record.key = key
- client.addQueue(record, ^{ callback(Some(key)) })
+ /**
+ * Gets the last queue key identifier stored.
+ */
+ def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+ callback(Some(client.rootBuffer.getLastQueueKey.longValue))
+ }
+
+ def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+ client.addQueue(record, ^{ callback(true) })
}
def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:15:31 2010
@@ -31,6 +31,8 @@ import org.apache.activemq.apollo.Servic
*/
trait Store extends ServiceTrait {
+ def storeType:String
+
/**
* Creates a store uow which is used to perform persistent
* operations as unit of work.
@@ -50,12 +52,17 @@ trait Store extends ServiceTrait {
def purge(callback: =>Unit):Unit
/**
+ * Gets the last queue key identifier stored.
+ */
+ def getLastQueueKey(callback:(Option[Long])=>Unit):Unit
+
+ /**
* Adds a queue.
*
* This method auto generates and assigns the key field of the queue record and
- * returns it via the callback.
+ * returns true if it succeeded.
*/
- def addQueue(record:QueueRecord)(callback:(Option[Long])=>Unit):Unit
+ def addQueue(record:QueueRecord)(callback:(Boolean)=>Unit):Unit
/**
* Removes a queue. Success is reported via the callback.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala Wed Jul 7 04:15:31 2010
@@ -66,7 +66,7 @@ object StoreFactory {
return rc
}
}
- throw new IllegalArgumentException("Uknonwn store configuration type: "+config.getClass)
+ throw new IllegalArgumentException("Uknonwn store type: "+config.getClass)
}
@@ -81,7 +81,7 @@ object StoreFactory {
}
}
}
- reporter.report(ERROR, "Uknonwn store configuration type: "+config.getClass)
+ reporter.report(ERROR, "Uknonwn store type: "+config.getClass)
ERROR
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul 7 04:15:31 2010
@@ -25,6 +25,7 @@ import org.apache.activemq.apollo.store.
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
import collection.mutable.ListBuffer
import java.util.concurrent.atomic.{AtomicLong, AtomicInteger, AtomicBoolean}
+import org.apache.activemq.apollo.util.LongCounter
/**
* <p>Implements generic testing of Store implementations.</p>
@@ -83,12 +84,15 @@ abstract class StoreBenchmarkSupport ext
}
}
+ val queue_key_counter = new LongCounter
+
def addQueue(name:String):Long = {
var queueA = new QueueRecord
+ queueA.key = queue_key_counter.incrementAndGet
queueA.name = ascii(name)
- val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
- expect(true)(rc.isDefined)
- rc.get
+ val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+ expect(true)(rc)
+ queueA.key
}
def addMessage(batch:StoreUOW, content:String):Long = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul 7 04:15:31 2010
@@ -24,6 +24,7 @@ import java.util.concurrent.{TimeUnit, C
import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
import collection.mutable.ListBuffer
+import org.apache.activemq.apollo.util.LongCounter
/**
* <p>Implements generic testing of Store implementations.</p>
@@ -82,12 +83,15 @@ abstract class StoreFunSuiteSupport exte
}
}
+ val queue_key_counter = new LongCounter
+
def addQueue(name:String):Long = {
var queueA = new QueueRecord
+ queueA.key = queue_key_counter.incrementAndGet
queueA.name = ascii(name)
- val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
- expect(true)(rc.isDefined)
- rc.get
+ val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+ expect(true)(rc)
+ queueA.key
}
def addMessage(batch:StoreUOW, content:String):Long = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul 7 04:15:31 2010
@@ -34,6 +34,9 @@ trait BaseService extends Service with L
override protected def log:Log = BaseService
sealed class State {
+
+ val since = System.currentTimeMillis
+
override def toString = getClass.getSimpleName
def isCreated = false
def isStarting = false
@@ -49,12 +52,12 @@ trait BaseService extends Service with L
def done = { callbacks.foreach(_.run); callbacks=Nil }
}
- protected object CREATED extends State { override def isCreated = true }
- protected class STARTING extends State with CallbackSupport { override def isStarting = true }
- protected object FAILED extends State { override def isFailed = true }
- protected object STARTED extends State { override def isStarted = true }
- protected class STOPPING extends State with CallbackSupport { override def isStopping = true }
- protected object STOPPED extends State { override def isStopped = true }
+ protected class CREATED extends State { override def isCreated = true }
+ protected class STARTING extends State with CallbackSupport { override def isStarting = true }
+ protected class FAILED extends State { override def isFailed = true }
+ protected class STARTED extends State { override def isStarted = true }
+ protected class STOPPING extends State with CallbackSupport { override def isStopping = true }
+ protected class STOPPED extends State { override def isStopped = true }
protected val dispatchQueue:DispatchQueue
@@ -62,7 +65,8 @@ trait BaseService extends Service with L
final def stop() = stop(null)
@volatile
- protected var _serviceState:State = CREATED
+ protected var _serviceState:State = new CREATED
+
def serviceState = _serviceState
@volatile
@@ -76,7 +80,7 @@ trait BaseService extends Service with L
_serviceState = state
try {
_start(^ {
- _serviceState = STARTED
+ _serviceState = new STARTED
state.done
})
}
@@ -84,7 +88,7 @@ trait BaseService extends Service with L
case e:Exception =>
error(e, "Start failed due to %s", e)
_serviceFailure = e
- _serviceState = FAILED
+ _serviceState = new FAILED
state.done
}
}
@@ -94,13 +98,13 @@ trait BaseService extends Service with L
}
}
_serviceState match {
- case CREATED =>
+ case state:CREATED =>
do_start
- case STOPPED =>
+ case state:STOPPED =>
do_start
case state:STARTING =>
state << onCompleted
- case STARTED =>
+ case state:STARTED =>
done
case state =>
done
@@ -116,13 +120,13 @@ trait BaseService extends Service with L
}
}
_serviceState match {
- case STARTED =>
+ case state:STARTED =>
val state = new STOPPING
state << onCompleted
_serviceState = state
try {
_stop(^ {
- _serviceState = STOPPED
+ _serviceState = new STOPPED
state.done
})
}
@@ -130,12 +134,12 @@ trait BaseService extends Service with L
case e:Exception =>
error(e, "Stop failed due to: %s", e)
_serviceFailure = e
- _serviceState = FAILED
+ _serviceState = new FAILED
state.done
}
case state:STOPPING =>
state << onCompleted
- case STOPPED =>
+ case state:STOPPED =>
done
case state =>
done
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Wed Jul 7 04:15:31 2010
@@ -32,6 +32,7 @@ class LongCounter(private var value:Long
def clear() = value=0
def get() = value
+ def set(value:Long) = this.value = value
def incrementAndGet() = addAndGet(1)
def decrementAndGet() = addAndGet(-1)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala Wed Jul 7 04:15:31 2010
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.apollo
-import java.util.HashMap
import org.apache.activemq.apollo.broker.Broker
+import java.util.concurrent.ConcurrentHashMap
/**
* <p>
@@ -29,17 +29,11 @@ object BrokerRegistry {
var configStore:ConfigStore = _
- private val _brokers = new HashMap[String, Broker]()
+ val brokers = new ConcurrentHashMap[String, Broker]()
- def get(id:String) = _brokers.synchronized {
- _brokers.get(id)
- }
-
- def add(broker:Broker) = _brokers.synchronized {
- _brokers.put(broker.config.id, broker)
- }
-
- def remove(id:String) = _brokers.synchronized {
- _brokers.remove(id)
- }
+ def get(id:String) = brokers.get(id)
+
+ def add(broker:Broker) = brokers.put(broker.config.id, broker)
+
+ def remove(id:String) = brokers.remove(id)
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala Wed Jul 7 04:15:31 2010
@@ -34,8 +34,6 @@ import org.fusesource.hawtdispatch.Scala
*/
class ServletContextListener extends GuiceServletContextListener {
- var broker:Broker = null
-
override def contextInitialized(servletContextEvent: ServletContextEvent) = {
try {
@@ -43,19 +41,25 @@ class ServletContextListener extends Gui
// Brokers startup async.
BrokerRegistry.configStore.foreachBroker(true) { config=>
+
+ println("Config store contained broker: "+config.id);
+
// Only start the broker up if it's enabled..
if( config.enabled ) {
+
+ println("starting broker: "+config.id);
val broker = new Broker()
broker.config = config
BrokerRegistry.add(broker)
broker.start()
+
}
+
}
}
catch {
case e:Exception =>
- broker = null
e.printStackTrace
}
@@ -64,7 +68,8 @@ class ServletContextListener extends Gui
override def contextDestroyed(servletContextEvent: ServletContextEvent) = {
super.contextDestroyed(servletContextEvent);
- val tracker = new LoggingTracker("broker shutdown")
+
+ val tracker = new LoggingTracker("webapp shutdown")
BrokerRegistry.configStore.foreachBroker(false) { config=>
// remove started brokers what we configured..
val broker = BrokerRegistry.remove(config.id);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala Wed Jul 7 04:15:31 2010
@@ -16,10 +16,17 @@
*/
package org.apache.activemq.apollo.web.resources;
-import java.lang.String
import javax.ws.rs._
-import reflect.{BeanProperty}
-import org.apache.activemq.apollo.dto.{ConnectionStatusDTO, ConnectorStatusDTO, VirtualHostStatusDTO, BrokerStatusDTO}
+import core.Response
+import org.apache.activemq.apollo.BrokerRegistry
+import Response.Status._
+import java.util.List
+import org.apache.activemq.apollo.dto._
+import java.{lang => jl}
+import collection.JavaConversions
+import org.fusesource.hawtdispatch.{ScalaDispatch, Future}
+import ScalaDispatch._
+import org.apache.activemq.apollo.broker._
/**
* <p>
@@ -28,55 +35,188 @@ import org.apache.activemq.apollo.dto.{C
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class BrokerStatus(parent:Broker, @BeanProperty id:String) extends Resource {
+case class BrokerStatus(parent:Broker) extends Resource {
+
+ val broker:org.apache.activemq.apollo.broker.Broker = BrokerRegistry.get(parent.id)
+ if( broker == null ) {
+ println("not in regisitry: "+BrokerRegistry.brokers)
+ result(NOT_FOUND)
+ }
+
@GET
def get() = {
- val rc = new BrokerStatusDTO
- rc.id = id
- rc
+ println("get hit")
+ Future[BrokerStatusDTO] { cb=>
+ broker.dispatchQueue {
+ println("building result...")
+ val result = new BrokerStatusDTO
+
+ result.id = broker.id
+ result.currentTime = System.currentTimeMillis
+ result.state = broker.serviceState.toString
+ result.stateSince = broker.serviceState.since
+ result.config = broker.config
+
+ broker.connectors.foreach{ c=>
+ result.connectors.add(c.id)
+ }
+
+ broker.virtualHosts.values.foreach{ host=>
+ result.virtualHosts.add( host.id )
+ }
+
+
+ cb(result)
+ }
+ }
}
+
@Path("virtual-hosts")
- def virtualHosts :Array[VirtualHostStatus] = null
- @Path("virtual-hosts/{id}")
- def virtualHost(@PathParam("id") id : String):VirtualHostStatus = null
+ def virtualHosts :Array[jl.Long] = {
+ val list: List[jl.Long] = get.virtualHosts
+ list.toArray(new Array[jl.Long](list.size))
+ }
- @Path("connectors")
- def connectors :Array[ConnectorStatus] = null
- @Path("connectors/{id}")
- def connector(@PathParam("id") id : String):ConnectorStatus = null
+ private def with_virtual_host[T](id:Long)(func: (VirtualHost, Option[T]=>Unit)=>Unit):T = {
+ Future[Option[T]] { cb=>
+ broker.virtualHosts.valuesIterator.find( _.id == id) match {
+ case Some(virtualHost)=>
+ virtualHost.dispatchQueue {
+ func(virtualHost, cb)
+ }
+ case None=> cb(None)
+ }
+ }.getOrElse(result(NOT_FOUND))
+ }
- @Path("connections")
- def connections :Array[ConnectionStatus] = null
- @Path("connections/{id}")
- def connection(@PathParam("id") id : String):ConnectionStatus = null
-}
+ @Path("virtual-hosts/{id}")
+ def virtualHost(@PathParam("id") id : Long):VirtualHostStatusDTO = {
+ with_virtual_host(id) { case (virtualHost,cb) =>
+ val result = new VirtualHostStatusDTO
+ result.id = virtualHost.id
+ result.state = virtualHost.serviceState.toString
+ result.stateSince = virtualHost.serviceState.since
+ result.config = virtualHost.config
+
+ if( virtualHost.store != null ) {
+ result.storeType = virtualHost.store.storeType
+ }
+ virtualHost.router.destinations.valuesIterator.foreach { node=>
+ val summary = new DestinationSummaryDTO
+ summary.id = node.id
+ summary.name = node.destination.getName.toString
+ summary.domain = node.destination.getDomain.toString
+ result.destinations.add(summary)
+ }
+ cb(Some(result))
+ }
+ }
-case class VirtualHostStatus(parent:BrokerStatus, @BeanProperty id: String) extends Resource {
- @GET
- def get() = {
- val rc = new VirtualHostStatusDTO
- rc.id = id
- rc
+ @Path("virtual-hosts/{id}/destinations/{dest}")
+ def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
+ with_virtual_host(id) { case (virtualHost,cb) =>
+ cb(virtualHost.router.destinations.valuesIterator.find { _.id == dest } map { node=>
+ val result = new DestinationStatusDTO
+ result.id = node.id
+ result.name = node.destination.getName.toString
+ result.domain = node.destination.getDomain.toString
+
+ node match {
+ case qdn:virtualHost.router.QueueDestinationNode =>
+ result.queues.add(qdn.queue.id)
+ case _ =>
+ }
+ result
+ })
+ }
}
-}
-case class ConnectorStatus(parent:BrokerStatus, @BeanProperty id: String) extends Resource {
+ @Path("virtual-hosts/{id}/queues/{queue}")
+ def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long):QueueStatusDTO = {
+ with_virtual_host(id) { case (virtualHost,cb) =>
+ import JavaConversions._
+ virtualHost.queues.valuesIterator.find { _.id == qid } match {
+ case Some(q:Queue)=>
+ q.dispatchQueue {
+
+ val result = new QueueStatusDTO
+ result.id = q.id
+ result.capacity = q.capacity
+
+ result.enqueueItemCounter = q.enqueue_item_counter
+ result.dequeueItemCounter = q.dequeue_item_counter
+ result.enqueueSizeCounter = q.enqueue_size_counter
+ result.dequeueSizeCounter = q.dequeue_size_counter
+ result.nackItemCounter = q.nack_item_counter
+ result.nackSizeCounter = q.nack_size_counter
+
+ result.queueSize = q.queue_size
+ result.queueItems = q.queue_items
+
+ result.loadingSize = q.loading_size
+ result.flushingSize = q.flushing_size
+ result.flushedItems = q.flushed_items
+
+
+ var cur = q.head_entry
+ while( cur!=null ) {
+
+ val e = new EntryStatusDTO
+ e.seq = cur.seq
+ e.count = cur.count
+ e.size = cur.size
+ e.consumers = cur.parked.size
+ e.prefetched = cur.prefetched
+ e.state = cur.label
+
+ result.entries.add(e)
+
+ cur = if( cur == q.tail_entry ) {
+ null
+ } else {
+ cur.nextOrTail
+ }
+ }
+
+ cb(Some(result))
+ }
+ case None=>
+ cb(None)
+ }
+ }
+ }
- @GET
- def get() = {
- val rc = new ConnectorStatusDTO
- rc.id = id
- rc
+
+ @Path("connectors")
+ def connectors :Array[jl.Long] = {
+ val list: List[jl.Long] = get.connectors
+ list.toArray(new Array[jl.Long](list.size))
}
-}
-case class ConnectionStatus(parent:BrokerStatus, @BeanProperty id:String) extends Resource {
+ @Path("connectors/{id}")
+ def connector(@PathParam("id") id : Long):ConnectorStatusDTO = {
+
+ Future[Option[ConnectorStatusDTO]] { cb=>
+ broker.connectors.find(_.id == id) match {
+ case Some(connector)=>
+ connector.dispatchQueue {
+ val result = new ConnectorStatusDTO
+ result.id = connector.id
+ result.state = connector.serviceState.toString
+ result.stateSince = connector.serviceState.since
+ result.config = connector.config
+
+ result.accepted = connector.accept_counter.get
+ connector.connections.keysIterator.foreach { id=>
+ result.connections.add(id)
+ }
+ cb(Some(result))
+ }
+ case None=> cb(None)
+ }
+ }.getOrElse(result(NOT_FOUND))
- @GET
- def get() = {
- val rc = new ConnectionStatusDTO
- rc.id = id
- rc
}
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala Wed Jul 7 04:15:31 2010
@@ -123,6 +123,6 @@ case class Broker(parent:Root, @BeanProp
}
@Path("status")
- def status = BrokerStatus(this, id)
+ def status = BrokerStatus(this)
}