You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:05:22 UTC

svn commit: r961125 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-stomp/src/main/scala/org/apache/a...

Author: chirino
Date: Wed Jul  7 04:05:22 2010
New Revision: 961125

URL: http://svn.apache.org/viewvc?rev=961125&view=rev
Log:
Cassandra-based persistence and recovery seem to be working.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 04:05:22 2010
@@ -198,16 +198,21 @@ class Broker() extends BaseService with 
       }
     }
 
-    // Start them up..
+    // Start up the virtual hosts
     val tracker = new LoggingTracker("broker startup", dispatchQueue)
     virtualHosts.valuesIterator.foreach( x=>
       tracker.start(x)
     )
-    connectors.foreach( x=>
-      tracker.start(x)
-    )
 
-    tracker.callback(onCompleted)
+    // Once virtual hosts are up.. start up the connectors.
+    tracker.callback(^{
+      val tracker = new LoggingTracker("broker startup", dispatchQueue)
+      connectors.foreach( x=>
+        tracker.start(x)
+      )
+      tracker.callback(onCompleted)
+    })
+
   }
 
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:05:22 2010
@@ -56,14 +56,14 @@ object Queue extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val host: VirtualHost, val destination: Destination, val storeId: Long) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
   override protected def log = Queue
 
   import Queue._
 
   var consumerSubs = Map[DeliveryConsumer, Subscription]()
 
-  override val dispatchQueue: DispatchQueue = createQueue("queue:" + destination);
+  override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
   dispatchQueue.setTargetQueue(getRandomThreadQueue)
   dispatchQueue {
     debug("created queue for: " + destination)
@@ -90,11 +90,7 @@ class Queue(val host: VirtualHost, val d
   val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
 
   // sequence numbers.. used to track what's in the store.
-  var first_seq = -1L
-  var last_seq = -1L
   var message_seq_counter = 0L
-  var count = 0
-
 
   val headEntry = new QueueEntry(this).tombstone
   var tailEntry = new QueueEntry(this)
@@ -104,6 +100,7 @@ class Queue(val host: VirtualHost, val d
   entries.addFirst(headEntry)
 
   var flushingSize = 0
+  var storeId: Long = -1L
 
   /**
    * Tunning options.
@@ -114,6 +111,18 @@ class Queue(val host: VirtualHost, val d
 
   private var size = 0
 
+  def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
+    this.storeId = storeId
+    records.foreach { qer =>
+      val entry = new QueueEntry(Queue.this).stored(qer)
+      entries.addLast(entry)
+    }
+    if( !entries.isEmpty ) {
+      message_seq_counter = entries.getTail.seq
+    }
+    debug("restored: "+records.size )
+  } >>: dispatchQueue
+
   object messages extends Sink[Delivery] {
 
     var refiller: Runnable = null
@@ -298,7 +307,7 @@ class Queue(val host: VirtualHost, val d
       def compareTo(o: Prio) = o.value - value
     }
 
-    val prios = new ArrayList[Prio](count)
+    val prios = new ArrayList[Prio](entries.size())
 
     var entry = entries.getHead
     while( entry!=null ) {
@@ -357,15 +366,7 @@ class Queue(val host: VirtualHost, val d
         remaining -= entry.value.size
         val stored = entry.value.asStored
         if( stored!=null && !stored.loading) {
-          // start loading it back...
-          stored.loading = true
-          host.store.loadMessage(stored.ref) { delivery =>
-            // pass off to a source so it can aggregate multiple
-            // loads to reduce cross thread synchronization
-            if( delivery.isDefined ) {
-              store_load_source.merge((entry, delivery.get))
-            }
-          }
+          stored.load
         }
       } else {
         // Chuck the reset out...
@@ -390,11 +391,7 @@ class Queue(val host: VirtualHost, val d
 
   def drain_store_loads() = {
     val data = store_load_source.getData
-    var ready = List[QueueEntry]()
-
-    data.foreach { event =>
-      val entry = event._1
-      val stored = event._2
+    data.foreach { case (entry,stored) =>
 
       val delivery = new Delivery()
       delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
@@ -405,13 +402,12 @@ class Queue(val host: VirtualHost, val d
 
       size += entry.value.size
 
-      if( entry.hasSubs ) {
-        ready ::= entry
-      }
     }
 
-    ready.foreach { entry =>
-      entry.dispatch
+    data.foreach { case (entry,stored) =>
+      if( entry.hasSubs ) {
+        entry.run
+      }
     }
   }
 
@@ -473,6 +469,12 @@ class QueueEntry(val queue:Queue) extend
     this
   }
 
+  def stored(qer:QueueEntryRecord) = {
+    this.seq = qer.queueSeq
+    this.value = new Stored(qer.messageKey, qer.size)
+    this
+  }
+
   def tombstone = {
     this.value = new Tombstone()
     if( seq != -1L ) {
@@ -604,8 +606,36 @@ class QueueEntry(val queue:Queue) extend
     // Stored entries can't be dispatched until
     // they get loaded.
     def dispatch():QueueEntry = {
+      if( !loading ) {
+        var remaining = queue.tune_subscription_prefetch - size
+        load
+
+        // make sure the next few entries are loaded too..
+        var cur = getNext
+        while( remaining>0 && cur!=null ) {
+          remaining -= cur.value.size
+          val stored = cur.value.asStored
+          if( stored!=null && !stored.loading) {
+            stored.load
+          }
+          cur = getNext
+        }
+
+      }
       null
     }
+
+    def load() = {
+      // start loading it back...
+      loading = true
+      queue.host.store.loadMessage(ref) { delivery =>
+        // pass off to a source so it can aggregate multiple
+        // loads to reduce cross thread synchronization
+        if( delivery.isDefined ) {
+          queue.store_load_source.merge((QueueEntry.this, delivery.get))
+        }
+      }
+    }
   }
 
   class Loaded(val delivery: Delivery) extends EntryType {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:05:22 2010
@@ -121,32 +121,34 @@ class VirtualHost(val broker: Broker) ex
     val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
     store = StoreFactory.create(config.store)
     if( store!=null ) {
-      val task = tracker.task("store startup")
-
+      val task = tracker.task("store list queue keys")
       store.start(^{
-        store.listQueues { ids =>
-          for( id <- ids) {
-            store.getQueueStatus(id) { x =>
-              x match {
-                case Some(info)=>
-                dispatchQueue ^{
-                  val dest = new SingleDestination(Domain.QUEUE_DOMAIN, info.record.name)
-
-                  val queue = new Queue(this, dest, id)
-                  queue.first_seq = info.first
-                  queue.last_seq = info.last
-                  queue.message_seq_counter = info.last+1
-                  queue.count = info.count
-
-                  queues.put(info.record.name, queue)
+        store.listQueues { queueKeys =>
+          for( queueKey <- queueKeys) {
+            val task = tracker.task("store load queue key: "+queueKey)
+            // Use a global queue so that the queues are restored
+            // concurrently.
+            globalQueue {
+              store.getQueueStatus(queueKey) { x =>
+                x match {
+                  case Some(info)=>
+                  store.getQueueEntries(queueKey) { entries=>
+                    dispatchQueue ^{
+                      val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+                      val queue = new Queue(this, dest)
+                      queue.restore(queueKey, entries)
+                      queues.put(dest.getName, queue)
+                      task.run
+                    }
+                  }
+                  case _ =>
+                    task.run
                 }
-                case _ =>
               }
             }
           }
+          task.run
         }
-        task.run
-
       });
     }
 
@@ -186,7 +188,7 @@ class VirtualHost(val broker: Broker) ex
       error("getQueue can only be called while the service is running.")
       cb(null)
     } else {
-      var queue = queues.get(destination);
+      var queue = queues.get(destination.getName);
       if( queue==null && config.autoCreateQueues ) {
         addQueue(destination)(cb)
       } else  {
@@ -203,10 +205,11 @@ class VirtualHost(val broker: Broker) ex
       record.name = name
       store.addQueue(record) { rc =>
         rc match {
-          case Some(id) =>
+          case Some(queueKey) =>
             dispatchQueue ^ {
-              val queue = new Queue(this, dest, id)
-              queues.put(name, queue)
+              val queue = new Queue(this, dest)
+              queue.restore(queueKey, Nil)
+              queues.put(dest.getName, queue)
               cb(queue)
             }
           case None => // store could not create
@@ -214,8 +217,8 @@ class VirtualHost(val broker: Broker) ex
         }
       }
     } else {
-      val queue = new Queue(this, dest, -1)
-      queues.put(name, queue)
+      val queue = new Queue(this, dest)
+      queues.put(dest.getName, queue)
       cb(queue)
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:05:22 2010
@@ -125,9 +125,9 @@ class CassandraClient() {
             val rc = new QueueStatus
             rc.record = new QueueRecord
             rc.record.key = id
-            rc.record.name = new AsciiBuffer(x)
+            rc.record.name = new AsciiBuffer(x.value)
 
-            rc.count = session.count( schema.entries \ id )
+//            rc.count = session.count( schema.entries \ id )
             
             // TODO
             //          rc.count =
@@ -145,30 +145,30 @@ class CassandraClient() {
   def store(txs:Seq[CassandraStore#CassandraBatch]) {
     withSession {
       session =>
-        var batch = List[Operation]()
+        var operations = List[Operation]()
         txs.foreach {
           tx =>
             tx.actions.foreach {
               case (msg, action) =>
                 var rc =
                 if (action.store != null) {
-                  batch ::= Insert( schema.message_data \ (msg, action.store) )
+                  operations ::= Insert( schema.message_data \ (msg, action.store) )
                 }
                 action.enqueues.foreach {
                   queueEntry =>
                     val qid = queueEntry.queueKey
                     val seq = queueEntry.queueSeq
-                    batch ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
+                    operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
                 }
                 action.dequeues.foreach {
                   queueEntry =>
                     val qid = queueEntry.queueKey
                     val seq = queueEntry.queueSeq
-                    batch ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
+                    operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
                 }
             }
         }
-        session.batch(batch)
+        session.batch(operations)
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:05:22 2010
@@ -131,6 +131,7 @@ class CassandraStore extends Store with 
 
   def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
     val key = next_queue_key.incrementAndGet
+    record.key = key
     executor_pool ^{
       client.addQueue(record)
       cb(Some(key))

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala Wed Jul  7 04:05:22 2010
@@ -23,8 +23,6 @@ import com.shorrockin.cascal.session.Col
 
 case class Schema(name:String) {
 
-  implicit def toByteArray(buffer:Buffer) = buffer.toByteArray
-
   val keyspace = Keyspace(name)
 
   /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:05:22 2010
@@ -87,7 +87,11 @@ class StompWireFormat extends WireFormat
   }
 
   def unmarshal(packet:Buffer):AnyRef = {
-    unmarshalNB(packet.toByteBuffer)
+    start = packet.offset
+    end = packet.offset
+    val bb = packet.toByteBuffer
+    bb.position(packet.offset + packet.length)
+    unmarshalNB(bb)
   }
 
   def unmarshal(in: DataInput):AnyRef = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala?rev=961125&r1=961124&r2=961125&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/LoggingTracker.scala Wed Jul  7 04:05:22 2010
@@ -30,7 +30,7 @@ import org.fusesource.hawtdispatch.{Task
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class LoggingTracker(name:String, parent:DispatchQueue=getGlobalQueue) extends TaskTracker(name, parent) with Logging {
+class LoggingTracker(name:String, parent:DispatchQueue=globalQueue) extends TaskTracker(name, parent) with Logging {
 
   timeout = 1000;
 
@@ -52,7 +52,7 @@ class LoggingTracker(name:String, parent
 }
 
 object LoggingTracker extends Log {
-  def apply[R](name:String, parent:DispatchQueue=getGlobalQueue)(func: (LoggingTracker)=>Unit ) = {
+  def apply[R](name:String, parent:DispatchQueue=globalQueue)(func: (LoggingTracker)=>Unit ) = {
     val t = new LoggingTracker(name, parent)
     func(t)
     t.await