You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:05:54 UTC

svn commit: r961126 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-cassandra/src/main/resources/ activemq-c...

Author: chirino
Date: Wed Jul  7 04:05:53 2010
New Revision: 961126

URL: http://svn.apache.org/viewvc?rev=961126&view=rev
Log:
- swapping is enabled now
- store can be purged on startup
- couple of shutdown bug fixes in TcpTransport

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:05:53 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.util.list.{Li
 import org.apache.activemq.broker.store.{StoreBatch}
 import protocol.ProtocolFactory
 import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
+import java.util.concurrent.TimeUnit
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -99,6 +100,7 @@ class Queue(val host: VirtualHost, val d
   val entries = new LinkedNodeList[QueueEntry]()
   entries.addFirst(headEntry)
 
+  var loadingSize = 0
   var flushingSize = 0
   var storeId: Long = -1L
 
@@ -108,18 +110,26 @@ class Queue(val host: VirtualHost, val d
   var tune_max_size = 1024 * 32
   var tune_subscription_prefetch = 1024*32
   var tune_max_outbound_size = 1024 * 1204 * 5
+  var tune_swap_delay = 100L
+
+  var enqueue_counter = 0L
+  var dequeue_counter = 0L
+  var enqueue_size = 0L
+  var dequeue_size = 0L
 
   private var size = 0
 
   def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
     this.storeId = storeId
     records.foreach { qer =>
-      val entry = new QueueEntry(Queue.this).stored(qer)
+      val entry = new QueueEntry(Queue.this).flushed(qer)
       entries.addLast(entry)
     }
     if( !entries.isEmpty ) {
       message_seq_counter = entries.getTail.seq
     }
+    counter = records.size
+    enqueue_counter += records.size
     debug("restored: "+records.size )
   } >>: dispatchQueue
 
@@ -149,24 +159,46 @@ class Queue(val host: VirtualHost, val d
           delivery.storeBatch.release
         }
 
-        size += entry.value.size
+        size += entry.size
         entries.addLast(entry)
         counter += 1;
+        enqueue_counter += 1
+        enqueue_size += entry.size
 
-        if( full && host.store!=null ) {
-//          swap
+        var swap_check = false
+        if( !entry.hasSubs ) {
+          // we flush the entry out right away if it looks
+          // like it won't be needed.
+          if( entry.getPrevious.isFlushedOrFlushing ) {
+            flushingSize += entry.flush
+          } else {
+            swap_check=true
+          }
+        } else {
+          //  entry.dispatch==null if the entry was fully dispatched
+          swap_check = entry.dispatch!=null
         }
 
-        if( entry.hasSubs ) {
-          entry.dispatch
+        // Does it look like we need to start swapping to make room
+        // for more messages?
+        if( swap_check && host.store!=null &&  full ) {
+          val wasAt = dequeue_size
+          dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
+            // start swapping if we are still blocked after a short delay
+            if( dequeue_size == wasAt && full ) {
+              println("swapping...")
+              swap
+            }
+          })
         }
+
         true
       }
     }
   }
 
   def ack(entry: QueueEntry, sb:StoreBatch) = {
-    if (entry.value.ref != -1) {
+    if (entry.ref != -1) {
       val storeBatch = if( sb == null ) {
         host.store.createStoreBatch
       } else {
@@ -181,13 +213,13 @@ class Queue(val host: VirtualHost, val d
       sb.release
     }
 
+    dequeue_counter += 1
     counter -= 1
-    size -= entry.value.size
+    dequeue_size += entry.size
+    size -= entry.size
     entry.tombstone
 
-    if (counter == 0) {
-      messages.refiller.run
-    }
+    messages.refiller.run
   }
 
 
@@ -301,7 +333,18 @@ class Queue(val host: VirtualHost, val d
     rc
   }
 
-  def swap() = {
+  /**
+   * Prioritizes all the queue entries so that entries most likely to be consumed
+   * next are a higher priority.  All messages with the highest priority are loaded
+   * and messages with the lowest priority are flushed to make room to accept more
+   * messages from the producer.
+   */
+  def swap():Unit = {
+
+    if( !host.serviceState.isStarted ) {
+      return
+    }
+
     class Prio(val entry:QueueEntry) extends Comparable[Prio] {
       var value = 0
       def compareTo(o: Prio) = o.value - value
@@ -311,7 +354,7 @@ class Queue(val host: VirtualHost, val d
 
     var entry = entries.getHead
     while( entry!=null ) {
-      if( entry.value.asTombstone == null ) {
+      if( entry.asTombstone == null ) {
         prios.add(new Prio(entry))
       }
       entry = entry.getNext
@@ -325,7 +368,7 @@ class Queue(val host: VirtualHost, val d
     def prioritize(i:Int, size:Int, p:Int):Unit = {
       val prio = prios.get(i)
       prio.value += p
-      val remainingSize = size - prio.entry.value.size
+      val remainingSize = size - prio.entry.size
       if( remainingSize > 0 ) {
         val next = i + 1
         if( next < prios.size ) {
@@ -363,27 +406,10 @@ class Queue(val host: VirtualHost, val d
       val prio = prios.get(i)
       val entry = prio.entry
       if( remaining > 0 ) {
-        remaining -= entry.value.size
-        val stored = entry.value.asStored
-        if( stored!=null && !stored.loading) {
-          stored.load
-        }
+        loadingSize += entry.load
+        remaining -= entry.size
       } else {
-        // Chuck the reset out...
-        val loaded = entry.value.asLoaded
-        if( loaded!=null ) {
-          var ref = loaded.delivery.storeKey
-          if( ref == -1 ) {
-            val tx = host.store.createStoreBatch
-            loaded.delivery.storeKey = tx.store(loaded.delivery.createMessageRecord)
-            tx.enqueue(entry.createQueueEntryRecord)
-            tx.release
-          }
-          flushingSize += entry.value.size
-          host.store.flushMessage(ref) {
-            store_flush_source.merge(entry)
-          }
-        }
+        flushingSize += entry.flush
       }
       i += 1
     }
@@ -391,20 +417,22 @@ class Queue(val host: VirtualHost, val d
 
   def drain_store_loads() = {
     val data = store_load_source.getData
-    data.foreach { case (entry,stored) =>
+    data.foreach { case (entry,flushed) =>
+
+      loadingSize -= entry.size
 
       val delivery = new Delivery()
-      delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
-      delivery.size = stored.size
-      delivery.storeKey = stored.key
+      delivery.message = ProtocolFactory.get(flushed.protocol).decode(flushed.value)
+      delivery.size = flushed.size
+      delivery.storeKey = flushed.key
 
       entry.loaded(delivery)
 
-      size += entry.value.size
+      size += entry.size
 
     }
 
-    data.foreach { case (entry,stored) =>
+    data.foreach { case (entry,_) =>
       if( entry.hasSubs ) {
         entry.run
       }
@@ -413,22 +441,25 @@ class Queue(val host: VirtualHost, val d
 
   def drain_store_flushes() = {
     store_flush_source.getData.foreach { entry =>
-      flushingSize -= entry.value.size
+      flushingSize -= entry.size
 
       // by the time we get called back, subs my be interested in the entry
       // or it may have been acked.
-      if( !entry.hasSubs && entry.value.asLoaded!=null ) {
-        size += entry.value.size
-        entry.stored
+      if( !entry.hasSubs && entry.asLoaded!=null ) {
+        size -= entry.size
+        entry.flushed
       }
     }
+    
+    messages.refiller.run
+
   }
 
 }
 
 
 object QueueEntry extends Sizer[QueueEntry] {
-  def size(value: QueueEntry): Int = value.value.size
+  def size(value: QueueEntry): Int = value.size
 }
 
 class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
@@ -463,15 +494,15 @@ class QueueEntry(val queue:Queue) extend
     this
   }
 
-  def stored() = {
+  def flushed() = {
     val loaded = value.asLoaded
-    this.value = new Stored(loaded.delivery.storeKey, loaded.size)
+    this.value = new Flushed(loaded.delivery.storeKey, loaded.size)
     this
   }
 
-  def stored(qer:QueueEntryRecord) = {
+  def flushed(qer:QueueEntryRecord) = {
     this.seq = qer.queueSeq
-    this.value = new Stored(qer.messageKey, qer.size)
+    this.value = new Flushed(qer.messageKey, qer.size)
     this
   }
 
@@ -519,7 +550,6 @@ class QueueEntry(val queue:Queue) extend
     this
   }
 
-
   def hasSubs = !(competing == Nil && browsing == Nil)
 
   def run() = {
@@ -529,15 +559,6 @@ class QueueEntry(val queue:Queue) extend
     }
   }
 
-  def dispatch():QueueEntry = {
-    if( value == null ) {
-      // tail entry can't be dispatched.
-      null
-    } else {
-      value.dispatch
-    }
-  }
-
   def addBrowsing(l:List[Subscription]) = {
     l.foreach(x=>x.position(this))
     browsing :::= l
@@ -566,6 +587,25 @@ class QueueEntry(val queue:Queue) extend
     entry
   }
 
+  def size = this.value.size
+  def flush = this.value.flush
+  def load = this.value.load
+  def ref = this.value.ref
+
+  def asTombstone = this.value.asTombstone
+  def asFlushed = this.value.asFlushed
+  def asLoaded = this.value.asLoaded
+  def isFlushedOrFlushing = value.isFlushedOrFlushing
+
+  def dispatch():QueueEntry = {
+    if( value == null ) {
+      // tail entry can't be dispatched.
+      null
+    } else {
+      value.dispatch
+    }
+  }
+
 
   trait EntryType {
     def size:Int
@@ -573,8 +613,12 @@ class QueueEntry(val queue:Queue) extend
     def ref:Long
 
     def asTombstone:Tombstone = null
-    def asStored:Stored = null
+    def asFlushed:Flushed = null
     def asLoaded:Loaded = null
+
+    def flush:Int = 0
+    def load:Int = 0
+    def isFlushedOrFlushing = false
   }
 
   class Tombstone extends EntryType {
@@ -597,13 +641,15 @@ class QueueEntry(val queue:Queue) extend
     
   }
 
-  class Stored(val ref:Long, val size:Int) extends EntryType {
+  class Flushed(val ref:Long, val size:Int) extends EntryType {
 
     var loading = false
 
-    override def asStored = this
+    override def asFlushed = this
+
+    override def isFlushedOrFlushing = true
 
-    // Stored entries can't be dispatched until
+    // Flushed entries can't be dispatched until
     // they get loaded.
     def dispatch():QueueEntry = {
       if( !loading ) {
@@ -613,10 +659,10 @@ class QueueEntry(val queue:Queue) extend
         // make sure the next few entries are loaded too..
         var cur = getNext
         while( remaining>0 && cur!=null ) {
-          remaining -= cur.value.size
-          val stored = cur.value.asStored
-          if( stored!=null && !stored.loading) {
-            stored.load
+          remaining -= cur.size
+          val flushed = cur.asFlushed
+          if( flushed!=null && !flushed.loading) {
+            flushed.load
           }
           cur = getNext
         }
@@ -625,15 +671,20 @@ class QueueEntry(val queue:Queue) extend
       null
     }
 
-    def load() = {
-      // start loading it back...
-      loading = true
-      queue.host.store.loadMessage(ref) { delivery =>
-        // pass off to a source so it can aggregate multiple
-        // loads to reduce cross thread synchronization
-        if( delivery.isDefined ) {
-          queue.store_load_source.merge((QueueEntry.this, delivery.get))
+    override def load():Int = {
+      if( loading ) {
+        0
+      } else {
+        // start loading it back...
+        loading = true
+        queue.host.store.loadMessage(ref) { delivery =>
+          // pass off to a source so it can aggregate multiple
+          // loads to reduce cross thread synchronization
+          if( delivery.isDefined ) {
+            queue.store_load_source.merge((QueueEntry.this, delivery.get))
+          }
         }
+        size
       }
     }
   }
@@ -643,10 +694,39 @@ class QueueEntry(val queue:Queue) extend
     var aquired = false
     def ref = delivery.storeKey
     def size = delivery.size
-    def flushing = false
+    var flushing = false
     
+    override def isFlushedOrFlushing = {
+      flushing
+    }
+
     override  def asLoaded = this
 
+    def store() = {
+      if( delivery.storeKey == -1 ) {
+        val tx = queue.host.store.createStoreBatch
+        delivery.storeKey = tx.store(delivery.createMessageRecord)
+        tx.enqueue(createQueueEntryRecord)
+        tx.release
+        true
+      } else {
+        false
+      }
+    }
+
+    override def flush():Int = {
+      if( flushing ) {
+        0
+      } else {
+        flushing=true
+        store
+        queue.host.store.flushMessage(ref) {
+          queue.store_flush_source.merge(QueueEntry.this)
+        }
+        size
+      }
+    }
+
     def dispatch():QueueEntry = {
       if( delivery==null ) {
         // can't dispatch untill the delivery is set.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:05:53 2010
@@ -121,33 +121,39 @@ class VirtualHost(val broker: Broker) ex
     val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
     store = StoreFactory.create(config.store)
     if( store!=null ) {
-      val task = tracker.task("store list queue keys")
+      val task = tracker.task("store startup")
       store.start(^{
-        store.listQueues { queueKeys =>
-          for( queueKey <- queueKeys) {
-            val task = tracker.task("store load queue key: "+queueKey)
-            // Use a global queue to so we concurrently restore
-            // the queues.
-            globalQueue {
-              store.getQueueStatus(queueKey) { x =>
-                x match {
-                  case Some(info)=>
-                  store.getQueueEntries(queueKey) { entries=>
-                    dispatchQueue ^{
-                      val dest = DestinationParser.parse(info.record.name, destination_parser_options)
-                      val queue = new Queue(this, dest)
-                      queue.restore(queueKey, entries)
-                      queues.put(dest.getName, queue)
-                      task.run
+        if( config.purgeOnStartup ) {
+          store.purge {
+            task.run
+          }
+        } else {
+          store.listQueues { queueKeys =>
+            for( queueKey <- queueKeys) {
+              val task = tracker.task("store load queue key: "+queueKey)
+              // Use a global queue so that we concurrently restore
+              // the queues.
+              globalQueue {
+                store.getQueueStatus(queueKey) { x =>
+                  x match {
+                    case Some(info)=>
+                    store.getQueueEntries(queueKey) { entries=>
+                      dispatchQueue ^{
+                        val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+                        val queue = new Queue(this, dest)
+                        queue.restore(queueKey, entries)
+                        queues.put(dest.getName, queue)
+                        task.run
+                      }
                     }
+                    case _ =>
+                      task.run
                   }
-                  case _ =>
-                    task.run
                 }
               }
             }
+            task.run
           }
-          task.run
         }
       });
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 04:05:53 2010
@@ -35,6 +35,7 @@ import _root_.org.fusesource.hawtbuf._
 import java.io.{PrintStream, FileOutputStream, File, IOException}
 import org.apache.activemq.util.{IOHelper, ProcessSupport}
 import scala.util.matching.Regex
+import org.apache.activemq.apollo.dto.BrokerDTO
 
 object BaseBrokerPerfSupport {
   var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
@@ -457,25 +458,29 @@ abstract class BaseBrokerPerfSupport ext
     }
 
     def stopServices() = {
+      println("waiting for services to stop");
       stopping.set(true);
-      val tracker = new LoggingTracker("test shutdown")
+      var tracker = new LoggingTracker("broker shutdown")
       for (broker <- brokers) {
-        broker.stop(tracker.task("broker"));
+        tracker.stop(broker)
       }
+      tracker.await
+      tracker = new LoggingTracker("producer shutdown")
       for (connection <- producers) {
-        connection.stop(tracker.task(connection.toString));
+        tracker.stop(connection)
       }
+      tracker.await
+      tracker = new LoggingTracker("consumer shutdown")
       for (connection <- consumers) {
-        connection.stop(tracker.task(connection.toString));
+        tracker.stop(connection)
       }
-      println("waiting for services to stop");
       tracker.await
     }
 
     def startBrokers() = {
       val tracker = new LoggingTracker("test broker startup")
       for (broker <- brokers) {
-        broker.start(tracker.task("broker"));
+        tracker.start(broker)
       }
       tracker.await
     }
@@ -484,12 +489,14 @@ abstract class BaseBrokerPerfSupport ext
     def startClients() = {
       var tracker = new LoggingTracker("test consumer startup")
       for (connection <- consumers) {
-        connection.start(tracker.task(connection.toString));
+        tracker.start(connection)
       }
       tracker.await
+      // let the consumers drain the destination for a bit...
+      Thread.sleep(1000)
       tracker = new LoggingTracker("test producer startup")
       for (connection <- producers) {
-        connection.start(tracker.task(connection.toString));
+        tracker.start(connection)
       }
       tracker.await
     }
@@ -555,12 +562,18 @@ abstract class BaseBrokerPerfSupport ext
   def getRemoteWireFormat(): String
 
   def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
-    val broker = new Broker()
-    broker.config = Broker.default
-    val connector = broker.config.connectors.get(0)
+
+    val config = Broker.default
+    val connector = config.connectors.get(0)
     connector.bind = bindURI
     connector.advertise = connectUri
     connector.protocol = getBrokerWireFormat
+
+    val host = config.virtualHosts.get(0)
+    host.purgeOnStartup = true
+
+    val broker = new Broker()
+    broker.config = config
     broker
   }
 
@@ -604,9 +617,12 @@ abstract class RemoteConsumer extends Co
   }
 
   override def onTransportFailure(error: IOException) = {
-    if (!brokerPerfTest.stopping.get()) {
-      System.err.println("Client Async Error:");
-      error.printStackTrace();
+    if (!stopped) {
+      if(brokerPerfTest.stopping.get()) {
+        transport.stop
+      } else {
+        onFailure(error);
+      }
     }
   }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores?rev=961126&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores Wed Jul  7 04:05:53 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.broker.store.cassandra.CassandraStoreSPI
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:05:53 2010
@@ -37,7 +37,7 @@ class CassandraClient() {
   protected var pool: SessionPool = null
 
   def start() = {
-    val params = new PoolParams(10, ExhaustionPolicy.Fail, 500L, 6, 2)
+    val params = new PoolParams(20, ExhaustionPolicy.Fail, 500L, 6, 2)
     pool = new SessionPool(hosts, params, Consistency.One)
   }
 
@@ -98,6 +98,18 @@ class CassandraClient() {
     pb.freeze.toUnframedByteArray
   }
 
+  def purge() = {
+    withSession {
+      session =>
+        session.list(schema.queue_name).map { x =>
+          val qid: Long = x.name
+          session.remove(schema.entries \ qid)
+        }
+        session.remove(schema.queue_name)
+        session.remove(schema.message_data)
+    }
+  }
+
   def addQueue(record: QueueRecord) = {
     withSession {
       session =>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:05:53 2010
@@ -93,7 +93,7 @@ class CassandraStore extends Store with 
   }
 
   protected def _start(onCompleted: Runnable) = {
-    executor_pool = Executors.newCachedThreadPool
+    executor_pool = Executors.newFixedThreadPool(20)
     client.schema = Schema(config.keyspace)
 
     // TODO: move some of this parsing code into validation too.
@@ -112,12 +112,12 @@ class CassandraStore extends Store with 
   }
 
   protected def _stop(onCompleted: Runnable) = {
-    client.stop
     new Thread() {
       override def run = {
         executor_pool.shutdown
         executor_pool.awaitTermination(1, TimeUnit.DAYS)
         executor_pool = null
+        client.stop
         onCompleted.run
       }
     }.start
@@ -129,6 +129,16 @@ class CassandraStore extends Store with 
   //
   /////////////////////////////////////////////////////////////////////
 
+  /**
+   * Deletes all stored data from the store.
+   */
+  def purge(cb: =>Unit) = {
+    executor_pool ^{
+      client.purge
+      cb
+    }
+  }
+
   def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
     val key = next_queue_key.incrementAndGet
     record.key = key
@@ -333,7 +343,12 @@ class CassandraStore extends Store with 
   flush_source.setEventHandler(^{drain_flushes});
   flush_source.resume
 
-  def drain_flushes = {
+  def drain_flushes:Unit = {
+
+    if( !serviceState.isStarted ) {
+      return
+    }
+    
     val txs = flush_source.getData.flatMap{ tx_id =>
       val tx = delayedTransactions.remove(tx_id)
       // Message may be flushed or canceled before the timeout flush event..

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul  7 04:05:53 2010
@@ -40,5 +40,9 @@ public class VirtualHostDTO extends Serv
     @XmlAttribute(name="auto-create-queues")
     public boolean autoCreateQueues = true;
 
-
+    /**
+     * Should queues be purged on startup?
+     */
+    @XmlAttribute(name="purge-on-startup")
+    public boolean purgeOnStartup = false;
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:05:53 2010
@@ -115,6 +115,10 @@ class HawtDBStore extends BaseService wi
   //
   /////////////////////////////////////////////////////////////////////
 
+
+  def purge(cb: =>Unit) = {
+  }
+
   def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {}
 
   def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:05:53 2010
@@ -62,7 +62,12 @@ trait Store extends Service {
   def configure(config: StoreDTO, reporter:Reporter):Unit
 
   /**
-   * Stores a queue, calls back with a unquie id for the stored queue.
+   * Deletes all stored data from the store.
+   */
+  def purge(cb: =>Unit):Unit
+
+  /**
+   *  Stores a queue, calls back with a unique id for the stored queue.
    */
   def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 04:05:53 2010
@@ -157,6 +157,7 @@ public class TcpTransport extends BaseSe
                 disposed = true;
                 dispose();
             }
+            onCompleted.run();
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul  7 04:05:53 2010
@@ -108,39 +108,42 @@ trait BaseService extends Service with L
     }
   } |>>: dispatchQueue
 
-  final def stop(onCompleted:Runnable) = ^{
-    def done = {
-      if( onCompleted!=null ) {
-        onCompleted.run
-      }
-    }
-    _serviceState match {
-      case STARTED =>
-        val state = new STOPPING
-        state << onCompleted
-        _serviceState = state
-        try {
-          _stop(^ {
-            _serviceState = STOPPED
-            state.done
-          })
-        }
-        catch {
-          case e:Exception =>
-            error(e, "Stop failed due to: %s", e)
-            _serviceFailure = e
-            _serviceState = FAILED
-            state.done
+  final def stop(onCompleted:Runnable) = {
+    def stop_task = {
+      def done = {
+        if( onCompleted!=null ) {
+          onCompleted.run
         }
-      case state:STOPPING =>
-        state << onCompleted
-      case STOPPED =>
-        done
-      case state =>
-        done
-        error("Stop should not be called from state: %s", state);
+      }
+      _serviceState match {
+        case STARTED =>
+          val state = new STOPPING
+          state << onCompleted
+          _serviceState = state
+          try {
+            _stop(^ {
+              _serviceState = STOPPED
+              state.done
+            })
+          }
+          catch {
+            case e:Exception =>
+              error(e, "Stop failed due to: %s", e)
+              _serviceFailure = e
+              _serviceState = FAILED
+              state.done
+          }
+        case state:STOPPING =>
+          state << onCompleted
+        case STOPPED =>
+          done
+        case state =>
+          done
+          error("Stop should not be called from state: %s", state);
+      }
     }
-  } |>>: dispatchQueue
+    ^{ stop_task } |>>: dispatchQueue
+  }
 
   protected def _start(onCompleted:Runnable)
   protected def _stop(onCompleted:Runnable)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java Wed Jul  7 04:05:53 2010
@@ -123,7 +123,7 @@ public abstract class BaseService implem
                         }
                     });
                 } else if (_serviceState instanceof STOPPING) {
-                    ((STARTING) _serviceState).add(onCompleted);
+                    ((STOPPING) _serviceState).add(onCompleted);
                 } else if (_serviceState == STOPPED) {
                     if (onCompleted != null) {
                         onCompleted.run();