You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:05:07 UTC

svn commit: r961124 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ activemq-cassandra/ activemq-cassandra/src/ma...

Author: chirino
Date: Wed Jul  7 04:05:05 2010
New Revision: 961124

URL: http://svn.apache.org/viewvc?rev=961124&view=rev
Log:
- More consistent naming of persistent key fields
- Defined factory interfaces for the stores
- Added initial bits needed to test persistence in the stomp load client
- bug fixes

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
      - copied, changed from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java
      - copied, changed from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:05:05 2010
@@ -21,6 +21,8 @@ import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
 import org.apache.activemq.broker.store.StoreBatch
+import org.apache.activemq.apollo.store.MessageRecord
+import protocol.ProtocolFactory
 
 /**
  * A producer which sends Delivery objects to a delivery consumer.
@@ -146,7 +148,7 @@ class Delivery extends BaseRetained {
   /**
    * A reference to the stored version of the message.
    */
-  var storeId:Long = -1
+  var storeKey:Long = -1
 
   /**
    * The transaction the delivery is participating in.
@@ -164,8 +166,16 @@ class Delivery extends BaseRetained {
   def set(other:Delivery) = {
     size = other.size
     message = other.message
-    storeId = other.storeId
+    storeKey = other.storeKey
     this
   }
 
+  def createMessageRecord() = {
+    val sm = new MessageRecord
+    sm.protocol = message.protocol
+    sm.value = ProtocolFactory.get(message.protocol).encode(message)
+    sm.size = size
+    sm
+  }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:05:05 2010
@@ -135,7 +135,7 @@ class Queue(val host: VirtualHost, val d
         if( delivery.ack!=null ) {
           delivery.ack(delivery.storeBatch)
         }
-        if (delivery.storeId != -1) {
+        if (delivery.storeKey != -1) {
           delivery.storeBatch.enqueue(entry.createQueueEntryRecord)
           delivery.storeBatch.release
         }
@@ -229,7 +229,7 @@ class Queue(val host: VirtualHost, val d
         // Called from the producer thread before the delivery is
         // processed by the queue's thread.. We don't
         // yet know the order of the delivery in the queue.
-        if (delivery.storeId != -1) {
+        if (delivery.storeKey != -1) {
           // If the message has a store id, then this delivery will
           // need a tx to track the store changes.
           if( delivery.storeBatch == null ) {
@@ -371,19 +371,10 @@ class Queue(val host: VirtualHost, val d
         // Chuck the reset out...
         val loaded = entry.value.asLoaded
         if( loaded!=null ) {
-          var ref = loaded.delivery.storeId
+          var ref = loaded.delivery.storeKey
           if( ref == -1 ) {
             val tx = host.store.createStoreBatch
-
-            val message = loaded.delivery.message
-            val sm = new MessageRecord
-            sm.protocol = message.protocol
-            sm.value = ProtocolFactory.get(message.protocol).encode(message)
-            sm.size = loaded.size
-
-            tx.store(sm)
-            loaded.delivery.storeId = sm.id
-
+            loaded.delivery.storeKey = tx.store(loaded.delivery.createMessageRecord)
             tx.enqueue(entry.createQueueEntryRecord)
             tx.release
           }
@@ -408,7 +399,7 @@ class Queue(val host: VirtualHost, val d
       val delivery = new Delivery()
       delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
       delivery.size = stored.size
-      delivery.storeId = stored.id
+      delivery.storeKey = stored.key
 
       entry.loaded(delivery)
 
@@ -478,7 +469,7 @@ class QueueEntry(val queue:Queue) extend
 
   def stored() = {
     val loaded = value.asLoaded
-    this.value = new Stored(loaded.delivery.storeId, loaded.size)
+    this.value = new Stored(loaded.delivery.storeKey, loaded.size)
     this
   }
 
@@ -620,7 +611,7 @@ class QueueEntry(val queue:Queue) extend
   class Loaded(val delivery: Delivery) extends EntryType {
 
     var aquired = false
-    def ref = delivery.storeId
+    def ref = delivery.storeKey
     def size = delivery.size
     def flushing = false
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:05:05 2010
@@ -24,13 +24,13 @@ import _root_.scala.collection.JavaConve
 import _root_.scala.reflect.BeanProperty
 import path.PathFilter
 import org.fusesource.hawtbuf.AsciiBuffer
-import org.apache.activemq.apollo.dto.VirtualHostDTO
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import ReporterLevel._
 import org.apache.activemq.broker.store.{Store}
 import org.fusesource.hawtbuf.proto.WireFormat
-import org.apache.activemq.apollo.store.QueueRecord
+import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, VirtualHostDTO}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -51,6 +51,9 @@ object VirtualHost extends Log {
     rc.id = "default"
     rc.enabled = true
     rc.hostNames.add("localhost")
+    val store = new CassandraStoreDTO
+    store.hosts.add("127.0.0.1:9160")
+    rc.store = store
     rc
   }
 
@@ -59,9 +62,13 @@ object VirtualHost extends Log {
    */
   def validate(config: VirtualHostDTO, reporter:Reporter):ReporterLevel = {
      new Reporting(reporter) {
+
       if( config.hostNames.isEmpty ) {
         error("Virtual host must be configured with at least one host name.")
       }
+
+      result |= StoreFactory.validate(config.store, reporter)
+       
     }.result
   }
   
@@ -111,16 +118,19 @@ class VirtualHost(val broker: Broker) ex
 
 
   override protected def _start(onCompleted:Runnable):Unit = {
+    val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
+    store = StoreFactory.create(config.store)
     if( store!=null ) {
-      store.start();
-      store.listQueues { ids =>
-        for( id <- ids) {
-          store.getQueueStatus(id) { x =>
-            x match {
-              case Some(info)=>
-              dispatchQueue ^{
-                val dest = DestinationParser.parse(info.record.name , destination_parser_options)
-                if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
+      val task = tracker.task("store startup")
+
+      store.start(^{
+        store.listQueues { ids =>
+          for( id <- ids) {
+            store.getQueueStatus(id) { x =>
+              x match {
+                case Some(info)=>
+                dispatchQueue ^{
+                  val dest = new SingleDestination(Domain.QUEUE_DOMAIN, info.record.name)
 
                   val queue = new Queue(this, dest, id)
                   queue.first_seq = info.first
@@ -130,19 +140,22 @@ class VirtualHost(val broker: Broker) ex
 
                   queues.put(info.record.name, queue)
                 }
+                case _ =>
               }
-              case _ =>
             }
           }
         }
-      }
+        task.run
+
+      });
     }
 
 
     //Recover transactions:
     transactionManager.virtualHost = this
     transactionManager.loadTransactions();
-    onCompleted.run
+
+    tracker.callback(onCompleted)
   }
 
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Wed Jul  7 04:05:05 2010
@@ -16,72 +16,16 @@
  */
 package org.apache.activemq.apollo.broker.protocol
 
-import java.util.Properties
-import java.net.{URLClassLoader, URL}
 import org.apache.activemq.transport.DefaultTransportListener
-import java.io.{IOException, File, InputStream}
+import java.io.{IOException}
 import org.apache.activemq.apollo.broker.{Message, BrokerConnection}
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
 import org.apache.activemq.wireformat.WireFormat
+import org.apache.activemq.apollo.util.ClassFinder
 
 
 /**
  * <p>
- * Used to discover classes using the META-INF discovery trick.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-case class ClassFinder[T](path:String, loaders:Seq[ClassLoader]=Thread.currentThread.getContextClassLoader::Nil) {
-
-  def find(): List[Class[T]] = {
-    var classes = List[Class[T]]()
-    loaders.foreach { loader=>
-
-      val resources = loader.getResources(path)
-      var classNames: List[String] = Nil
-      while(resources.hasMoreElements) {
-        val url = resources.nextElement;
-        val p = loadProperties(url.openStream)
-        val enum = p.keys
-        while (enum.hasMoreElements) {
-          classNames = classNames ::: enum.nextElement.asInstanceOf[String] :: Nil
-        }
-      }
-      classNames = classNames.removeDuplicates
-
-      classes :::= classNames.map { name=>
-        loader.loadClass(name).asInstanceOf[Class[T]]
-      }
-
-    }
-
-    return classes.removeDuplicates
-  }
-
-  private def loadProperties(is:InputStream):Properties = {
-    if( is==null ) {
-      return null;
-    }
-    try {
-      val p = new Properties()
-      p.load(is);
-      return p
-    } catch {
-      case e:Exception =>
-      return null
-    } finally {
-      try {
-        is.close()
-      } catch {
-        case _ =>
-      }
-    }
-  }
-}
-
-/**
- * <p>
  * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml Wed Jul  7 04:05:05 2010
@@ -96,6 +96,14 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-util</artifactId>
+      <version>6.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto Wed Jul  7 04:05:05 2010
@@ -19,12 +19,11 @@ package org.apache.activemq.broker.store
 option java_multiple_files = true;
 
 message PBMessageRecord {
-  required bytes messageId = 2 [java_override_type = "AsciiBuffer"];
-  required bytes protocol = 3 [java_override_type = "AsciiBuffer"];
-  required int32 size = 4;
-  optional bytes value = 5;
-  optional int64 stream = 6;
-  optional int64 expiration = 7;
+  required bytes protocol = 1 [java_override_type = "AsciiBuffer"];
+  required int32 size = 2;
+  optional bytes value = 3;
+  optional int64 stream = 4;
+  optional int64 expiration = 5;
 }
 
 message PBQueueEntryRecord {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:05:05 2010
@@ -58,7 +58,6 @@ class CassandraClient() {
     import PBMessageRecord._
     val pb = PBMessageRecordBuffer.parseUnframed(v)
     val rc = new MessageRecord
-    rc.messageId = pb.getMessageId
     rc.protocol = pb.getProtocol
     rc.size = pb.getSize
     rc.value = pb.getValue
@@ -70,7 +69,6 @@ class CassandraClient() {
   implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
     import PBMessageRecord._
     val pb = new PBMessageRecordBean
-    pb.setMessageId(v.messageId)
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
     pb.setValue(v.value)
@@ -103,7 +101,7 @@ class CassandraClient() {
   def addQueue(record: QueueRecord) = {
     withSession {
       session =>
-        session.insert(schema.queue_name \ (record.id, record.name))
+        session.insert(schema.queue_name \ (record.key, record.name))
     }
   }
 
@@ -126,7 +124,7 @@ class CassandraClient() {
 
             val rc = new QueueStatus
             rc.record = new QueueRecord
-            rc.record.id = id
+            rc.record.key = id
             rc.record.name = new AsciiBuffer(x)
 
             rc.count = session.count( schema.entries \ id )
@@ -180,7 +178,7 @@ class CassandraClient() {
         session.get(schema.message_data \ id) match {
           case Some(x) =>
             val rc: MessageRecord = x.value
-            rc.id = id
+            rc.key = id
             Some(rc)
           case None =>
             None

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:05:05 2010
@@ -18,28 +18,52 @@ package org.apache.activemq.broker.store
 
 import org.apache.activemq.broker.store.{StoreBatch, Store}
 import org.fusesource.hawtdispatch.BaseRetained
-import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
 import com.shorrockin.cascal.session._
-import org.fusesource.hawtdispatch.ScalaDispatch._
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
-import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import com.shorrockin.cascal.model.Key
-import org.apache.log.output.db.ColumnType
-import java.util.{HashSet, HashMap}
+import java.util.HashMap
 import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
 import org.apache.activemq.apollo.util.IntCounter
-import com.shorrockin.cascal.utils.Conversions._
 import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
-import collection.Seq
+import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
+import collection.{JavaConversions, Seq}
+import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
+import com.shorrockin.cascal.utils.Conversions._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import ReporterLevel._
+
+object CassandraStore extends Log {
+  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+  /**
+   * Creates a default configuration object.
+   */
+  def default() = {
+    val rc = new CassandraStoreDTO
+    rc.hosts.add("localhost:9160")
+    rc
+  }
+
+  /**
+   * Validates a configuration object.
+   */
+  def validate(config: CassandraStoreDTO, reporter:Reporter):ReporterLevel = {
+    new Reporting(reporter) {
+      if( config.hosts.isEmpty ) {
+        error("At least one cassandra host must be configured.")
+      }
+    }.result
+  }
+}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class CassandraStore extends Store with BaseService with Logging {
 
-  import CassandraStoreHelper._
-  override protected def log = CassandraStoreHelper
+  import CassandraStore._
+  override protected def log = CassandraStore
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -53,10 +77,36 @@ class CassandraStore extends Store with 
 
   val client = new CassandraClient()
   protected var executor_pool:ExecutorService = _
+  var config:CassandraStoreDTO = default
+
+  def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
+
+  def configure(config: CassandraStoreDTO, reporter: Reporter) = {
+    if ( CassandraStore.validate(config, reporter) < ERROR ) {
+      if( serviceState.isStarted ) {
+        // TODO: apply changes while the broker is running.
+        reporter.report(WARN, "Updating cassandra store configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
+      } else {
+        this.config = config
+      }
+    }
+  }
 
   protected def _start(onCompleted: Runnable) = {
     executor_pool = Executors.newCachedThreadPool
-    client.schema = Schema("ActiveMQ")
+    client.schema = Schema(config.keyspace)
+
+    // TODO: move some of this parsing code into validation too.
+    val HostPort = """([^:]+)(:(\d+))?""".r
+    import JavaConversions._
+    client.hosts = config.hosts.flatMap { x=>
+      x match {
+        case HostPort(host,_,port)=>
+          Some(Host(host, port.toInt, 3000))
+        case _=> None
+      }
+    }.toList
+
     client.start
     onCompleted.run
   }
@@ -169,14 +219,15 @@ class CassandraStore extends Store with 
       onPerformed
     }
 
-    def store(record: MessageRecord) = {
-      record.id = next_msg_key.incrementAndGet
+    def store(record: MessageRecord):Long = {
+      record.key = next_msg_key.incrementAndGet
       val action = new MessageAction
-      action.msg = record.id
+      action.msg = record.key
       action.store = record
       this.synchronized {
-        actions += record.id -> action
+        actions += record.key -> action
       }
+      record.key
     }
 
     def action(msg:Long) = {
@@ -229,7 +280,8 @@ class CassandraStore extends Store with 
       val tx_id = next_tx_id.incrementAndGet
       tx.txid = tx_id
       delayedTransactions.put(tx_id, tx)
-      dispatchQueue.dispatchAfter(30, TimeUnit.SECONDS, ^{flush(tx_id)})
+      dispatchQueue.dispatchAsync(^{flush(tx_id)})
+      dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
 
       tx.actions.foreach { case (msg, action) =>
         if( action.store!=null ) {
@@ -257,11 +309,11 @@ class CassandraStore extends Store with 
 
             // Cancel the action if it's now empty
             if( prevAction.isEmpty ) {
-              action.cancel()
+              prevAction.cancel()
             }
 
             // since we canceled out the previous enqueue.. now cancel out the action
-            action.dequeues = action.dequeues.filterNot( x=> key(x) == currentDequeue)
+            action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
             if( action.isEmpty ) {
               action.cancel()
             }
@@ -292,7 +344,8 @@ class CassandraStore extends Store with 
             pendingStores.remove(msg)
           }
           action.enqueues.foreach { queueEntry=>
-            pendingEnqueues.remove(key(queueEntry), action)
+            val k = key(queueEntry)
+            pendingEnqueues.remove(k)
           }
         }
 
@@ -317,27 +370,3 @@ class CassandraStore extends Store with 
   }
 
 }
-
-object CassandraStoreHelper extends Log {
-  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
-
-//  /**
-//   * Creates a default a configuration object.
-//   */
-//  def default() = {
-//    val rc = new HawtDBStoreDTO
-//    rc.directory = new File("activemq-data")
-//    rc
-//  }
-//
-//  /**
-//   * Validates a configuration object.
-//   */
-//  def validate(config: HawtDBStoreDTO, reporter:Reporter):ReporterLevel = {
-//     new Reporting(reporter) {
-//      if( config.directory == null ) {
-//        error("hawtdb store must be configured with a directroy.")
-//      }
-//    }.result
-//  }
-}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala (from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java&r1=961123&r2=961124&rev=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala Wed Jul  7 04:05:05 2010
@@ -14,20 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.dto;
+package org.apache.activemq.broker.store.cassandra
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.XmlSeeAlso;
-import javax.xml.bind.annotation.XmlType;
+import org.apache.activemq.apollo.store.StoreFactory
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
+import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
+import ReporterLevel._
 
 /**
+ * <p>
+ * Hook to use a CassandraStore when a CassandraStoreDTO is
+ * used in a broker configuration.
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlType(name = "store-type")
-@XmlSeeAlso({MemoryStoreDTO.class, HawtDBStoreDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
-public abstract class StoreDTO {
+class CassandraStoreSPI extends StoreFactory.SPI {
+
+  def create(config: StoreDTO) = {
+    if( config.isInstanceOf[CassandraStoreDTO]) {
+      new CassandraStore
+    } else {
+      null
+    }
+  }
 
-}
+   def validate(config: StoreDTO, reporter:Reporter):ReporterLevel = {
+     if( config.isInstanceOf[CassandraStoreDTO]) {
+       CassandraStore.validate(config.asInstanceOf[CassandraStoreDTO], reporter)
+     } else {
+       null
+     }
+   }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala Wed Jul  7 04:05:05 2010
@@ -73,21 +73,20 @@ class CassandraStoreTest extends FunSuit
 
   def addMessage() {
     var queueA = new QueueRecord
-    queueA.id =1
+    queueA.key =1
     queueA.name = ascii("queue:1")
 
     val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
-    queueA.id = rc.get
+    queueA.key = rc.get
 
-    val expected:Seq[Long] = List(queueA.id)
+    val expected:Seq[Long] = List(queueA.key)
     expectCB(expected) { cb=>
       store.listQueues(cb)
     }
 
     var tx = store.createStoreBatch
     var message = new MessageRecord
-    message.id = 35
-    message.messageId = ascii("msg-35")
+    message.key = 35
     message.protocol = ascii("test-protocol")
     message.value = ascii("test content").buffer
     message.size = message.value.length
@@ -97,8 +96,8 @@ class CassandraStoreTest extends FunSuit
     val disposed = new CountDownLatch(1)
 
     var queueEntry = new QueueEntryRecord
-    queueEntry.queueKey = queueA.id
-    queueEntry.messageKey = message.id
+    queueEntry.queueKey = queueA.key
+    queueEntry.messageKey = message.key
     queueEntry.queueSeq = 1
 
     tx.enqueue(queueEntry)
@@ -111,7 +110,7 @@ class CassandraStoreTest extends FunSuit
     }
 
     var flushed = new CountDownLatch(1)
-    store.flushMessage(message.id) {
+    store.flushMessage(message.key) {
       flushed.countDown
     }
 
@@ -127,27 +126,26 @@ class CassandraStoreTest extends FunSuit
     // add another message to the queue..
     tx = store.createStoreBatch
     message = new MessageRecord
-    message.id = 36
-    message.messageId = ascii("msg-35")
+    message.key = 36
     message.protocol = ascii("test-protocol")
     message.value = ascii("test content").buffer
     message.size = message.value.length
     tx.store(message)
 
     queueEntry = new QueueEntryRecord
-    queueEntry.queueKey = queueA.id
-    queueEntry.messageKey = message.id
+    queueEntry.queueKey = queueA.key
+    queueEntry.messageKey = message.key
     queueEntry.queueSeq = 2
 
     tx.enqueue(queueEntry)
 
     flushed = new CountDownLatch(1)
-    store.flushMessage(message.id) {
+    store.flushMessage(message.key) {
       flushed.countDown
     }
     flushed.await
 
-    val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.id)(cb) )
+    val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.key)(cb) )
     expect(ascii("queue:1")) {
       qso.get.record.name
     }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java (from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java&r1=961123&r2=961124&rev=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java Wed Jul  7 04:05:05 2010
@@ -16,16 +16,20 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="memory-store")
+@XmlRootElement(name="cassandra-store")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class MemoryStoreDTO extends StoreDTO {
+public class CassandraStoreDTO extends StoreDTO {
 
+    @XmlAttribute(name="keyspace")
+    public String keyspace="ActiveMQ";
+
+    @XmlElement(name="host", required=true)
+    public ArrayList<String> hosts = new ArrayList<String>();    
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java Wed Jul  7 04:05:05 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.dto;
 
 import org.codehaus.jackson.annotate.JsonTypeInfo;
 
-import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlSeeAlso;
 import javax.xml.bind.annotation.XmlType;
 
@@ -26,7 +25,7 @@ import javax.xml.bind.annotation.XmlType
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 @XmlType(name = "store-type")
-@XmlSeeAlso({MemoryStoreDTO.class, HawtDBStoreDTO.class})
+@XmlSeeAlso({CassandraStoreDTO.class, HawtDBStoreDTO.class})
 @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public abstract class StoreDTO {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto Wed Jul  7 04:05:05 2010
@@ -55,11 +55,10 @@ message Trace {
 
 message MessageAdd {
   optional int64 messageKey=1;
-  optional bytes messageId = 2 [java_override_type = "AsciiBuffer"];
-  optional bytes protocol = 3 [java_override_type = "AsciiBuffer"];
-  optional bytes value = 4;
-  optional int64 streamKey=5;
-  optional int32 messageSize=6;
+  optional bytes protocol = 2 [java_override_type = "AsciiBuffer"];
+  optional bytes value = 3;
+  optional int64 streamKey=4;
+  optional int32 messageSize=5;
 }  
 
 ///////////////////////////////////////////////////////////////

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:05:05 2010
@@ -26,12 +26,12 @@ import java.util.HashSet
 import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.activemq.apollo.dto.HawtDBStoreDTO
 import org.apache.activemq.apollo.broker._
 import ReporterLevel._
 import store.HawtDBManager
 import org.apache.activemq.broker.store.{Store, StoreBatch}
 import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
+import org.apache.activemq.apollo.dto.{StoreDTO, HawtDBStoreDTO}
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -72,9 +72,15 @@ class HawtDBStore extends BaseService wi
   var config: HawtDBStoreDTO  = default
   var manager:HawtDBManager = null
 
+
+
   /**
    * Validates and then applies the configuration.
    */
+  def configure(config: StoreDTO, reporter: Reporter) = {
+    //TODO:
+  }
+
   def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {}
 
   def configure(config: HawtDBStoreDTO, reporter:Reporter) = ^{
@@ -129,8 +135,8 @@ class HawtDBStore extends BaseService wi
   /////////////////////////////////////////////////////////////////////
   class HawtDBStoreBatch extends BaseRetained with StoreBatch {
 
-    def store(delivery: MessageRecord) = {
-
+    def store(delivery: MessageRecord):Long = {
+      -1L
     }
 
     def dequeue(entry: QueueEntryRecord) = {}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java Wed Jul  7 04:05:05 2010
@@ -143,12 +143,11 @@ class HawtDBSession {
     // /////////////////////////////////////////////////////////////
 
     public void messageAdd(MessageRecord message) {
-        if (message.id < 0) {
+        if (message.key < 0) {
             throw new IllegalArgumentException("Key not set");
         }
         Data.MessageAdd.MessageAddBean bean = new Data.MessageAdd.MessageAddBean();
-        bean.setMessageKey(message.id);
-        bean.setMessageId(message.messageId);
+        bean.setMessageKey(message.key);
         bean.setProtocol(message.protocol);
         bean.setMessageSize(message.size);
         Buffer buffer = message.value;
@@ -172,8 +171,7 @@ class HawtDBSession {
         try {
             Data.MessageAdd bean = (Data.MessageAdd) store.load(location);
             MessageRecord rc = new MessageRecord();
-            rc.id = bean.getMessageKey();
-            rc.messageId = bean.getMessageId();
+            rc.key = bean.getMessageKey();
             rc.protocol = bean.getProtocol();
             rc.size = bean.getMessageSize();
             if (bean.hasValue()) {
@@ -204,7 +202,7 @@ class HawtDBSession {
     }
 
     public void queueRemove(QueueRecord record) {
-        addUpdate(new Data.QueueRemove.QueueRemoveBean().setKey(record.id));
+        addUpdate(new Data.QueueRemove.QueueRemoveBean().setKey(record.key));
     }
 
     public Iterator<QueueStatus> queueListByType(AsciiBuffer type, QueueRecord firstQueue, int max) {
@@ -227,7 +225,7 @@ class HawtDBSession {
 
     public void queueAddMessage(QueueRecord queue, QueueEntryRecord entryRecord) throws KeyNotFoundException {
         Data.QueueAddMessage.QueueAddMessageBean bean = new Data.QueueAddMessage.QueueAddMessageBean();
-        bean.setQueueKey(queue.id);
+        bean.setQueueKey(queue.key);
         bean.setQueueKey(entryRecord.queueKey);
         bean.setMessageKey(entryRecord.messageKey);
         bean.setMessageSize(entryRecord.size);
@@ -246,7 +244,7 @@ class HawtDBSession {
 
     public Iterator<QueueEntryRecord> queueListMessagesQueue(QueueRecord queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
         storeAtomic();
-        DestinationEntity destination = store.rootEntity.getDestination(queue.id);
+        DestinationEntity destination = store.rootEntity.getDestination(queue.key);
         if (destination == null) {
             throw new KeyNotFoundException("queue key: " + queue);
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java Wed Jul  7 04:05:05 2010
@@ -374,11 +374,11 @@ public class RootEntity {
     // Queue Methods.
     // /////////////////////////////////////////////////////////////////
     public void queueAdd(Transaction tx, QueueRecord queue) throws IOException {
-        if (data.destinationIndex.get(queue.id) == null) {
+        if (data.destinationIndex.get(queue.key) == null) {
             DestinationEntity rc = new DestinationEntity();
             rc.setQueueDescriptor(queue);
             rc.allocate(tx);
-            data.destinationIndex.put(queue.id, rc);
+            data.destinationIndex.put(queue.key, rc);
         }
     }
 
@@ -405,7 +405,7 @@ public class RootEntity {
         LinkedList<org.apache.activemq.apollo.store.QueueStatus> results = new LinkedList<org.apache.activemq.apollo.store.QueueStatus>();
 
         final Iterator<Entry<Long, DestinationEntity>> i;
-        Long x = firstQueue==null? null : (Long)firstQueue.id;
+        Long x = firstQueue==null? null : (Long)firstQueue.key;
         i = data.destinationIndex.iterator(x);
         while (i.hasNext()) {
             Entry<Long, DestinationEntity> entry = i.next();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java Wed Jul  7 04:05:05 2010
@@ -235,8 +235,7 @@ public class HawtDBManagerBenchmark exte
                     enqueuePermits.acquire();
 
                     final MessageRecord messageRecord = new MessageRecord();
-                    messageRecord.id = store.allocateStoreTracking();
-                    messageRecord.messageId = new AsciiBuffer("" + i);
+                    messageRecord.key = store.allocateStoreTracking();
                     messageRecord.protocol = new AsciiBuffer("encoding");
                     messageRecord.value = buffer;
                     messageRecord.size = buffer.getLength();
@@ -252,7 +251,7 @@ public class HawtDBManagerBenchmark exte
                         public void run(HawtDBSession session) throws Exception {
                             session.messageAdd(messageRecord);
                             QueueEntryRecord queueEntryRecord = new QueueEntryRecord();
-                            queueEntryRecord.messageKey = messageRecord.id;
+                            queueEntryRecord.messageKey = messageRecord.key;
                             queueEntryRecord.queueKey = queueKey.incrementAndGet();
                             queueEntryRecord.size = messageRecord.size;
                             session.queueAddMessage(queueId, queueEntryRecord);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java Wed Jul  7 04:05:05 2010
@@ -66,8 +66,7 @@ public class HawtDBManagerTest extends T
         final MessageRecord expected = new MessageRecord();
         expected.value = new AsciiBuffer("buffer").buffer();
         expected.protocol = new AsciiBuffer("encoding");
-        expected.messageId = new AsciiBuffer("1000");
-        expected.id = store.allocateStoreTracking();
+        expected.key = store.allocateStoreTracking();
         expected.size = expected.value.getLength();
 
         store.execute(new VoidCallback<Exception>() {
@@ -79,7 +78,7 @@ public class HawtDBManagerTest extends T
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(HawtDBSession session) throws Exception {
-                MessageRecord actual = session.messageGetRecord(expected.id);
+                MessageRecord actual = session.messageGetRecord(expected.key);
                 assertEquals(expected, actual);
             }
         }, null);
@@ -119,12 +118,11 @@ public class HawtDBManagerTest extends T
         final MessageRecord message = new MessageRecord();
         message.value = new AsciiBuffer("buffer").buffer();
         message.protocol = new AsciiBuffer("encoding");
-        message.messageId = new AsciiBuffer("1000");
-        message.id = store.allocateStoreTracking();
+        message.key = store.allocateStoreTracking();
         message.size = message.value.getLength();
 
         final QueueEntryRecord qEntryRecord = new QueueEntryRecord();
-        qEntryRecord.messageKey = message.id;
+        qEntryRecord.messageKey = message.key;
         qEntryRecord.queueKey = 1L;
         qEntryRecord.size = message.size;
 
@@ -377,7 +375,7 @@ public class HawtDBManagerTest extends T
                 Assert.assertTrue(qRecords.hasNext());
                 QueueEntryRecord qr = qRecords.next();
                 Assert.assertEquals(qEntryRecord.queueKey, qr.queueKey);
-                Assert.assertEquals(qEntryRecord.messageKey, message.id);
+                Assert.assertEquals(qEntryRecord.messageKey, message.key);
                 MessageRecord record = session.messageGetRecord(qr.messageKey);
                 assertEquals(record, message);
             }
@@ -416,7 +414,6 @@ public class HawtDBManagerTest extends T
     static void assertEquals(MessageRecord expected, MessageRecord actual) {
         Assert.assertEquals(expected.value, actual.value);
         Assert.assertEquals(expected.protocol, actual.protocol);
-        Assert.assertEquals(expected.messageId, actual.messageId);
         Assert.assertEquals(expected.stream, actual.stream);
         Assert.assertEquals(expected.size, actual.size);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul  7 04:05:05 2010
@@ -43,6 +43,11 @@
       <artifactId>activemq-tcp</artifactId>
       <version>6.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-cassandra</artifactId>
+      <version>6.0-SNAPSHOT</version>
+    </dependency>
 
     <!-- Scala Support -->
     <dependency>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:05:05 2010
@@ -29,6 +29,7 @@ import Stomp._
 import BufferConversions._
 import StompFrameConstants._
 import java.io.IOException
+import org.apache.activemq.broker.store.StoreBatch
 
 
 object StompConstants {
@@ -263,6 +264,7 @@ class StompProtocolHandler extends Proto
   }
 
   def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
+    var storeBatch:StoreBatch=null
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
@@ -277,13 +279,14 @@ class StompProtocolHandler extends Proto
       val delivery = new Delivery
       delivery.message = message
       delivery.size = message.frame.size
-      if( message.persistent ) {
-        // TODO:
-//        val content = ascii("todo")
-//        delivery.ref = host.database.createMessageRecord(message.id, content, PROTOCOL)
+
+      if( message.persistent && host.store!=null ) {
+        storeBatch = host.store.createStoreBatch
+        delivery.storeBatch = storeBatch
+        delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
       }
 
-      // routes can allways accept at least 1 delivery...
+      // routes can always accept at least 1 delivery...
       assert( !route.full )
       route.offer(delivery)
       if( route.full ) {
@@ -291,9 +294,29 @@ class StompProtocolHandler extends Proto
         // until it's not full anymore.
         connection.transport.suspendRead
       }
+
     } else {
       // info("Dropping message.  No consumers interested in message.")
     }
+
+    // User might be asking for ack that we have processed the message..
+    val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+    if( receipt!=null ) {
+      if( storeBatch==null ) {
+        // message was not persistent we can ack back right away..
+        connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+      } else {
+        // else lets ack back once the persistent operations are processed.
+        storeBatch.setDisposer(^{
+          connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+        })
+      }
+    }
+
+    if( storeBatch!=null ) {
+      // We can now release the batch as we are done using it..
+      storeBatch.release
+    }
   }
 
   def on_stomp_subscribe(headers:HeaderMap) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 04:05:05 2010
@@ -45,6 +45,8 @@ object StompLoadClient {
   var bufferSize = 64*1204
   var messageSize = 1024;
   var useContentLength=true
+  var persistent = false;
+  var syncProducer = false;
 
   var destinationType = "topic";
   var destinationCount = 1;
@@ -131,6 +133,8 @@ object StompLoadClient {
     "destinationType  = "+destinationType+"\n"+
     "destinationCount = "+destinationCount+"\n" +
     "messageSize      = "+messageSize+"\n"+
+    "persistent       = "+persistent+"\n"+
+    "syncProducer     = "+syncProducer+"\n"+
     "producerSleep    = "+producerSleep+"\n"+
     "consumerSleep    = "+consumerSleep+"\n"+
     "bufferSize       = "+bufferSize+"\n"+
@@ -254,6 +258,8 @@ object StompLoadClient {
     var client:StompClient=null
     val content = ("SEND\n" +
               "destination:"+destination(id)+"\n"+
+               { if(persistent) "persistent:true\n" else "" } +
+               { if(syncProducer) "receipt:xxx\n" else "" } +
                { if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
               "\n"+message(name)).getBytes("UTF-8")
 
@@ -264,6 +270,11 @@ object StompLoadClient {
           var i =0;
           while (!done.get) {
             client.send(content)
+            if( syncProducer ) {
+              // waits for the reply..
+              client.flush
+              client.skip
+            }
             producerCounter.incrementAndGet();
             if(producerSleep > 0) {
               client.flush

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul  7 04:05:05 2010
@@ -35,6 +35,11 @@
     
     <dependency>
       <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-dto</artifactId>
+      <version>6.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-util</artifactId>
       <version>6.0-SNAPSHOT</version>
     </dependency>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java Wed Jul  7 04:05:05 2010
@@ -24,8 +24,7 @@ import org.fusesource.hawtbuf.Buffer;
  */
 public class MessageRecord {
 
-    public long id = -1;
-    public AsciiBuffer messageId;
+    public long key = -1;
     public AsciiBuffer protocol;
     public int size;
     public Buffer value;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java Wed Jul  7 04:05:05 2010
@@ -24,7 +24,7 @@ import org.fusesource.hawtbuf.AsciiBuffe
  */
 public class QueueRecord {
 
-    public long id = -1;
+    public long key = -1;
     public AsciiBuffer name;
     public AsciiBuffer queueType;
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:05:05 2010
@@ -21,6 +21,8 @@ import org.fusesource.hawtbuf._
 import org.apache.activemq.Service
 import org.fusesource.hawtdispatch.{Retained}
 import org.apache.activemq.apollo.store._
+import org.apache.activemq.apollo.broker.Reporter
+import org.apache.activemq.apollo.dto.StoreDTO
 
 /**
  * A StoreTransaction is used to perform persistent
@@ -38,7 +40,7 @@ trait StoreBatch extends Retained {
    * Assigns the delivery a store id if it did not already
    * have one assigned.
    */
-  def store(delivery:MessageRecord)
+  def store(delivery:MessageRecord):Long
 
   /**
    * Adds a delivery to a specified queue at a the specified position in the queue.
@@ -57,6 +59,7 @@ trait StoreBatch extends Retained {
  */
 trait Store extends Service {
 
+  def configure(config: StoreDTO, reporter:Reporter):Unit
 
   /**
    * Stores a queue, calls back with a unquie id for the stored queue.

Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala?rev=961124&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala Wed Jul  7 04:05:05 2010
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.store
+
+import org.apache.activemq.apollo.util.ClassFinder
+import org.apache.activemq.broker.store.Store
+import org.apache.activemq.apollo.dto.StoreDTO
+import org.apache.activemq.apollo.broker.{ReporterLevel, Reporter}
+import ReporterLevel._
+
+/**
+ * <p>
+ * Empty companion class of the StoreFactory object; it declares no members.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StoreFactory
+
+/**
+ * <p>
+ * Discovers store SPI implementations and delegates store creation and validation to them.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object StoreFactory {
+
+  // Class names of the SPI implementations are read from this META-INF resource.
+  val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/stores")
+  var storesSPI = List[SPI]()
+
+  /** Implemented by each store module. A null result from either method
+   *  makes the factory move on to the next registered SPI. */
+  trait SPI {
+    def create(config:StoreDTO):Store
+    def validate(config: StoreDTO, reporter:Reporter):ReporterLevel
+  }
+
+  // Instantiate each discovered SPI class. A class that fails to instantiate
+  // has its stack trace printed and is skipped, so the others still register.
+  finder.find.foreach{ clazz =>
+    try {
+      storesSPI ::= clazz.newInstance.asInstanceOf[SPI]
+    } catch {
+      case e:Throwable =>
+        e.printStackTrace
+    }
+  }
+
+  /** Returns null for a null config, otherwise the first non-null store an SPI creates;
+   *  throws IllegalArgumentException when no SPI accepts the config. */
+  def create(config:StoreDTO):Store = {
+    if( config == null ) {
+      null
+    } else {
+      storesSPI.iterator.map(_.create(config)).find(_ != null).getOrElse {
+        throw new IllegalArgumentException("Unknown store configuration type: "+config.getClass)
+      }
+    }
+  }
+
+  /** Returns INFO for a null config, otherwise the first non-null level an SPI returns;
+   *  reports and returns ERROR when no SPI recognizes the config. */
+  def validate(config: StoreDTO, reporter:Reporter):ReporterLevel = {
+    if( config == null ) {
+      INFO
+    } else {
+      storesSPI.iterator.map(_.validate(config, reporter)).find(_ != null).getOrElse {
+        reporter.report(ERROR, "Unknown store configuration type: "+config.getClass)
+        ERROR
+      }
+    }
+  }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala?rev=961124&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala Wed Jul  7 04:05:05 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import java.io.InputStream
+import java.util.Properties
+
+/**
+ * <p>
+ * Used to discover classes using the META-INF discovery trick: every
+ * resource named <code>path</code> which a class loader can see is read as a
+ * properties file, and each key of that file is loaded as a class name.
+ * </p>
+ *
+ * @param path    the name of the descriptor resource to look up
+ * @param loaders the class loaders to search; defaults to the context class
+ *                loader of the thread which creates the finder
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class ClassFinder[T](path:String, loaders:Seq[ClassLoader]=Thread.currentThread.getContextClassLoader::Nil) {
+
+  /**
+   * Loads every class named in the descriptors visible to the loaders.
+   * A descriptor whose content cannot be parsed is skipped. The cast to
+   * <code>Class[T]</code> is unchecked: nothing here verifies that a loaded
+   * class really is a <code>T</code>.
+   *
+   * @return the classes found, without duplicates
+   * @throws ClassNotFoundException if a listed class cannot be loaded
+   */
+  def find(): List[Class[T]] = {
+    val classes = loaders.foldLeft(List[Class[T]]()) { (found, loader) =>
+      val loaded = classNames(loader).map { name =>
+        loader.loadClass(name).asInstanceOf[Class[T]]
+      }
+      // the classes of a later loader are placed in front of the earlier ones
+      loaded ::: found
+    }
+    classes.distinct
+  }
+
+  /** Collects the distinct keys of all the descriptors visible to a loader. */
+  private def classNames(loader:ClassLoader): List[String] = {
+    val names = new scala.collection.mutable.ListBuffer[String]
+    val resources = loader.getResources(path)
+    while( resources.hasMoreElements ) {
+      // an unreadable descriptor yields None and contributes no names
+      loadProperties(resources.nextElement.openStream).foreach { props =>
+        val keys = props.keys
+        while( keys.hasMoreElements ) {
+          names += keys.nextElement.asInstanceOf[String]
+        }
+      }
+    }
+    names.toList.distinct
+  }
+
+  /**
+   * Reads a properties file from the stream. The stream is always closed.
+   *
+   * @return the properties, or None if the stream is null or cannot be read
+   */
+  private def loadProperties(is:InputStream):Option[Properties] = {
+    if( is==null ) {
+      None
+    } else {
+      try {
+        val p = new Properties()
+        p.load(is)
+        Some(p)
+      } catch {
+        case e:Exception => None
+      } finally {
+        try {
+          is.close()
+        } catch {
+          case _:Exception => // nothing useful can be done if the close fails
+        }
+      }
+    }
+  }
+}
\ No newline at end of file