You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:04:27 UTC

svn commit: r961123 - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/ activemq-cassandra/src/ activemq-cassandra/src/main/ activemq-cassandra/src/main/proto/ activemq-...

Author: chirino
Date: Wed Jul  7 04:04:25 2010
New Revision: 961123

URL: http://svn.apache.org/viewvc?rev=961123&view=rev
Log:
initial pass at a cassandra store impl

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
      - copied, changed from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
      - copied, changed from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryStore.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala
    activemq/sandbox/activemq-apollo-actor/pom.xml

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:04:25 2010
@@ -20,7 +20,7 @@ import _root_.org.apache.activemq.filter
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
-import org.apache.activemq.broker.store.StoreTransaction
+import org.apache.activemq.broker.store.StoreBatch
 
 /**
  * A producer which sends Delivery objects to a delivery consumer.
@@ -151,13 +151,13 @@ class Delivery extends BaseRetained {
   /**
    * The transaction the delivery is participating in.
    */
-  var storeTx:StoreTransaction = null
+  var storeBatch:StoreBatch = null
 
   /**
    * Set if the producer requires an ack to be sent back.  Consumer
    * should execute once the message is processed.
    */
-  var ack:(StoreTransaction)=>Unit = null
+  var ack:(StoreBatch)=>Unit = null
 
   def copy() = (new Delivery).set(this)
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:04:25 2010
@@ -23,9 +23,9 @@ import org.fusesource.hawtdispatch.{Scal
 import org.apache.activemq.util.TreeMap.TreeEntry
 import java.util.{Collections, ArrayList, LinkedList}
 import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
-import org.apache.activemq.broker.store.{StoreTransaction}
+import org.apache.activemq.broker.store.{StoreBatch}
 import protocol.ProtocolFactory
-import org.apache.activemq.apollo.store.MessageRecord
+import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -75,7 +75,7 @@ class Queue(val host: VirtualHost, val d
   })
 
 
-  val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreTransaction)](), dispatchQueue)
+  val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreBatch)](), dispatchQueue)
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
@@ -133,18 +133,18 @@ class Queue(val host: VirtualHost, val d
         entry.created(next_message_seq, delivery)
 
         if( delivery.ack!=null ) {
-          delivery.ack(delivery.storeTx)
+          delivery.ack(delivery.storeBatch)
         }
         if (delivery.storeId != -1) {
-          delivery.storeTx.enqueue(storeId, entry.seq, delivery.storeId)
-          delivery.storeTx.release
+          delivery.storeBatch.enqueue(entry.createQueueEntryRecord)
+          delivery.storeBatch.release
         }
 
         size += entry.value.size
         entries.addLast(entry)
         counter += 1;
 
-        if( full ) {
+        if( full && host.store!=null ) {
 //          swap
         }
 
@@ -156,21 +156,20 @@ class Queue(val host: VirtualHost, val d
     }
   }
 
-  def ack(entry: QueueEntry, tx:StoreTransaction) = {
-
+  def ack(entry: QueueEntry, sb:StoreBatch) = {
     if (entry.value.ref != -1) {
-      val transaction = if( tx == null ) {
-        host.database.createStoreTransaction
+      val storeBatch = if( sb == null ) {
+        host.store.createStoreBatch
       } else {
-        tx
+        sb
       }
-      transaction.dequeue(storeId, entry.seq, entry.value.ref)
-      if( tx == null ) {
-        transaction.release
+      storeBatch.dequeue(entry.createQueueEntryRecord)
+      if( sb == null ) {
+        storeBatch.release
       }
     }
-    if( tx != null ) {
-      tx.release
+    if( sb != null ) {
+      sb.release
     }
 
     counter -= 1
@@ -178,7 +177,6 @@ class Queue(val host: VirtualHost, val d
     entry.tombstone
 
     if (counter == 0) {
-//      trace("empty.. triggering refill")
       messages.refiller.run
     }
   }
@@ -234,10 +232,10 @@ class Queue(val host: VirtualHost, val d
         if (delivery.storeId != -1) {
           // If the message has a store id, then this delivery will
           // need a tx to track the store changes.
-          if( delivery.storeTx == null ) {
-            delivery.storeTx = host.database.createStoreTransaction
+          if( delivery.storeBatch == null ) {
+            delivery.storeBatch = host.store.createStoreBatch
           } else {
-            delivery.storeTx.retain
+            delivery.storeBatch.retain
           }
         }
 
@@ -295,7 +293,6 @@ class Queue(val host: VirtualHost, val d
   }
 
   def swap() = {
-
     class Prio(val entry:QueueEntry) extends Comparable[Prio] {
       var value = 0
       def compareTo(o: Prio) = o.value - value
@@ -362,7 +359,7 @@ class Queue(val host: VirtualHost, val d
         if( stored!=null && !stored.loading) {
           // start loading it back...
           stored.loading = true
-          host.database.loadMessage(stored.ref) { delivery =>
+          host.store.loadMessage(stored.ref) { delivery =>
             // pass off to a source so it can aggregate multiple
             // loads to reduce cross thread synchronization
             if( delivery.isDefined ) {
@@ -376,7 +373,7 @@ class Queue(val host: VirtualHost, val d
         if( loaded!=null ) {
           var ref = loaded.delivery.storeId
           if( ref == -1 ) {
-            val tx = host.database.createStoreTransaction
+            val tx = host.store.createStoreBatch
 
             val message = loaded.delivery.message
             val sm = new MessageRecord
@@ -386,11 +383,12 @@ class Queue(val host: VirtualHost, val d
 
             tx.store(sm)
             loaded.delivery.storeId = sm.id
-            tx.enqueue(storeId, entry.seq, sm.id)
+
+            tx.enqueue(entry.createQueueEntryRecord)
             tx.release
           }
           flushingSize += entry.value.size
-          host.database.flushMessage(ref) {
+          host.store.flushMessage(ref) {
             store_flush_source.merge(entry)
           }
         }
@@ -454,6 +452,15 @@ class QueueEntry(val queue:Queue) extend
   var browsing:List[Subscription] = Nil
   var value:EntryType = null
 
+  def createQueueEntryRecord = {
+    val qer = new QueueEntryRecord
+    qer.queueKey = queue.storeId
+    qer.queueSeq = seq
+    qer.messageKey = value.ref
+    qer.size = value.size
+    qer
+  }
+
   def compareTo(o: QueueEntry) = {
     (seq - o.seq).toInt
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:04:25 2010
@@ -28,7 +28,6 @@ import org.apache.activemq.apollo.dto.Vi
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import ReporterLevel._
-import org.apache.activemq.apollo.store.memory.MemoryStore
 import org.apache.activemq.broker.store.{Store}
 import org.fusesource.hawtbuf.proto.WireFormat
 import org.apache.activemq.apollo.store.QueueRecord
@@ -88,7 +87,7 @@ class VirtualHost(val broker: Broker) ex
     this.names = names.toList
   }
 
-  var database:Store = new MemoryStore()
+  var store:Store = null
   var transactionManager:TransactionManagerX = new TransactionManagerX
 
   var protocols = Map[AsciiBuffer, WireFormat]()
@@ -112,28 +111,28 @@ class VirtualHost(val broker: Broker) ex
 
 
   override protected def _start(onCompleted:Runnable):Unit = {
+    if( store!=null ) {
+      store.start();
+      store.listQueues { ids =>
+        for( id <- ids) {
+          store.getQueueStatus(id) { x =>
+            x match {
+              case Some(info)=>
+              dispatchQueue ^{
+                val dest = DestinationParser.parse(info.record.name , destination_parser_options)
+                if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
+
+                  val queue = new Queue(this, dest, id)
+                  queue.first_seq = info.first
+                  queue.last_seq = info.last
+                  queue.message_seq_counter = info.last+1
+                  queue.count = info.count
 
-    database.start();
-
-    database.listQueues { ids =>
-      for( id <- ids) {
-        database.getQueueStatus(id) { x =>
-          x match {
-            case Some(info)=>
-            dispatchQueue ^{
-              val dest = DestinationParser.parse(info.record.name , destination_parser_options)
-              if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
-
-                val queue = new Queue(this, dest, id)
-                queue.first_seq = info.first
-                queue.last_seq = info.last
-                queue.message_seq_counter = info.last+1
-                queue.count = info.count
-
-                queues.put(info.record.name, queue)
+                  queues.put(info.record.name, queue)
+                }
               }
+              case _ =>
             }
-            case _ =>
           }
         }
       }
@@ -163,7 +162,9 @@ class VirtualHost(val broker: Broker) ex
 //        }
 //        done.await();
 
-    database.stop();
+    if( store!=null ) {
+      store.stop();
+    }
     onCompleted.run
   }
 
@@ -184,21 +185,27 @@ class VirtualHost(val broker: Broker) ex
 
   def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
     val name = DestinationParser.toBuffer(dest, destination_parser_options)
-    val record = new QueueRecord
-    record.name = name
-    database.addQueue(record) { rc =>
-      rc match {
-        case Some(id) =>
-          dispatchQueue ^ {
-            val queue = new Queue(this, dest, id)
-            queues.put(name, queue)
-            cb(queue)
-          }
-        case None => // store could not create
-          cb(null)
+    if( store!=null ) {
+      val record = new QueueRecord
+      record.name = name
+      store.addQueue(record) { rc =>
+        rc match {
+          case Some(id) =>
+            dispatchQueue ^ {
+              val queue = new Queue(this, dest, id)
+              queues.put(name, queue)
+              cb(queue)
+            }
+          case None => // store could not create
+            cb(null)
+        }
       }
+    } else {
+      val queue = new Queue(this, dest, -1)
+      queues.put(name, queue)
+      cb(queue)
     }
-    null
+
   } |>>: dispatchQueue
 
   def createSubscription(consumer:ConsumerContext):BrokerSubscription = {

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml Wed Jul  7 04:04:25 2010
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version
+  2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+  applicable law or agreed to in writing, software distributed under
+  the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+  OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-scala</artifactId>
+    <version>6.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.activemq</groupId>
+  <artifactId>activemq-cassandra</artifactId>
+  <packaging>jar</packaging>
+  <version>6.0-SNAPSHOT</version>
+
+  <name>ActiveMQ :: Store :: Cassandra</name>
+
+  <repositories>
+    <repository>
+      <id>shorrockin Maven 2 Repository</id>
+      <url>http://maven.shorrockin.com</url>
+    </repository>
+  </repositories>
+  
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-store</artifactId>
+      <version>6.0-SNAPSHOT</version>
+    </dependency>
+    
+   <!-- cascal is expected to resolve from the shorrockin repository declared above; sbt equivalent: val shorrockinRepo = "shorrockin Maven 2 Repository" at "http://maven.shorrockin.com" -->
+    
+    <dependency>
+      <groupId>com.shorrockin</groupId>
+      <artifactId>cascal</artifactId>
+      <version>1.2-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf-proto</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-dto</artifactId>
+      <version>6.0-SNAPSHOT</version>
+    </dependency>
+
+    <!-- Scala Support -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>compile</scope>
+      <version>${scala-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala-version}</version>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    
+    <!-- Testing Dependencies -->    
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-store</artifactId>
+      <version>6.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>${junit-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+      <version>${log4j-version}</version>
+    </dependency>
+    
+    <dependency>
+        <groupId>commons-lang</groupId>
+        <artifactId>commons-lang</artifactId>
+        <version>2.4</version>
+        <scope>test</scope>
+    </dependency>
+
+    <dependency>
+        <groupId>commons-codec</groupId>
+        <artifactId>commons-codec</artifactId>
+        <version>1.2</version>
+        <scope>test</scope>
+    </dependency>
+
+    <dependency>
+        <groupId>commons-collections</groupId>
+        <artifactId>commons-collections</artifactId>
+        <version>3.2.1</version>
+        <scope>test</scope>
+    </dependency>
+
+    <dependency>
+        <groupId>com.google.clhm</groupId>
+        <artifactId>clhm-production</artifactId>
+        <version>1.0</version>
+        <scope>test</scope>
+    </dependency>
+
+    <dependency>
+        <groupId>com.google.collections</groupId>
+        <artifactId>google-collections</artifactId>
+        <version>1.0</version>
+        <scope>test</scope>
+    </dependency>
+
+    <dependency>
+        <groupId>flexjson</groupId>
+        <artifactId>flexjson</artifactId>
+        <version>1.7</version>
+        <scope>test</scope>
+    </dependency>
+
+    <dependency>
+        <groupId>high-scale-lib</groupId>
+        <artifactId>high-scale-lib</artifactId>
+        <version>1.0</version>
+        <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.fusesource.hawtbuf</groupId>
+        <artifactId>hawtbuf-proto</artifactId>
+        <version>${hawtbuf-version}</version>
+        <configuration>
+          <type>alt</type>
+        </configuration>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+</project>

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto Wed Jul  7 04:04:25 2010
@@ -0,0 +1,35 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.broker.store.cassandra;
+
+option java_multiple_files = true;
+
+message PBMessageRecord {
+  required bytes messageId = 2 [java_override_type = "AsciiBuffer"];
+  required bytes protocol = 3 [java_override_type = "AsciiBuffer"];
+  required int32 size = 4;
+  optional bytes value = 5;
+  optional int64 stream = 6;
+  optional int64 expiration = 7;
+}
+
+message PBQueueEntryRecord {
+  required int64 messageKey = 1;
+  optional bytes attachment = 2;
+  optional int32 size = 3;
+  optional int32 redeliveries = 4;
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:04:25 2010
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.apache.activemq.apollo.broker.{Logging, BaseService}
+import com.shorrockin.cascal.session._
+import com.shorrockin.cascal.utils.Conversions._
+import java.util.{HashMap}
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.fusesource.hawtbuf.{AsciiBuffer, DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
+
+/**
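+ * Client that reads and writes broker store records (queues, queue entries, messages) in Cassandra through a cascal SessionPool.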
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CassandraClient() {
+  var schema: Schema = new Schema("ActiveMQ")
+  var hosts = Host("127.0.0.1", 9160, 3000) :: Nil
+
+  implicit def toByteArray(buffer: Buffer) = buffer.toByteArray
+
+  protected var pool: SessionPool = null
+
+  def start() = {
+    val params = new PoolParams(10, ExhaustionPolicy.Fail, 500L, 6, 2)
+    pool = new SessionPool(hosts, params, Consistency.One)
+  }
+
+  def stop() = {
+    pool.close
+  }
+
+  protected def withSession[E](block: Session => E): E = {
+    val session = pool.checkout
+    try {
+      block(session)
+    } finally {
+      pool.checkin(session)
+    }
+  }
+
+  implicit def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
+    import PBMessageRecord._
+    val pb = PBMessageRecordBuffer.parseUnframed(v)
+    val rc = new MessageRecord
+    rc.messageId = pb.getMessageId
+    rc.protocol = pb.getProtocol
+    rc.size = pb.getSize
+    rc.value = pb.getValue
+    rc.stream = pb.getStream
+    rc.expiration = pb.getExpiration
+    rc
+  }
+
+  implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
+    import PBMessageRecord._
+    val pb = new PBMessageRecordBean
+    pb.setMessageId(v.messageId)
+    pb.setProtocol(v.protocol)
+    pb.setSize(v.size)
+    pb.setValue(v.value)
+    pb.setStream(v.stream)
+    pb.setExpiration(v.expiration)
+    pb.freeze.toUnframedByteArray
+  }
+  
+  implicit def decodeQueueEntryRecord(v: Array[Byte]): QueueEntryRecord = {
+    import PBQueueEntryRecord._
+    val pb = PBQueueEntryRecordBuffer.parseUnframed(v)
+    val rc = new QueueEntryRecord
+    rc.messageKey = pb.getMessageKey
+    rc.attachment = pb.getAttachment
+    rc.size = pb.getSize
+    rc.redeliveries = pb.getRedeliveries.toShort
+    rc
+  }
+
+  implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
+    import PBQueueEntryRecord._
+    val pb = new PBQueueEntryRecordBean
+    pb.setMessageKey(v.messageKey)
+    pb.setAttachment(v.attachment)
+    pb.setSize(v.size)
+    pb.setRedeliveries(v.redeliveries)
+    pb.freeze.toUnframedByteArray
+  }
+
+  def addQueue(record: QueueRecord) = {
+    withSession {
+      session =>
+        session.insert(schema.queue_name \ (record.id, record.name))
+    }
+  }
+
+  def listQueues: Seq[Long] = {
+    withSession {
+      session =>
+        session.list(schema.queue_name).map {
+          x =>
+            val id: Long = x.name
+            id
+        }
+    }
+  }
+
+  def getQueueStatus(id: Long): Option[QueueStatus] = {
+    withSession {
+      session =>
+        session.get(schema.queue_name \ id) match {
+          case Some(x) =>
+
+            val rc = new QueueStatus
+            rc.record = new QueueRecord
+            rc.record.id = id
+            rc.record.name = new AsciiBuffer(x)
+
+            rc.count = session.count( schema.entries \ id )
+            
+            // TODO: count is populated above, but the sequence range
+            // is still left unset:
+            //          rc.first =
+            //          rc.last =
+
+            Some(rc)
+          case None =>
+            None
+        }
+    }
+  }
+
+
+  def store(txs:Seq[CassandraStore#CassandraBatch]) {
+    withSession {
+      session =>
+        var batch = List[Operation]()
+        txs.foreach {
+          tx =>
+            tx.actions.foreach {
+              case (msg, action) =>
+                var rc =
+                if (action.store != null) {
+                  batch ::= Insert( schema.message_data \ (msg, action.store) )
+                }
+                action.enqueues.foreach {
+                  queueEntry =>
+                    val qid = queueEntry.queueKey
+                    val seq = queueEntry.queueSeq
+                    batch ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
+                }
+                action.dequeues.foreach {
+                  queueEntry =>
+                    val qid = queueEntry.queueKey
+                    val seq = queueEntry.queueSeq
+                    batch ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
+                }
+            }
+        }
+        session.batch(batch)
+    }
+  }
+
+  def loadMessage(id: Long): Option[MessageRecord] = {
+    withSession {
+      session =>
+        session.get(schema.message_data \ id) match {
+          case Some(x) =>
+            val rc: MessageRecord = x.value
+            rc.id = id
+            Some(rc)
+          case None =>
+            None
+        }
+    }
+  }
+
+  def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
+    withSession {
+      session =>
+        session.list(schema.entries \ qid).map { x=>
+          val rc:QueueEntryRecord = x.value
+          rc.queueKey = qid
+          rc.queueSeq = x.name
+          rc
+        }
+    }
+  }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:04:25 2010
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.apache.activemq.broker.store.{StoreBatch, Store}
+import org.fusesource.hawtdispatch.BaseRetained
+import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
+import com.shorrockin.cascal.session._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import java.util.concurrent.atomic.AtomicLong
+import collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import com.shorrockin.cascal.model.Key
+import org.apache.log.output.db.ColumnType
+import java.util.{HashSet, HashMap}
+import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
+import org.apache.activemq.apollo.util.IntCounter
+import com.shorrockin.cascal.utils.Conversions._
+import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
+import collection.Seq
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CassandraStore extends Store with BaseService with Logging {
+
+  import CassandraStoreHelper._
+  override protected def log = CassandraStoreHelper
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the BaseService interface
+  //
+  /////////////////////////////////////////////////////////////////////
+  val dispatchQueue = createQueue("cassandra store")
+
+  var next_queue_key = new AtomicLong(0)
+  var next_msg_key = new AtomicLong(0)
+
+  val client = new CassandraClient()
+  protected var executor_pool:ExecutorService = _
+
+  /**
+   * Starts the store: creates the worker pool, points the client at the
+   * "ActiveMQ" schema, starts the client and then signals completion.
+   *
+   * @param onCompleted run synchronously once the client has been started
+   */
+  protected def _start(onCompleted: Runnable) = {
+    // the blocking client calls made by this store are handed to this pool
+    executor_pool = Executors.newCachedThreadPool()
+    client.schema = Schema("ActiveMQ")
+    client.start
+    onCompleted.run()
+  }
+
+  /**
+   * Stops the client, then shuts the worker pool down on a separate thread.
+   *
+   * @param onCompleted run on that thread after the pool has terminated
+   *                    (the wait is capped at one day)
+   */
+  protected def _stop(onCompleted: Runnable) = {
+    client.stop
+    // awaitTermination blocks, so the shutdown is driven from its own thread
+    val shutdown = new Runnable {
+      def run() = {
+        executor_pool.shutdown
+        executor_pool.awaitTermination(1, TimeUnit.DAYS)
+        executor_pool = null
+        onCompleted.run
+      }
+    }
+    new Thread(shutdown).start
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the BrokerDatabase interface
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  /**
+   * Assigns the next queue key to `record` and stores it asynchronously.
+   *
+   * @param record the queue to add; its `id` is overwritten with the
+   *               generated key before it is handed to the client
+   * @param cb     receives `Some(key)` once the client call has returned
+   */
+  def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
+    val key = next_queue_key.incrementAndGet
+    // Store the record under the same key that is reported to the caller;
+    // previously the generated key was returned but never applied.
+    record.id = key
+    executor_pool ^{
+      client.addQueue(record)
+      cb(Some(key))
+    }
+  }
+
+  /**
+   * Looks up the status of a queue in a task handed to `executor_pool`.
+   *
+   * @param id the key of the queue
+   * @param cb receives whatever the client returns for `id`
+   */
+  def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {
+    executor_pool ^{
+      val status = client.getQueueStatus(id)
+      cb(status)
+    }
+  }
+
+  /**
+   * Lists the known queue keys in a task handed to `executor_pool`.
+   *
+   * @param cb receives the keys returned by the client
+   */
+  def listQueues(cb: (Seq[Long]) => Unit) = {
+    executor_pool ^{
+      val queueKeys = client.listQueues
+      cb(queueKeys)
+    }
+  }
+
+  /**
+   * Loads a message record in a task handed to `executor_pool`.
+   *
+   * @param id the key of the message
+   * @param cb receives the client's result for `id`
+   */
+  def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = {
+    executor_pool ^{
+      val loaded = client.loadMessage(id)
+      cb(loaded)
+    }
+  }
+
+
+  /**
+   * Loads the entries of a queue in a task handed to `executor_pool`.
+   *
+   * @param id the key of the queue
+   * @param cb receives the entries returned by the client
+   */
+  def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {
+    executor_pool ^{
+      val entries = client.getQueueEntries(id)
+      cb(entries)
+    }
+  }
+
+  /**
+   * Requests an early flush of the batch that holds a pending store of the
+   * given message. The work is queued on `dispatchQueue`.
+   *
+   * @param id the key of the message
+   * @param cb evaluated right away when no store is pending for `id`;
+   *           otherwise chained in front of the batch's current disposer
+   */
+  def flushMessage(id: Long)(cb: => Unit) = ^{
+    Option(pendingStores.get(id)) match {
+      case None =>
+        cb
+      case Some(pending) =>
+        val batch = pending.tx
+        val chained = batch.getDisposer
+        batch.setDisposer(^{
+          cb
+          if( chained!=null ) {
+            chained.run
+          }
+        })
+        flush(batch.txid)
+    }
+
+  } >>: dispatchQueue
+
+  /** Creates an empty batch bound to this store. */
+  def createStoreBatch() = {
+    new CassandraBatch
+  }
+
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the StoreBatch interface
+  //
+  /////////////////////////////////////////////////////////////////////
+  /**
+   * A unit of work collected before being written to Cassandra. Operations
+   * are grouped per message key; disposing the batch submits it to
+   * `transaction_source` instead of releasing it right away.
+   */
+  class CassandraBatch extends BaseRetained with StoreBatch {
+
+    /**
+     * Everything this batch holds for one message: an optional record to
+     * store plus the queue entries to add and to remove.
+     */
+    class MessageAction {
+
+      var msg = 0L
+      var store: MessageRecord = null
+      var enqueues = ListBuffer[QueueEntryRecord]()
+      var dequeues = ListBuffer[QueueEntryRecord]()
+
+      /** The batch that owns this action. */
+      def tx = CassandraBatch.this
+
+      /** True when there is neither a store nor any enqueue/dequeue left. */
+      def isEmpty() = store==null && enqueues.isEmpty && dequeues.isEmpty
+
+      /** Drops this action from its batch; an emptied batch is cancelled too. */
+      def cancel() = {
+        tx.rm(msg)
+        if( tx.isEmpty ) {
+          tx.cancel
+        }
+      }
+    }
+
+    // message key -> operations recorded for that message
+    var actions = Map[Long, MessageAction]()
+    // assigned by drain_transactions once the batch has been submitted
+    var txid:Int = 0
+
+    def rm(msg:Long) = {
+      actions = actions - msg
+    }
+
+    def isEmpty = actions.isEmpty
+
+    /** Forgets the delayed flush of this batch and completes it immediately. */
+    def cancel = {
+      delayedTransactions.remove(txid)
+      onPerformed
+    }
+
+    /** Assigns the next message key to `record` and records it for storing. */
+    def store(record: MessageRecord) = {
+      val id = next_msg_key.incrementAndGet
+      record.id = id
+      val stored = new MessageAction
+      stored.msg = id
+      stored.store = record
+      this.synchronized {
+        actions += id -> stored
+      }
+    }
+
+    /** Returns the action for `msg`, creating and registering it if needed. */
+    def action(msg:Long) = {
+      actions.getOrElse(msg, {
+        val created = new MessageAction
+        created.msg = msg
+        actions += msg->created
+        created
+      })
+    }
+
+    def enqueue(entry: QueueEntryRecord) = {
+      this.synchronized {
+        action(entry.messageKey).enqueues += entry
+      }
+    }
+
+    def dequeue(entry: QueueEntryRecord) = {
+      this.synchronized {
+        action(entry.messageKey).dequeues += entry
+      }
+    }
+
+    /** Hands the batch to `transaction_source`; see `onPerformed` for the real dispose. */
+    override def dispose = {
+      transaction_source.merge(this)
+    }
+
+    /** Runs the inherited dispose logic. */
+    def onPerformed() {
+      super.dispose
+    }
+  }
+
+  /** Identifies a queue entry by its (queue key, queue sequence) pair. */
+  def key(x:QueueEntryRecord) = {
+    (x.queueKey, x.queueSeq)
+  }
+
+  val transaction_source = createSource(new ListEventAggregator[CassandraBatch](), dispatchQueue)
+  transaction_source.setEventHandler(^{drain_transactions});
+  transaction_source.resume
+
+  var pendingStores = new HashMap[Long, CassandraBatch#MessageAction]()
+  var pendingEnqueues = new HashMap[(Long,Long), CassandraBatch#MessageAction]()
+  var delayedTransactions = new HashMap[Int, CassandraBatch]()
+
+  var next_tx_id = new IntCounter
+  
+  /**
+   * Event handler of `transaction_source`. For every submitted batch it
+   *  - assigns a transaction id and schedules a flush 30 seconds later,
+   *  - registers the batch's pending stores and enqueues, and
+   *  - lets each dequeue cancel a still pending enqueue of the same
+   *    (queue, sequence) key, so neither operation has to be written.
+   */
+  def drain_transactions = {
+    transaction_source.getData.foreach { tx =>
+
+      val tx_id = next_tx_id.incrementAndGet
+      tx.txid = tx_id
+      delayedTransactions.put(tx_id, tx)
+      dispatchQueue.dispatchAfter(30, TimeUnit.SECONDS, ^{flush(tx_id)})
+
+      tx.actions.foreach { case (msg, action) =>
+        if( action.store!=null ) {
+          pendingStores.put(msg, action)
+        }
+        action.enqueues.foreach { queueEntry=>
+          pendingEnqueues.put(key(queueEntry), action)
+        }
+
+
+        // dequeues can cancel out previous enqueues
+        action.dequeues.foreach { currentDequeue=>
+          val currentKey = key(currentDequeue)
+          val prevAction:CassandraBatch#MessageAction = pendingEnqueues.remove(currentKey)
+          if( prevAction!=null ) {
+
+            // yay we can cancel out a previous enqueue
+            prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
+
+            // if the message is not in any queues.. we can gc it..
+            if( prevAction.enqueues.isEmpty && prevAction.store !=null ) {
+              // the pending store is registered under the enqueuing action's message key
+              pendingStores.remove(prevAction.msg)
+              prevAction.store = null
+            }
+
+            // Cancel the previous action if it's now empty. When prevAction is
+            // this very action it still holds the current dequeue here, so it is
+            // only cancelled once, by the check further down.
+            if( prevAction.isEmpty ) {
+              prevAction.cancel()
+            }
+
+            // since we canceled out the previous enqueue.. now cancel out the
+            // dequeue (compare keys with keys, not a key with the record)
+            action.dequeues = action.dequeues.filterNot( x=> key(x) == currentKey )
+            if( action.isEmpty ) {
+              action.cancel()
+            }
+          }
+        }
+      }
+
+    }
+  }
+
+  /**
+   * Queues a flush request for the batch with the given transaction id.
+   * Requests are collected by `flush_source`.
+   */
+  def flush(tx_id:Int) = {
+    val requested = tx_id
+    flush_source.merge(requested)
+  }
+
+  val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
+  flush_source.setEventHandler(^{drain_flushes});
+  flush_source.resume
+
+  /**
+   * Event handler of `flush_source`. Collects the batches whose flush was
+   * requested, unregisters their pending stores/enqueues and writes them in
+   * one client call on `executor_pool`. `flush_source` stays suspended while
+   * that write is running.
+   */
+  def drain_flushes = {
+    val txs = flush_source.getData.flatMap{ tx_id =>
+      val tx = delayedTransactions.remove(tx_id)
+      // Message may be flushed or canceled before the timeout flush event..
+      // tx may be null in those cases
+      if (tx!=null) {
+
+        tx.actions.foreach { case (msg, action) =>
+          if( action.store!=null ) {
+            pendingStores.remove(msg)
+          }
+          action.enqueues.foreach { queueEntry=>
+            // Only drop the registration if it still belongs to this action.
+            // A two argument remove(key, action) call only does that on a JDK
+            // whose HashMap has that overload; elsewhere the arguments are
+            // packed into one tuple key which never matches.
+            val entryKey = key(queueEntry)
+            if( action eq pendingEnqueues.get(entryKey) ) {
+              pendingEnqueues.remove(entryKey)
+            }
+          }
+        }
+
+        Some(tx)
+      } else {
+        None
+      }
+    }
+
+    if( !txs.isEmpty ) {
+      // suspend so that we don't process more flush requests while
+      // we are concurrently executing a flush
+      flush_source.suspend
+      executor_pool ^{
+        try {
+          client.store(txs)
+          txs.foreach { x=>
+            x.onPerformed
+          }
+        } finally {
+          // always pair the suspend above with a resume, otherwise a failed
+          // write would block every later flush request
+          flush_source.resume
+        }
+      }
+    }
+  }
+
+}
+
+object CassandraStoreHelper extends Log {
+  val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+//  /**
+//   * Creates a default a configuration object.
+//   */
+//  def default() = {
+//    val rc = new HawtDBStoreDTO
+//    rc.directory = new File("activemq-data")
+//    rc
+//  }
+//
+//  /**
+//   * Validates a configuration object.
+//   */
+//  def validate(config: HawtDBStoreDTO, reporter:Reporter):ReporterLevel = {
+//     new Reporting(reporter) {
+//      if( config.directory == null ) {
+//        error("hawtdb store must be configured with a directroy.")
+//      }
+//    }.result
+//  }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala Wed Jul  7 04:04:25 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.fusesource.hawtbuf.Buffer
+import com.shorrockin.cascal.model.{StandardKey, Keyspace}
+import com.shorrockin.cascal.session.ColumnPredicate
+
+/**
+ * Names the Cassandra locations the store works with inside one keyspace.
+ *
+ * @param name the name of the keyspace
+ */
+case class Schema(name:String) {
+
+  // lets a hawtbuf Buffer be used where a raw byte array is expected
+  implicit def toByteArray(buffer:Buffer) = buffer.toByteArray
+
+  val keyspace = Keyspace(name)
+
+  /**
+   * The "messages" column family and the "data" entry beneath it
+   * (presumably a row key holding one column per message id; confirm
+   * against CassandraClient).
+   */
+  val message = keyspace \ "messages"
+  val message_data = message \ "data"
+
+  /**
+   * The "queues" column family and the "name" entry beneath it
+   * (presumably a row key holding one column per queue id; confirm
+   * against CassandraClient).
+   */
+  val queue = keyspace \ "queues"
+  val queue_name = queue \ "name"
+
+  /**
+   * The "entries" column family; the client addresses it by queue key.
+   */
+  val entries = keyspace \ "entries"
+
+//  protected val subscriptionColumns = columns(subscriptionKey, "destination" :: "created" :: "prefetchCount" :: "inactivityTimeout" :: Nil)
+//
+//  protected def columns(key: StandardKey, names: Seq[String]) = {
+//    val columns = names.map{n => (key \ (n, "")).name}
+//    ColumnPredicate(columns)
+//  }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties Wed Jul  7 04:04:25 2010
@@ -0,0 +1,10 @@
+log4j.rootLogger=DEBUG, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p %d [%c %t] %m%n
+
+# cassandra log levels for test cases (DEBUG is verbose; raise to WARN or ERROR to quiet the output)
+log4j.logger.org.apache.cassandra=DEBUG
+log4j.logger.org.apache.cassandra.service=DEBUG
+log4j.logger.org.apache.cassandra.thrift=DEBUG
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml Wed Jul  7 04:04:25 2010
@@ -0,0 +1,54 @@
+<Storage>
+  <ClusterName>Test</ClusterName>
+
+  <Keyspaces>
+    <Keyspace Name="ActiveMQ">
+
+      <!-- regular records with text keys -->
+      <ColumnFamily Name="messages" CompareWith="LongType"/>
+      <ColumnFamily Name="queues" CompareWith="LongType"/>
+      <ColumnFamily Name="entries" CompareWith="LongType" KeysCached="10000"/>
+      <!-- todo.. play with: RowsCached="100%" KeysCached="100%"-->
+
+      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+      <ReplicationFactor>1</ReplicationFactor>
+      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+    </Keyspace>
+  </Keyspaces>
+
+  <AutoBootstrap>false</AutoBootstrap>
+  <Authenticator>org.apache.cassandra.auth.AllowAllAuthenticator</Authenticator>
+  <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
+  <InitialToken></InitialToken>
+  <CommitLogDirectory>%temp-dir%commitlog</CommitLogDirectory>
+
+  <DataFileDirectories>
+    <DataFileDirectory>%temp-dir%data</DataFileDirectory>
+  </DataFileDirectories>
+
+  <Seeds>
+    <Seed>127.0.0.1</Seed>
+  </Seeds>
+  <RpcTimeoutInMillis>10000</RpcTimeoutInMillis>
+  <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
+  <ListenAddress>localhost</ListenAddress>
+  <StoragePort>7000</StoragePort>
+  <ThriftAddress>localhost</ThriftAddress>
+  <ThriftPort>9160</ThriftPort>
+  <ThriftFramedTransport>false</ThriftFramedTransport>
+  <DiskAccessMode>auto</DiskAccessMode>
+  <RowWarningThresholdInMB>512</RowWarningThresholdInMB>
+  <SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
+  <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
+  <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
+  <ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>
+  <MemtableThroughputInMB>64</MemtableThroughputInMB>
+  <BinaryMemtableThroughputInMB>256</BinaryMemtableThroughputInMB>
+  <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
+  <MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>
+  <ConcurrentReads>8</ConcurrentReads>
+  <ConcurrentWrites>32</ConcurrentWrites>
+  <CommitLogSync>periodic</CommitLogSync>
+  <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
+  <GCGraceSeconds>864000</GCGraceSeconds>
+</Storage>

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala Wed Jul  7 04:04:25 2010
@@ -0,0 +1,112 @@
+package org.apache.activemq.broker.store.cassandra
+
+import org.apache.cassandra.thrift.CassandraDaemon
+import org.apache.cassandra.config.DatabaseDescriptor
+import java.io.File
+import java.net.ConnectException
+import org.apache.thrift.transport.{TTransportException, TSocket}
+import com.shorrockin.cascal.session._
+import com.shorrockin.cascal.utils.{Utils, Logging}
+import org.scalatest.{Suite, BeforeAndAfterAll}
+
+/**
+ * trait which mixes in the functionality necessary to embed
+ * cassandra into a unit test
+ */
+trait CassandraServerMixin extends BeforeAndAfterAll {
+this: Suite =>
+
+  private var basedir = "target"
+
+  /**
+   * Starts the embedded Cassandra before any test of the suite runs.
+   * A "basedir" entry in `configMap` replaces the default base directory.
+   */
+  override protected def beforeAll(configMap: Map[String, Any]): Unit = {
+    configMap.get("basedir").foreach { dir =>
+      basedir = dir.toString
+    }
+    startCassandra
+    super.beforeAll(configMap)
+  }
+
+  /** Runs the inherited clean-up first, then stops the embedded Cassandra. */
+  override protected def afterAll(configMap: Map[String, Any]) = {
+    super.afterAll(configMap)
+    // the daemon goes down last
+    stopCassandra()
+  }
+
+
+  import Utils._
+
+  protected def cassandraHomeDirectory = new File(new File(basedir),"cassandra.home.unit-tests")
+  protected def resource(str:String) = getClass.getResourceAsStream(str)
+
+  private var daemon:CassandraDaemon = null
+  var pool:SessionPool = null
+  var daemonThread:Thread = null
+
+  /**
+   * Boots an embedded Cassandra daemon from a freshly created home directory
+   * and blocks until its thrift port (localhost:9160) accepts connections.
+   * Also creates the session `pool` used by the tests.
+   */
+  private def startCassandra = synchronized {
+    val hosts  = Host("localhost", 9160, 250) :: Nil
+    val params = new PoolParams(10, ExhaustionPolicy.Fail, 500L, 6, 2)
+    pool = new SessionPool(hosts, params, Consistency.One)
+
+    // start from an empty home directory
+    val home = cassandraHomeDirectory
+    delete(home)
+    home.mkdirs
+
+    val fileSep     = System.getProperty("file.separator")
+    val storageFile = new File(cassandraHomeDirectory, "storage-conf.xml")
+    val logFile     = new File(cassandraHomeDirectory, "log4j.properties")
+
+    // the %temp-dir% placeholders of the config are pointed at the home directory
+    replace(copy(resource("storage-conf.xml"), storageFile), ("%temp-dir%" -> (cassandraHomeDirectory.getCanonicalPath + fileSep)))
+    copy(resource("log4j.properties"), logFile)
+
+    System.setProperty("storage-config", cassandraHomeDirectory.getCanonicalPath)
+
+    try {
+      DatabaseDescriptor.getAllDataFileLocations.foreach {
+        (file) => new File(file).mkdirs
+      }
+      new File(DatabaseDescriptor.getLogFileLocation).mkdirs
+
+      daemon = new CassandraDaemon
+      daemon.init(new Array[String](0))
+
+      daemonThread = new Thread("Cassandra Daemon") {
+        override def run = {
+          daemon.start
+        }
+      }
+      daemonThread.start
+
+    } catch {
+      case e:Throwable =>
+        e.printStackTrace
+        throw e
+    }
+
+    // try to make sockets until the server opens up - there has to be a better
+    // way - just not sure what it is.
+    val socket = new TSocket("localhost", 9160);
+    var opened = false
+    while (!opened) {
+      try {
+        socket.open()
+        opened = true
+        socket.close()
+      } catch {
+        // not up yet: pause briefly instead of spinning at full speed while
+        // the daemon is still booting
+        case e:TTransportException => Thread.sleep(50)
+        case e:ConnectException => Thread.sleep(50)
+      }
+    }
+  }
+
+  /**
+   * Closes the session pool, stops the daemon, waits for its thread to end
+   * and finally destroys the daemon.
+   */
+  private def stopCassandra() = {
+    pool.close
+
+    // stop first, then wait for the daemon thread to finish
+    daemon.stop
+    daemonThread.join
+    daemonThread = null
+
+    daemon.destroy
+    daemon = null
+  }
+
+}
+

Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala Wed Jul  7 04:04:25 2010
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.scalatest.BeforeAndAfterAll
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.TaskTracker
+import org.apache.activemq.apollo.broker.{LoggingTracker, FunSuiteSupport}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CassandraStoreTest extends FunSuiteSupport with CassandraServerMixin {
+
+  /**
+   * Turns a callback style call into a blocking one.
+   *
+   * @param func is given a callback; the value passed to that callback
+   *             becomes the result
+   * @return the value handed to the callback, after waiting for it
+   */
+  def CB[T](func: (T=>Unit)=>Unit ) = {
+    class Result {
+      var value:T = _
+    }
+    val result = new Result
+    val done = new CountDownLatch(1)
+    func { x:T =>
+      result.value = x
+      done.countDown
+    }
+    done.await
+    result.value
+  }
+
+  var store:CassandraStore=null
+
+  /** Creates the store under test and waits until it has started. */
+  override protected def beforeAll() = {
+    store = new CassandraStore()
+    val startup = new LoggingTracker("store startup")
+    startup.start(store)
+    startup.await
+  }
+
+  /** Stops the store under test and waits until it has stopped. */
+  override protected def afterAll() = {
+    val shutdown = new LoggingTracker("store stop")
+    shutdown.stop(store)
+    shutdown.await
+  }
+
+
+  /**
+   * Expects the value delivered to the callback of `func` to equal
+   * `expected`; waits for the callback before comparing.
+   */
+  def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {
+    expect(expected) {
+      val delivered = CB(func)
+      delivered
+    }
+  }
+
+
+  test("add message") {
+   addMessage
+  }
+
+  /**
+   * Adds a queue and two messages to the store. Checks that a disposed batch
+   * is held back until a flush is requested and that both entries end up in
+   * the queue.
+   */
+  def addMessage() {
+    val queueA = new QueueRecord
+    queueA.id =1
+    queueA.name = ascii("queue:1")
+
+    val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
+    queueA.id = rc.get
+
+    val expected:Seq[Long] = List(queueA.id)
+    expectCB(expected) { cb=>
+      store.listQueues(cb)
+    }
+
+    var tx = store.createStoreBatch
+    var message = new MessageRecord
+    // tx.store replaces this id with the next key generated by the store
+    message.id = 35
+    message.messageId = ascii("msg-35")
+    message.protocol = ascii("test-protocol")
+    message.value = ascii("test content").buffer
+    message.size = message.value.length
+    tx.store(message)
+
+
+    val disposed = new CountDownLatch(1)
+
+    var queueEntry = new QueueEntryRecord
+    queueEntry.queueKey = queueA.id
+    queueEntry.messageKey = message.id
+    queueEntry.queueSeq = 1
+
+    tx.enqueue(queueEntry)
+    tx.setDisposer(^{ disposed.countDown })
+    tx.dispose
+
+    // It should not finish disposing right away...
+    expect(false) {
+      disposed.await(5, TimeUnit.SECONDS)
+    }
+
+    var flushed = new CountDownLatch(1)
+    store.flushMessage(message.id) {
+      flushed.countDown
+    }
+
+    // Should flush quickly now..
+    expect(true) {
+      flushed.await(1, TimeUnit.SECONDS)
+    }
+    // Flushing triggers the tx to finish disposing.
+    expect(true) {
+      disposed.await(1, TimeUnit.SECONDS)
+    }
+
+    // add another message to the queue..
+    tx = store.createStoreBatch
+    message = new MessageRecord
+    message.id = 36
+    message.messageId = ascii("msg-36")
+    message.protocol = ascii("test-protocol")
+    message.value = ascii("test content").buffer
+    message.size = message.value.length
+    tx.store(message)
+
+    queueEntry = new QueueEntryRecord
+    queueEntry.queueKey = queueA.id
+    queueEntry.messageKey = message.id
+    queueEntry.queueSeq = 2
+
+    tx.enqueue(queueEntry)
+
+    // A batch is only handed to the store when it is disposed; without this
+    // the second message would never be written.
+    val persisted = new CountDownLatch(1)
+    tx.setDisposer(^{ persisted.countDown })
+    tx.dispose
+
+    flushed = new CountDownLatch(1)
+    store.flushMessage(message.id) {
+      flushed.countDown
+    }
+    flushed.await
+
+    // The disposer runs once the batch has been written; the generous limit
+    // covers the delayed flush in case the flush request raced the dispose.
+    expect(true) {
+      persisted.await(60, TimeUnit.SECONDS)
+    }
+
+    val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.id)(cb) )
+    expect(ascii("queue:1")) {
+      qso.get.record.name
+    }
+    expect(2) {
+      qso.get.count
+    }
+
+  }
+    
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:04:25 2010
@@ -30,8 +30,8 @@ import org.apache.activemq.apollo.dto.Ha
 import org.apache.activemq.apollo.broker._
 import ReporterLevel._
 import store.HawtDBManager
-import org.apache.activemq.broker.store.{Store, StoreTransaction}
-import org.apache.activemq.apollo.store.{QueueStatus, MessageRecord, QueueRecord}
+import org.apache.activemq.broker.store.{Store, StoreBatch}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -75,6 +75,8 @@ class HawtDBStore extends BaseService wi
   /**
    * Validates and then applies the configuration.
    */
+  def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {}
+
   def configure(config: HawtDBStoreDTO, reporter:Reporter) = ^{
     if ( validate(config, reporter) < ERROR ) {
       this.config = config
@@ -117,7 +119,7 @@ class HawtDBStore extends BaseService wi
 
   def flushMessage(id: Long)(cb: => Unit) = {}
 
-  def createStoreTransaction() = new HawtDBStoreTransaction
+  def createStoreBatch() = new HawtDBStoreBatch
 
 
   /////////////////////////////////////////////////////////////////////
@@ -125,16 +127,15 @@ class HawtDBStore extends BaseService wi
   // Implementation of the StoreTransaction interface
   //
   /////////////////////////////////////////////////////////////////////
-  class HawtDBStoreTransaction extends BaseRetained with StoreTransaction {
+  class HawtDBStoreBatch extends BaseRetained with StoreBatch {
 
     def store(delivery: MessageRecord) = {
 
     }
 
-    def enqueue(queue: Long, seq: Long, msg: Long) = {}
-
-    def dequeue(queue: Long, seq: Long, msg: Long) = {}
+    def dequeue(entry: QueueEntryRecord) = {}
 
+    def enqueue(entry: QueueEntryRecord) = {}
   }
 
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java Wed Jul  7 04:04:25 2010
@@ -24,6 +24,7 @@ import org.fusesource.hawtbuf.Buffer;
 public class QueueEntryRecord {
 
     public long queueKey;
+    public long queueSeq;
     public long messageKey;
     public Buffer attachment;
     public int size;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:04:25 2010
@@ -32,7 +32,7 @@ import org.apache.activemq.apollo.store.
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait StoreTransaction extends Retained {
+trait StoreBatch extends Retained {
 
   /**
    * Assigns the delivery a store id if it did not already
@@ -43,12 +43,12 @@ trait StoreTransaction extends Retained 
   /**
    * Adds a delivery to a specified queue at a the specified position in the queue.
    */
-  def enqueue(queue:Long, seq:Long, msg:Long)
+  def enqueue(entry:QueueEntryRecord)
 
   /**
    * Removes a delivery from a specified queue at a the specified position in the queue.
    */
-  def dequeue(queue:Long, seq:Long, msg:Long)
+  def dequeue(entry:QueueEntryRecord)
 
 }
 
@@ -74,6 +74,11 @@ trait Store extends Service {
   def listQueues(cb: (Seq[Long])=>Unit )
 
   /**
+   * Loads the entries stored for a given queue id.
+   */
+  def getQueueEntries(id:Long)(cb:(Seq[QueueEntryRecord])=>Unit )
+
+  /**
    * Removes a the delivery associated with the provided from any
    * internal buffers/caches.  The callback is executed once, the message is
    * no longer buffered.
@@ -86,10 +91,10 @@ trait Store extends Service {
   def loadMessage(id:Long)(cb:(Option[MessageRecord])=>Unit )
 
   /**
-   * Creates a StoreTransaction which is used to perform persistent
+   * Creates a StoreBatch which is used to perform persistent
    * operations as unit of work.
    */
-  def createStoreTransaction():StoreTransaction
+  def createStoreBatch():StoreBatch
 
 }
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java&r1=961122&r2=961123&rev=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala Wed Jul  7 04:04:25 2010
@@ -14,19 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.store;
-
-import org.fusesource.hawtbuf.Buffer;
+package org.apache.activemq.apollo.util
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueEntryRecord {
+class IntCounter(private var value:Int = 0) {
+
+  def get() = value
+
+  def incrementAndGet() = addAndGet(1)
+  def decrementAndGet() = addAndGet(-1)
+  def addAndGet(amount:Int) = {
+    value+=amount
+    value
+  }
 
-    public long queueKey;
-    public long messageKey;
-    public Buffer attachment;
-    public int size;
-    public short redeliveries;
+  def getAndIncrement() = getAndAdd(1)
+  def getAndDecrement() = getAndAdd(-1) // subtracts one; yields the value held before the decrement
+  def getAndAdd(amount:Int) = {
+    val rc = value
+    value+=amount
+    rc
+  }
 
-}
+}
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java&r1=961122&r2=961123&rev=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Wed Jul  7 04:04:25 2010
@@ -14,19 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.store;
-
-import org.fusesource.hawtbuf.Buffer;
+package org.apache.activemq.apollo.util
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueEntryRecord {
+class LongCounter(private var value:Long = 0) {
+
+  def get() = value
+
+  def incrementAndGet() = addAndGet(1)
+  def decrementAndGet() = addAndGet(-1)
+  def addAndGet(amount:Long) = {
+    value+=amount
+    value
+  }
 
-    public long queueKey;
-    public long messageKey;
-    public Buffer attachment;
-    public int size;
-    public short redeliveries;
+  def getAndIncrement() = getAndAdd(1)
+  def getAndDecrement() = getAndAdd(-1) // subtracts one; yields the value held before the decrement
+  def getAndAdd(amount:Long) = {
+    val rc = value
+    value+=amount
+    rc
+  }
 
-}
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala Wed Jul  7 04:04:25 2010
@@ -30,6 +30,7 @@ abstract class FunSuiteSupport extends F
       case _ => System.getProperty("basedir", ".")
     }
     debug("using basedir: " + _basedir)
+    super.beforeAll(map)
   }
 
   //

Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul  7 04:04:25 2010
@@ -137,6 +137,7 @@
     <module>activemq-util</module>
     <module>activemq-transport</module>
     <module>activemq-store</module>
+    <module>activemq-cassandra</module>
     <module>activemq-broker</module>
     <module>activemq-selector</module>
     <module>activemq-tcp</module>