You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:05:22 UTC
svn commit: r961125 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-stomp/src/main/scala/org/apache/a...
Author: chirino
Date: Wed Jul 7 04:05:22 2010
New Revision: 961125
URL: http://svn.apache.org/viewvc?rev=961125&view=rev
Log:
cassandra based persistence and recovery seems to be working..
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 04:05:22 2010
@@ -198,16 +198,21 @@ class Broker() extends BaseService with
}
}
- // Start them up..
+ // Start up the virtual hosts
val tracker = new LoggingTracker("broker startup", dispatchQueue)
virtualHosts.valuesIterator.foreach( x=>
tracker.start(x)
)
- connectors.foreach( x=>
- tracker.start(x)
- )
- tracker.callback(onCompleted)
+ // Once virtual hosts are up.. start up the connectors.
+ tracker.callback(^{
+ val tracker = new LoggingTracker("broker startup", dispatchQueue)
+ connectors.foreach( x=>
+ tracker.start(x)
+ )
+ tracker.callback(onCompleted)
+ })
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:05:22 2010
@@ -56,14 +56,14 @@ object Queue extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val host: VirtualHost, val destination: Destination, val storeId: Long) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
override protected def log = Queue
import Queue._
var consumerSubs = Map[DeliveryConsumer, Subscription]()
- override val dispatchQueue: DispatchQueue = createQueue("queue:" + destination);
+ override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
dispatchQueue.setTargetQueue(getRandomThreadQueue)
dispatchQueue {
debug("created queue for: " + destination)
@@ -90,11 +90,7 @@ class Queue(val host: VirtualHost, val d
val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
// sequence numbers.. used to track what's in the store.
- var first_seq = -1L
- var last_seq = -1L
var message_seq_counter = 0L
- var count = 0
-
val headEntry = new QueueEntry(this).tombstone
var tailEntry = new QueueEntry(this)
@@ -104,6 +100,7 @@ class Queue(val host: VirtualHost, val d
entries.addFirst(headEntry)
var flushingSize = 0
+ var storeId: Long = -1L
/**
* Tunning options.
@@ -114,6 +111,18 @@ class Queue(val host: VirtualHost, val d
private var size = 0
+ def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
+ this.storeId = storeId
+ records.foreach { qer =>
+ val entry = new QueueEntry(Queue.this).stored(qer)
+ entries.addLast(entry)
+ }
+ if( !entries.isEmpty ) {
+ message_seq_counter = entries.getTail.seq
+ }
+ debug("restored: "+records.size )
+ } >>: dispatchQueue
+
object messages extends Sink[Delivery] {
var refiller: Runnable = null
@@ -298,7 +307,7 @@ class Queue(val host: VirtualHost, val d
def compareTo(o: Prio) = o.value - value
}
- val prios = new ArrayList[Prio](count)
+ val prios = new ArrayList[Prio](entries.size())
var entry = entries.getHead
while( entry!=null ) {
@@ -357,15 +366,7 @@ class Queue(val host: VirtualHost, val d
remaining -= entry.value.size
val stored = entry.value.asStored
if( stored!=null && !stored.loading) {
- // start loading it back...
- stored.loading = true
- host.store.loadMessage(stored.ref) { delivery =>
- // pass off to a source so it can aggregate multiple
- // loads to reduce cross thread synchronization
- if( delivery.isDefined ) {
- store_load_source.merge((entry, delivery.get))
- }
- }
+ stored.load
}
} else {
// Chuck the reset out...
@@ -390,11 +391,7 @@ class Queue(val host: VirtualHost, val d
def drain_store_loads() = {
val data = store_load_source.getData
- var ready = List[QueueEntry]()
-
- data.foreach { event =>
- val entry = event._1
- val stored = event._2
+ data.foreach { case (entry,stored) =>
val delivery = new Delivery()
delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
@@ -405,13 +402,12 @@ class Queue(val host: VirtualHost, val d
size += entry.value.size
- if( entry.hasSubs ) {
- ready ::= entry
- }
}
- ready.foreach { entry =>
- entry.dispatch
+ data.foreach { case (entry,stored) =>
+ if( entry.hasSubs ) {
+ entry.run
+ }
}
}
@@ -473,6 +469,12 @@ class QueueEntry(val queue:Queue) extend
this
}
+ def stored(qer:QueueEntryRecord) = {
+ this.seq = qer.queueSeq
+ this.value = new Stored(qer.messageKey, qer.size)
+ this
+ }
+
def tombstone = {
this.value = new Tombstone()
if( seq != -1L ) {
@@ -604,8 +606,36 @@ class QueueEntry(val queue:Queue) extend
// Stored entries can't be dispatched until
// they get loaded.
def dispatch():QueueEntry = {
+ if( !loading ) {
+ var remaining = queue.tune_subscription_prefetch - size
+ load
+
+ // make sure the next few entries are loaded too..
+ var cur = getNext
+ while( remaining>0 && cur!=null ) {
+ remaining -= cur.value.size
+ val stored = cur.value.asStored
+ if( stored!=null && !stored.loading) {
+ stored.load
+ }
+ cur = getNext
+ }
+
+ }
null
}
+
+ def load() = {
+ // start loading it back...
+ loading = true
+ queue.host.store.loadMessage(ref) { delivery =>
+ // pass off to a source so it can aggregate multiple
+ // loads to reduce cross thread synchronization
+ if( delivery.isDefined ) {
+ queue.store_load_source.merge((QueueEntry.this, delivery.get))
+ }
+ }
+ }
}
class Loaded(val delivery: Delivery) extends EntryType {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:05:22 2010
@@ -121,32 +121,34 @@ class VirtualHost(val broker: Broker) ex
val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
store = StoreFactory.create(config.store)
if( store!=null ) {
- val task = tracker.task("store startup")
-
+ val task = tracker.task("store list queue keys")
store.start(^{
- store.listQueues { ids =>
- for( id <- ids) {
- store.getQueueStatus(id) { x =>
- x match {
- case Some(info)=>
- dispatchQueue ^{
- val dest = new SingleDestination(Domain.QUEUE_DOMAIN, info.record.name)
-
- val queue = new Queue(this, dest, id)
- queue.first_seq = info.first
- queue.last_seq = info.last
- queue.message_seq_counter = info.last+1
- queue.count = info.count
-
- queues.put(info.record.name, queue)
+ store.listQueues { queueKeys =>
+ for( queueKey <- queueKeys) {
+ val task = tracker.task("store load queue key: "+queueKey)
+ // Use a global queue to so we concurrently restore
+ // the queues.
+ globalQueue {
+ store.getQueueStatus(queueKey) { x =>
+ x match {
+ case Some(info)=>
+ store.getQueueEntries(queueKey) { entries=>
+ dispatchQueue ^{
+ val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+ val queue = new Queue(this, dest)
+ queue.restore(queueKey, entries)
+ queues.put(dest.getName, queue)
+ task.run
+ }
+ }
+ case _ =>
+ task.run
}
- case _ =>
}
}
}
+ task.run
}
- task.run
-
});
}
@@ -186,7 +188,7 @@ class VirtualHost(val broker: Broker) ex
error("getQueue can only be called while the service is running.")
cb(null)
} else {
- var queue = queues.get(destination);
+ var queue = queues.get(destination.getName);
if( queue==null && config.autoCreateQueues ) {
addQueue(destination)(cb)
} else {
@@ -203,10 +205,11 @@ class VirtualHost(val broker: Broker) ex
record.name = name
store.addQueue(record) { rc =>
rc match {
- case Some(id) =>
+ case Some(queueKey) =>
dispatchQueue ^ {
- val queue = new Queue(this, dest, id)
- queues.put(name, queue)
+ val queue = new Queue(this, dest)
+ queue.restore(queueKey, Nil)
+ queues.put(dest.getName, queue)
cb(queue)
}
case None => // store could not create
@@ -214,8 +217,8 @@ class VirtualHost(val broker: Broker) ex
}
}
} else {
- val queue = new Queue(this, dest, -1)
- queues.put(name, queue)
+ val queue = new Queue(this, dest)
+ queues.put(dest.getName, queue)
cb(queue)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:05:22 2010
@@ -125,9 +125,9 @@ class CassandraClient() {
val rc = new QueueStatus
rc.record = new QueueRecord
rc.record.key = id
- rc.record.name = new AsciiBuffer(x)
+ rc.record.name = new AsciiBuffer(x.value)
- rc.count = session.count( schema.entries \ id )
+// rc.count = session.count( schema.entries \ id )
// TODO
// rc.count =
@@ -145,30 +145,30 @@ class CassandraClient() {
def store(txs:Seq[CassandraStore#CassandraBatch]) {
withSession {
session =>
- var batch = List[Operation]()
+ var operations = List[Operation]()
txs.foreach {
tx =>
tx.actions.foreach {
case (msg, action) =>
var rc =
if (action.store != null) {
- batch ::= Insert( schema.message_data \ (msg, action.store) )
+ operations ::= Insert( schema.message_data \ (msg, action.store) )
}
action.enqueues.foreach {
queueEntry =>
val qid = queueEntry.queueKey
val seq = queueEntry.queueSeq
- batch ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
+ operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
}
action.dequeues.foreach {
queueEntry =>
val qid = queueEntry.queueKey
val seq = queueEntry.queueSeq
- batch ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
+ operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
}
}
}
- session.batch(batch)
+ session.batch(operations)
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:05:22 2010
@@ -131,6 +131,7 @@ class CassandraStore extends Store with
def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
val key = next_queue_key.incrementAndGet
+ record.key = key
executor_pool ^{
client.addQueue(record)
cb(Some(key))
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala Wed Jul 7 04:05:22 2010
@@ -23,8 +23,6 @@ import com.shorrockin.cascal.session.Col
case class Schema(name:String) {
- implicit def toByteArray(buffer:Buffer) = buffer.toByteArray
-
val keyspace = Keyspace(name)
/**
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 04:05:22 2010
@@ -87,7 +87,11 @@ class StompWireFormat extends WireFormat
}
def unmarshal(packet:Buffer):AnyRef = {
- unmarshalNB(packet.toByteBuffer)
+ start = packet.offset
+ end = packet.offset
+ val bb = packet.toByteBuffer
+ bb.position(packet.offset + packet.length)
+ unmarshalNB(bb)
}
def unmarshal(in: DataInput):AnyRef = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala Wed Jul 7 04:05:22 2010
@@ -30,7 +30,7 @@ import org.fusesource.hawtdispatch.{Task
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class LoggingTracker(name:String, parent:DispatchQueue=getGlobalQueue) extends TaskTracker(name, parent) with Logging {
+class LoggingTracker(name:String, parent:DispatchQueue=globalQueue) extends TaskTracker(name, parent) with Logging {
timeout = 1000;
@@ -52,7 +52,7 @@ class LoggingTracker(name:String, parent
}
object LoggingTracker extends Log {
- def apply[R](name:String, parent:DispatchQueue=getGlobalQueue)(func: (LoggingTracker)=>Unit ) = {
+ def apply[R](name:String, parent:DispatchQueue=globalQueue)(func: (LoggingTracker)=>Unit ) = {
val t = new LoggingTracker(name, parent)
func(t)
t.await