You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:04:27 UTC
svn commit: r961123 - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/ activemq-cassandra/src/ activemq-cassandra/src/main/
activemq-cassandra/src/main/proto/ activemq-...
Author: chirino
Date: Wed Jul 7 04:04:25 2010
New Revision: 961123
URL: http://svn.apache.org/viewvc?rev=961123&view=rev
Log:
initial pass at a cassandra store impl
Added:
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
- copied, changed from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
- copied, changed from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryStore.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala
activemq/sandbox/activemq-apollo-actor/pom.xml
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:04:25 2010
@@ -20,7 +20,7 @@ import _root_.org.apache.activemq.filter
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
-import org.apache.activemq.broker.store.StoreTransaction
+import org.apache.activemq.broker.store.StoreBatch
/**
* A producer which sends Delivery objects to a delivery consumer.
@@ -151,13 +151,13 @@ class Delivery extends BaseRetained {
/**
* The transaction the delivery is participating in.
*/
- var storeTx:StoreTransaction = null
+ var storeBatch:StoreBatch = null
/**
* Set if the producer requires an ack to be sent back. Consumer
* should execute once the message is processed.
*/
- var ack:(StoreTransaction)=>Unit = null
+ var ack:(StoreBatch)=>Unit = null
def copy() = (new Delivery).set(this)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:04:25 2010
@@ -23,9 +23,9 @@ import org.fusesource.hawtdispatch.{Scal
import org.apache.activemq.util.TreeMap.TreeEntry
import java.util.{Collections, ArrayList, LinkedList}
import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
-import org.apache.activemq.broker.store.{StoreTransaction}
+import org.apache.activemq.broker.store.{StoreBatch}
import protocol.ProtocolFactory
-import org.apache.activemq.apollo.store.MessageRecord
+import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -75,7 +75,7 @@ class Queue(val host: VirtualHost, val d
})
- val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreTransaction)](), dispatchQueue)
+ val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreBatch)](), dispatchQueue)
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
@@ -133,18 +133,18 @@ class Queue(val host: VirtualHost, val d
entry.created(next_message_seq, delivery)
if( delivery.ack!=null ) {
- delivery.ack(delivery.storeTx)
+ delivery.ack(delivery.storeBatch)
}
if (delivery.storeId != -1) {
- delivery.storeTx.enqueue(storeId, entry.seq, delivery.storeId)
- delivery.storeTx.release
+ delivery.storeBatch.enqueue(entry.createQueueEntryRecord)
+ delivery.storeBatch.release
}
size += entry.value.size
entries.addLast(entry)
counter += 1;
- if( full ) {
+ if( full && host.store!=null ) {
// swap
}
@@ -156,21 +156,20 @@ class Queue(val host: VirtualHost, val d
}
}
- def ack(entry: QueueEntry, tx:StoreTransaction) = {
-
+ def ack(entry: QueueEntry, sb:StoreBatch) = {
if (entry.value.ref != -1) {
- val transaction = if( tx == null ) {
- host.database.createStoreTransaction
+ val storeBatch = if( sb == null ) {
+ host.store.createStoreBatch
} else {
- tx
+ sb
}
- transaction.dequeue(storeId, entry.seq, entry.value.ref)
- if( tx == null ) {
- transaction.release
+ storeBatch.dequeue(entry.createQueueEntryRecord)
+ if( sb == null ) {
+ storeBatch.release
}
}
- if( tx != null ) {
- tx.release
+ if( sb != null ) {
+ sb.release
}
counter -= 1
@@ -178,7 +177,6 @@ class Queue(val host: VirtualHost, val d
entry.tombstone
if (counter == 0) {
-// trace("empty.. triggering refill")
messages.refiller.run
}
}
@@ -234,10 +232,10 @@ class Queue(val host: VirtualHost, val d
if (delivery.storeId != -1) {
// If the message has a store id, then this delivery will
// need a tx to track the store changes.
- if( delivery.storeTx == null ) {
- delivery.storeTx = host.database.createStoreTransaction
+ if( delivery.storeBatch == null ) {
+ delivery.storeBatch = host.store.createStoreBatch
} else {
- delivery.storeTx.retain
+ delivery.storeBatch.retain
}
}
@@ -295,7 +293,6 @@ class Queue(val host: VirtualHost, val d
}
def swap() = {
-
class Prio(val entry:QueueEntry) extends Comparable[Prio] {
var value = 0
def compareTo(o: Prio) = o.value - value
@@ -362,7 +359,7 @@ class Queue(val host: VirtualHost, val d
if( stored!=null && !stored.loading) {
// start loading it back...
stored.loading = true
- host.database.loadMessage(stored.ref) { delivery =>
+ host.store.loadMessage(stored.ref) { delivery =>
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
if( delivery.isDefined ) {
@@ -376,7 +373,7 @@ class Queue(val host: VirtualHost, val d
if( loaded!=null ) {
var ref = loaded.delivery.storeId
if( ref == -1 ) {
- val tx = host.database.createStoreTransaction
+ val tx = host.store.createStoreBatch
val message = loaded.delivery.message
val sm = new MessageRecord
@@ -386,11 +383,12 @@ class Queue(val host: VirtualHost, val d
tx.store(sm)
loaded.delivery.storeId = sm.id
- tx.enqueue(storeId, entry.seq, sm.id)
+
+ tx.enqueue(entry.createQueueEntryRecord)
tx.release
}
flushingSize += entry.value.size
- host.database.flushMessage(ref) {
+ host.store.flushMessage(ref) {
store_flush_source.merge(entry)
}
}
@@ -454,6 +452,15 @@ class QueueEntry(val queue:Queue) extend
var browsing:List[Subscription] = Nil
var value:EntryType = null
+ def createQueueEntryRecord = { // builds a fresh store record describing this entry
+   val record = new QueueEntryRecord
+   record.queueKey = queue.storeId // store id of the queue that owns this entry
+   record.queueSeq = seq // this entry's sequence number
+   record.messageKey = value.ref // message reference held by the entry's current value
+   record.size = value.size
+   record
+ }
+
def compareTo(o: QueueEntry) = {
(seq - o.seq).toInt
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:04:25 2010
@@ -28,7 +28,6 @@ import org.apache.activemq.apollo.dto.Vi
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import ReporterLevel._
-import org.apache.activemq.apollo.store.memory.MemoryStore
import org.apache.activemq.broker.store.{Store}
import org.fusesource.hawtbuf.proto.WireFormat
import org.apache.activemq.apollo.store.QueueRecord
@@ -88,7 +87,7 @@ class VirtualHost(val broker: Broker) ex
this.names = names.toList
}
- var database:Store = new MemoryStore()
+ var store:Store = null
var transactionManager:TransactionManagerX = new TransactionManagerX
var protocols = Map[AsciiBuffer, WireFormat]()
@@ -112,28 +111,28 @@ class VirtualHost(val broker: Broker) ex
override protected def _start(onCompleted:Runnable):Unit = {
+ if( store!=null ) {
+ store.start();
+ store.listQueues { ids =>
+ for( id <- ids) {
+ store.getQueueStatus(id) { x =>
+ x match {
+ case Some(info)=>
+ dispatchQueue ^{
+ val dest = DestinationParser.parse(info.record.name , destination_parser_options)
+ if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
+
+ val queue = new Queue(this, dest, id)
+ queue.first_seq = info.first
+ queue.last_seq = info.last
+ queue.message_seq_counter = info.last+1
+ queue.count = info.count
- database.start();
-
- database.listQueues { ids =>
- for( id <- ids) {
- database.getQueueStatus(id) { x =>
- x match {
- case Some(info)=>
- dispatchQueue ^{
- val dest = DestinationParser.parse(info.record.name , destination_parser_options)
- if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
-
- val queue = new Queue(this, dest, id)
- queue.first_seq = info.first
- queue.last_seq = info.last
- queue.message_seq_counter = info.last+1
- queue.count = info.count
-
- queues.put(info.record.name, queue)
+ queues.put(info.record.name, queue)
+ }
}
+ case _ =>
}
- case _ =>
}
}
}
@@ -163,7 +162,9 @@ class VirtualHost(val broker: Broker) ex
// }
// done.await();
- database.stop();
+ if( store!=null ) {
+ store.stop();
+ }
onCompleted.run
}
@@ -184,21 +185,27 @@ class VirtualHost(val broker: Broker) ex
def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
val name = DestinationParser.toBuffer(dest, destination_parser_options)
- val record = new QueueRecord
- record.name = name
- database.addQueue(record) { rc =>
- rc match {
- case Some(id) =>
- dispatchQueue ^ {
- val queue = new Queue(this, dest, id)
- queues.put(name, queue)
- cb(queue)
- }
- case None => // store could not create
- cb(null)
+ if( store!=null ) {
+ val record = new QueueRecord
+ record.name = name
+ store.addQueue(record) { rc =>
+ rc match {
+ case Some(id) =>
+ dispatchQueue ^ {
+ val queue = new Queue(this, dest, id)
+ queues.put(name, queue)
+ cb(queue)
+ }
+ case None => // store could not create
+ cb(null)
+ }
}
+ } else {
+ val queue = new Queue(this, dest, -1)
+ queues.put(name, queue)
+ cb(queue)
}
- null
+
} |>>: dispatchQueue
def createSubscription(consumer:ConsumerContext):BrokerSubscription = {
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml Wed Jul 7 04:04:25 2010
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under
+ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-scala</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-cassandra</artifactId>
+ <packaging>jar</packaging>
+ <version>6.0-SNAPSHOT</version>
+
+ <name>ActiveMQ :: Store :: Cassandra</name>
+
+ <repositories>
+ <repository>
+ <id>shorrockin Maven 2 Repository</id>
+ <url>http://maven.shorrockin.com</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-store</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- val shorrockinRepo = "shorrockin Maven 2 Repository" at "http://maven.shorrockin.com" -->
+
+ <dependency>
+ <groupId>com.shorrockin</groupId>
+ <artifactId>cascal</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-proto</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-dto</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- Scala Support -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>compile</scope>
+ <version>${scala-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala-version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-store</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>${junit-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ <version>${log4j-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.clhm</groupId>
+ <artifactId>clhm-production</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.collections</groupId>
+ <artifactId>google-collections</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>flexjson</groupId>
+ <artifactId>flexjson</artifactId>
+ <version>1.7</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>high-scale-lib</groupId>
+ <artifactId>high-scale-lib</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-proto</artifactId>
+ <version>${hawtbuf-version}</version>
+ <configuration>
+ <type>alt</type>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto Wed Jul 7 04:04:25 2010
@@ -0,0 +1,35 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.broker.store.cassandra;
+
+option java_multiple_files = true;
+
+message PBMessageRecord { // CassandraClient encodes/decodes MessageRecord to/from this message
+ required bytes messageId = 2 [java_override_type = "AsciiBuffer"]; // field numbers start at 2; number 1 is not used
+ required bytes protocol = 3 [java_override_type = "AsciiBuffer"];
+ required int32 size = 4;
+ optional bytes value = 5;
+ optional int64 stream = 6;
+ optional int64 expiration = 7;
+}
+
+message PBQueueEntryRecord { // CassandraClient encodes/decodes QueueEntryRecord to/from this message
+ required int64 messageKey = 1; // queue key and queue sequence are not fields here; CassandraClient keeps them outside the encoded value
+ optional bytes attachment = 2;
+ optional int32 size = 3;
+ optional int32 redeliveries = 4; // narrowed with toShort when decoded into QueueEntryRecord
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:04:25 2010
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.apache.activemq.apollo.broker.{Logging, BaseService}
+import com.shorrockin.cascal.session._
+import com.shorrockin.cascal.utils.Conversions._
+import java.util.{HashMap}
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.fusesource.hawtbuf.{AsciiBuffer, DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
+
+/**
+ * Client used by the Cassandra store to read and write queue, message and
+ * queue entry records through a pooled cascal session. Storage locations
+ * are addressed through the configured schema.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CassandraClient() {
+  // Schema used to address the stored data (created with the name "ActiveMQ").
+  var schema: Schema = new Schema("ActiveMQ")
+  // Hosts handed to the session pool by start(); defaults to one entry for 127.0.0.1.
+  var hosts = Host("127.0.0.1", 9160, 3000) :: Nil
+
+  /** Lets a hawtbuf Buffer be used where a raw byte array is expected. */
+  implicit def toByteArray(buffer: Buffer): Array[Byte] = buffer.toByteArray
+
+  // Created by start(); null until then.
+  protected var pool: SessionPool = null
+
+  /** Creates the session pool for the configured hosts. */
+  def start() = {
+    val params = new PoolParams(10, ExhaustionPolicy.Fail, 500L, 6, 2)
+    pool = new SessionPool(hosts, params, Consistency.One)
+  }
+
+  /** Closes the session pool; start() must have been called first. */
+  def stop() = {
+    pool.close
+  }
+
+  /**
+   * Checks a session out of the pool, runs block with it and always checks
+   * the session back in, even when block throws.
+   */
+  protected def withSession[E](block: Session => E): E = {
+    val session = pool.checkout
+    try {
+      block(session)
+    } finally {
+      pool.checkin(session)
+    }
+  }
+
+  /** Decodes an unframed PBMessageRecord into a MessageRecord (id is not set here). */
+  implicit def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
+    import PBMessageRecord._
+    val pb = PBMessageRecordBuffer.parseUnframed(v)
+    val rc = new MessageRecord
+    rc.messageId = pb.getMessageId
+    rc.protocol = pb.getProtocol
+    rc.size = pb.getSize
+    rc.value = pb.getValue
+    rc.stream = pb.getStream
+    rc.expiration = pb.getExpiration
+    rc
+  }
+
+  /** Encodes a MessageRecord as an unframed PBMessageRecord (the id is not encoded). */
+  implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
+    import PBMessageRecord._
+    val pb = new PBMessageRecordBean
+    pb.setMessageId(v.messageId)
+    pb.setProtocol(v.protocol)
+    pb.setSize(v.size)
+    pb.setValue(v.value)
+    pb.setStream(v.stream)
+    pb.setExpiration(v.expiration)
+    pb.freeze.toUnframedByteArray
+  }
+
+  /**
+   * Decodes an unframed PBQueueEntryRecord into a QueueEntryRecord. The queue
+   * key and sequence are not part of the encoding; callers fill them in.
+   */
+  implicit def decodeQueueEntryRecord(v: Array[Byte]): QueueEntryRecord = {
+    import PBQueueEntryRecord._
+    val pb = PBQueueEntryRecordBuffer.parseUnframed(v)
+    val rc = new QueueEntryRecord
+    rc.messageKey = pb.getMessageKey
+    rc.attachment = pb.getAttachment
+    rc.size = pb.getSize
+    rc.redeliveries = pb.getRedeliveries.toShort
+    rc
+  }
+
+  /** Encodes a QueueEntryRecord as an unframed PBQueueEntryRecord. */
+  implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
+    import PBQueueEntryRecord._
+    val pb = new PBQueueEntryRecordBean
+    pb.setMessageKey(v.messageKey)
+    pb.setAttachment(v.attachment)
+    pb.setSize(v.size)
+    pb.setRedeliveries(v.redeliveries)
+    pb.freeze.toUnframedByteArray
+  }
+
+  /** Inserts the pair (record.id, record.name) under schema.queue_name. */
+  def addQueue(record: QueueRecord) = {
+    withSession {
+      session =>
+        session.insert(schema.queue_name \ (record.id, record.name))
+    }
+  }
+
+  /** Lists the ids of all queues found under schema.queue_name. */
+  def listQueues: Seq[Long] = {
+    withSession {
+      session =>
+        session.list(schema.queue_name).map {
+          x =>
+            val id: Long = x.name
+            id
+        }
+    }
+  }
+
+  /**
+   * Looks up the queue with the given id.
+   *
+   * @return the queue's record and entry count, or None when nothing is
+   *         stored under that id in schema.queue_name
+   */
+  def getQueueStatus(id: Long): Option[QueueStatus] = {
+    withSession {
+      session =>
+        session.get(schema.queue_name \ id) match {
+          case Some(x) =>
+
+            val rc = new QueueStatus
+            rc.record = new QueueRecord
+            rc.record.id = id
+            rc.record.name = new AsciiBuffer(x)
+
+            rc.count = session.count( schema.entries \ id )
+
+            // TODO: rc.first and rc.last are not populated yet.
+
+            Some(rc)
+          case None =>
+            None
+        }
+    }
+  }
+
+
+  /**
+   * Writes the given batches with a single session.batch call. Each action
+   * contributes an insert of its message (when action.store is set), one
+   * insert per enqueue and one column delete per dequeue.
+   */
+  def store(txs:Seq[CassandraStore#CassandraBatch]): Unit = {
+    withSession {
+      session =>
+        // Operations are prepended, so the list is in reverse order of accumulation.
+        var batch = List[Operation]()
+        txs.foreach {
+          tx =>
+            tx.actions.foreach {
+              case (msg, action) =>
+                if (action.store != null) {
+                  batch ::= Insert( schema.message_data \ (msg, action.store) )
+                }
+                action.enqueues.foreach {
+                  queueEntry =>
+                    val qid = queueEntry.queueKey
+                    val seq = queueEntry.queueSeq
+                    batch ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
+                }
+                action.dequeues.foreach {
+                  queueEntry =>
+                    val qid = queueEntry.queueKey
+                    val seq = queueEntry.queueSeq
+                    batch ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
+                }
+            }
+        }
+        session.batch(batch)
+    }
+  }
+
+  /** Loads the message stored under id, or None when there is no such message. */
+  def loadMessage(id: Long): Option[MessageRecord] = {
+    withSession {
+      session =>
+        session.get(schema.message_data \ id) match {
+          case Some(x) =>
+            val rc: MessageRecord = x.value
+            rc.id = id
+            Some(rc)
+          case None =>
+            None
+        }
+    }
+  }
+
+  /** Lists the entries stored for queue qid, with queueKey and queueSeq filled in. */
+  def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
+    withSession {
+      session =>
+        session.list(schema.entries \ qid).map { x=>
+          val rc:QueueEntryRecord = x.value
+          rc.queueKey = qid
+          rc.queueSeq = x.name
+          rc
+        }
+    }
+  }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:04:25 2010
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.apache.activemq.broker.store.{StoreBatch, Store}
+import org.fusesource.hawtdispatch.BaseRetained
+import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
+import com.shorrockin.cascal.session._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import java.util.concurrent.atomic.AtomicLong
+import collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import com.shorrockin.cascal.model.Key
+import org.apache.log.output.db.ColumnType
+import java.util.{HashSet, HashMap}
+import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
+import org.apache.activemq.apollo.util.IntCounter
+import com.shorrockin.cascal.utils.Conversions._
+import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
+import collection.Seq
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CassandraStore extends Store with BaseService with Logging {
+
+ import CassandraStoreHelper._
+ override protected def log = CassandraStoreHelper
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the BaseService interface
+ //
+ /////////////////////////////////////////////////////////////////////
+ val dispatchQueue = createQueue("cassandra store")
+
+ var next_queue_key = new AtomicLong(0)
+ var next_msg_key = new AtomicLong(0)
+
+ val client = new CassandraClient()
+ protected var executor_pool:ExecutorService = _
+
+ protected def _start(onCompleted: Runnable) = { // start hook: creates the worker pool and starts the client
+ executor_pool = Executors.newCachedThreadPool // pool on which this store runs its client calls
+ client.schema = Schema("ActiveMQ") // same keyspace name that the test storage-conf.xml declares
+ client.start
+ onCompleted.run // run synchronously, right after client.start returns
+ }
+
+ /**
+  * Stops the store without blocking the caller: a helper thread drains the
+  * worker pool, then stops the client and finally runs `onCompleted`.
+  *
+  * The client is stopped only after the pool has terminated, because the
+  * tasks still queued on the pool call into the client.
+  *
+  * @param onCompleted run once the pool has terminated and the client has been stopped
+  */
+ protected def _stop(onCompleted: Runnable) = {
+   new Thread() {
+     override def run = {
+       executor_pool.shutdown
+       executor_pool.awaitTermination(1, TimeUnit.DAYS)
+       executor_pool = null
+       // safe to stop the client now: no task can be using it anymore
+       client.stop
+       onCompleted.run
+     }
+   }.start
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the Store interface
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = { // adds the queue through the client on executor_pool, then passes a key to cb
+ val key = next_queue_key.incrementAndGet // NOTE(review): key is handed to cb but never written to record.id before client.addQueue - confirm the client stores the queue under this same key
+ executor_pool ^{
+ client.addQueue(record)
+ cb(Some(key))
+ }
+ }
+
+ def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = { // asks the client for the queue status inside an executor_pool task
+ executor_pool ^{
+ cb( client.getQueueStatus(id) ) // cb is called from within that task with whatever the client returned
+ }
+ }
+
+ def listQueues(cb: (Seq[Long]) => Unit) = { // asks the client for the queue keys inside an executor_pool task
+ executor_pool ^{
+ cb( client.listQueues ) // cb is called from within that task
+ }
+ }
+
+ def loadMessage(id: Long)(cb: (Option[MessageRecord]) => Unit) = { // loads a message record through the client inside an executor_pool task
+ executor_pool ^{
+ cb( client.loadMessage(id) ) // None is passed on when the client finds nothing for id
+ }
+ }
+
+
+ def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = { // lists a queue's entries through the client inside an executor_pool task
+ executor_pool ^{
+ cb( client.getQueueEntries(id) ) // cb is called from within that task
+ }
+ }
+
+ def flushMessage(id: Long)(cb: => Unit) = ^{ // the task built here is handed to dispatchQueue by the >>: on the last line
+ val action: CassandraBatch#MessageAction = pendingStores.get(id) // null when no store of this message is pending
+ if( action == null ) {
+ cb // nothing buffered for this id: notify immediately
+ } else {
+ val prevDisposer = action.tx.getDisposer
+ action.tx.setDisposer(^{ // replace the batch's disposer with one that runs cb first, then the previous disposer
+ cb
+ if(prevDisposer!=null) {
+ prevDisposer.run
+ }
+ })
+ flush(action.tx.txid) // request a flush of the owning batch now rather than waiting for its delayed flush
+ }
+
+ } >>: dispatchQueue
+
+ def createStoreBatch() = new CassandraBatch
+
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the StoreBatch interface
+ //
+ /////////////////////////////////////////////////////////////////////
+ /**
+  * A unit of work against the store.  Stores, enqueues and dequeues are
+  * collected per message; disposing the batch hands it to transaction_source,
+  * and onPerformed completes the real disposal afterwards.
+  */
+ class CassandraBatch extends BaseRetained with StoreBatch {
+
+   /** Everything this batch has been asked to do for a single message. */
+   class MessageAction {
+
+     var msg= 0L
+     var store: MessageRecord = null
+     var enqueues = ListBuffer[QueueEntryRecord]()
+     var dequeues = ListBuffer[QueueEntryRecord]()
+
+     /** The batch this action belongs to. */
+     def tx = CassandraBatch.this
+
+     /** True when there is no store, enqueue or dequeue left to perform. */
+     def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
+
+     /** Drops this action from its batch and cancels the batch if that leaves it empty. */
+     def cancel() = {
+       val owner = tx
+       owner.rm(msg)
+       if( owner.isEmpty ) {
+         owner.cancel
+       }
+     }
+   }
+
+   // message id -> the action recorded for it
+   var actions = Map[Long, MessageAction]()
+   var txid:Int = 0
+
+   /** Forgets the action recorded for the given message. */
+   def rm(msg:Long) = {
+     actions = actions - msg
+   }
+
+   def isEmpty = actions.isEmpty
+
+   /** Removes this batch from the delayed transactions and completes it without writing anything. */
+   def cancel = {
+     delayedTransactions.remove(txid)
+     onPerformed
+   }
+
+   /** Assigns the record the next message key and records a store action for it. */
+   def store(record: MessageRecord) = {
+     val id = next_msg_key.incrementAndGet
+     record.id = id
+     val pending = new MessageAction
+     pending.msg = id
+     pending.store = record
+     this.synchronized {
+       actions += id -> pending
+     }
+   }
+
+   /** Returns the action for the message, creating and registering one when absent. */
+   def action(msg:Long) = {
+     actions.getOrElse(msg, {
+       val created = new MessageAction
+       created.msg = msg
+       actions += msg->created
+       created
+     })
+   }
+
+   def enqueue(entry: QueueEntryRecord) = {
+     this.synchronized {
+       val target = action(entry.messageKey)
+       target.enqueues += entry
+     }
+   }
+
+   def dequeue(entry: QueueEntryRecord) = {
+     this.synchronized {
+       val target = action(entry.messageKey)
+       target.dequeues += entry
+     }
+   }
+
+   /** Disposal is deferred: the batch is merged into transaction_source instead. */
+   override def dispose = {
+     transaction_source.merge(this)
+   }
+
+   /** Performs the disposal that dispose deferred. */
+   def onPerformed() {
+     super.dispose
+   }
+ }
+
+ def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq) // (queue, sequence) pair used as the key of pendingEnqueues
+
+ val transaction_source = createSource(new ListEventAggregator[CassandraBatch](), dispatchQueue) // receives the batches merged in by CassandraBatch.dispose
+ transaction_source.setEventHandler(^{drain_transactions});
+ transaction_source.resume
+
+ var pendingStores = new HashMap[Long, CassandraBatch#MessageAction]() // message id -> action with a store that has not been flushed yet
+ var pendingEnqueues = new HashMap[(Long,Long), CassandraBatch#MessageAction]() // (queueKey, queueSeq) -> action with an enqueue that has not been flushed yet
+ var delayedTransactions = new HashMap[Int, CassandraBatch]() // txid -> batch waiting to be flushed
+
+ var next_tx_id = new IntCounter
+
+ /**
+  * Event handler of transaction_source.  Every batch disposed since the last
+  * run gets a transaction id, is registered for a flush 30 seconds later and
+  * has its stores and enqueues indexed in pendingStores / pendingEnqueues.
+  *
+  * A dequeue that matches a still-pending enqueue cancels both out, so neither
+  * has to be written; actions (and batches) left empty by that are canceled.
+  */
+ def drain_transactions = {
+   transaction_source.getData.foreach { tx =>
+
+     val tx_id = next_tx_id.incrementAndGet
+     tx.txid = tx_id
+     delayedTransactions.put(tx_id, tx)
+     dispatchQueue.dispatchAfter(30, TimeUnit.SECONDS, ^{flush(tx_id)})
+
+     tx.actions.foreach { case (msg, action) =>
+       if( action.store!=null ) {
+         pendingStores.put(msg, action)
+       }
+       action.enqueues.foreach { queueEntry=>
+         pendingEnqueues.put(key(queueEntry), action)
+       }
+
+
+       // dequeues can cancel out previous enqueues
+       action.dequeues.foreach { currentDequeue=>
+         val currentKey = key(currentDequeue)
+         val prevAction:CassandraBatch#MessageAction = pendingEnqueues.remove(currentKey)
+         if( prevAction!=null ) {
+
+           // yay we can cancel out a previous enqueue
+           prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
+
+           // if the message is not in any queues.. we can gc it..
+           if( prevAction.enqueues == Nil && prevAction.store !=null ) {
+             pendingStores.remove(prevAction.msg)
+             prevAction.store = null
+           }
+
+           // Cancel the previous action if it's now empty
+           if( prevAction.isEmpty ) {
+             prevAction.cancel()
+           }
+
+           // since we canceled out the previous enqueue.. now cancel out this
+           // dequeue (compare keys: a key never equals the record itself)
+           action.dequeues = action.dequeues.filterNot( x=> key(x) == currentKey )
+           if( action.isEmpty ) {
+             action.cancel()
+           }
+         }
+       }
+     }
+
+   }
+ }
+
+ /** Requests that the batch with the given transaction id be written out. */
+ def flush(tx_id:Int) = {
+   flush_source.merge(tx_id)
+ }
+
+ val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
+ flush_source.setEventHandler(^{drain_flushes});
+ flush_source.resume
+
+ /**
+  * Event handler of flush_source.  Collects the batches behind the requested
+  * transaction ids, drops their entries from the pending indexes and writes
+  * them with a single client.store call on the worker pool.
+  */
+ def drain_flushes = {
+   val txs = flush_source.getData.flatMap{ tx_id =>
+     val tx = delayedTransactions.remove(tx_id)
+     // Message may be flushed or canceled before the timeout flush event..
+     // tx may be null in those cases
+     if (tx!=null) {
+
+       tx.actions.foreach { case (msg, action) =>
+         if( action.store!=null ) {
+           pendingStores.remove(msg)
+         }
+         action.enqueues.foreach { queueEntry=>
+           // Only drop the mapping if it still points at this action.  Spelled
+           // out because a two argument remove(k, v) on java.util.HashMap only
+           // exists since Java 8; before that the call is auto-tupled into
+           // remove((k, v)) and removes nothing.
+           val entryKey = key(queueEntry)
+           if( pendingEnqueues.get(entryKey) eq action ) {
+             pendingEnqueues.remove(entryKey)
+           }
+         }
+       }
+
+       Some(tx)
+     } else {
+       None
+     }
+   }
+
+   if( !txs.isEmpty ) {
+     // suspend so that we don't process more flush requests while
+     // we are concurrently executing a flush
+     flush_source.suspend
+     executor_pool ^{
+       try {
+         client.store(txs)
+         txs.foreach { x=>
+           x.onPerformed
+         }
+       } finally {
+         // always resume, otherwise one failed write would stall every later flush
+         flush_source.resume
+       }
+     }
+   }
+ }
+
+}
+
+object CassandraStoreHelper extends Log {
+ val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+// /**
+// * Creates a default a configuration object.
+// */
+// def default() = {
+// val rc = new HawtDBStoreDTO
+// rc.directory = new File("activemq-data")
+// rc
+// }
+//
+// /**
+// * Validates a configuration object.
+// */
+// def validate(config: HawtDBStoreDTO, reporter:Reporter):ReporterLevel = {
+// new Reporting(reporter) {
+// if( config.directory == null ) {
+// error("hawtdb store must be configured with a directroy.")
+// }
+// }.result
+// }
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/Schema.scala Wed Jul 7 04:04:25 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.fusesource.hawtbuf.Buffer
+import com.shorrockin.cascal.model.{StandardKey, Keyspace}
+import com.shorrockin.cascal.session.ColumnPredicate
+
+case class Schema(name:String) {
+
+ implicit def toByteArray(buffer:Buffer) = buffer.toByteArray // lets a hawtbuf Buffer be passed where a byte array is expected
+
+ val keyspace = Keyspace(name)
+
+ /**
+ * The "messages" entry of the keyspace and the "data" entry beneath it.
+ * CassandraClient stores and loads message records through message_data,
+ * addressed by message id.
+ */
+ val message = keyspace \ "messages"
+ val message_data = message \ "data"
+
+ /**
+ * The "queues" entry of the keyspace and the "name" entry beneath it.
+ * NOTE(review): presumably holds the queue records - the code using it is
+ * not part of this file, confirm against CassandraClient.
+ */
+ val queue = keyspace \ "queues"
+ val queue_name = queue \ "name"
+
+ /**
+ * The "entries" entry of the keyspace. CassandraClient addresses queue
+ * entries beneath it by queue key and then by queue sequence.
+ */
+ val entries = keyspace \ "entries"
+
+// protected val subscriptionColumns = columns(subscriptionKey, "destination" :: "created" :: "prefetchCount" :: "inactivityTimeout" :: Nil)
+//
+// protected def columns(key: StandardKey, names: Seq[String]) = {
+// val columns = names.map{n => (key \ (n, "")).name}
+// ColumnPredicate(columns)
+// }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/log4j.properties Wed Jul 7 04:04:25 2010
@@ -0,0 +1,10 @@
+log4j.rootLogger=DEBUG, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p %d [%c %t] %m%n
+
+ # cassandra logging for test cases (verbose at DEBUG; raise these levels to quiet it)
+log4j.logger.org.apache.cassandra=DEBUG
+log4j.logger.org.apache.cassandra.service=DEBUG
+log4j.logger.org.apache.cassandra.thrift=DEBUG
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/resources/org/apache/activemq/broker/store/cassandra/storage-conf.xml Wed Jul 7 04:04:25 2010
@@ -0,0 +1,54 @@
+<Storage>
+ <ClusterName>Test</ClusterName>
+
+ <Keyspaces>
+ <Keyspace Name="ActiveMQ">
+
+ <!-- regular records with text keys -->
+ <ColumnFamily Name="messages" CompareWith="LongType"/>
+ <ColumnFamily Name="queues" CompareWith="LongType"/>
+ <ColumnFamily Name="entries" CompareWith="LongType" KeysCached="10000"/>
+ <!-- todo.. play with: RowsCached="100%" KeysCached="100%"-->
+
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>1</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+ </Keyspace>
+ </Keyspaces>
+
+ <AutoBootstrap>false</AutoBootstrap>
+ <Authenticator>org.apache.cassandra.auth.AllowAllAuthenticator</Authenticator>
+ <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
+ <InitialToken></InitialToken>
+ <CommitLogDirectory>%temp-dir%commitlog</CommitLogDirectory>
+
+ <DataFileDirectories>
+ <DataFileDirectory>%temp-dir%data</DataFileDirectory>
+ </DataFileDirectories>
+
+ <Seeds>
+ <Seed>127.0.0.1</Seed>
+ </Seeds>
+ <RpcTimeoutInMillis>10000</RpcTimeoutInMillis>
+ <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
+ <ListenAddress>localhost</ListenAddress>
+ <StoragePort>7000</StoragePort>
+ <ThriftAddress>localhost</ThriftAddress>
+ <ThriftPort>9160</ThriftPort>
+ <ThriftFramedTransport>false</ThriftFramedTransport>
+ <DiskAccessMode>auto</DiskAccessMode>
+ <RowWarningThresholdInMB>512</RowWarningThresholdInMB>
+ <SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
+ <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
+ <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
+ <ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>
+ <MemtableThroughputInMB>64</MemtableThroughputInMB>
+ <BinaryMemtableThroughputInMB>256</BinaryMemtableThroughputInMB>
+ <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
+ <MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>
+ <ConcurrentReads>8</ConcurrentReads>
+ <ConcurrentWrites>32</ConcurrentWrites>
+ <CommitLogSync>periodic</CommitLogSync>
+ <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
+ <GCGraceSeconds>864000</GCGraceSeconds>
+</Storage>
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraServerMixin.scala Wed Jul 7 04:04:25 2010
@@ -0,0 +1,112 @@
+package org.apache.activemq.broker.store.cassandra
+
+import org.apache.cassandra.thrift.CassandraDaemon
+import org.apache.cassandra.config.DatabaseDescriptor
+import java.io.File
+import java.net.ConnectException
+import org.apache.thrift.transport.{TTransportException, TSocket}
+import com.shorrockin.cascal.session._
+import com.shorrockin.cascal.utils.{Utils, Logging}
+import org.scalatest.{Suite, BeforeAndAfterAll}
+
+/**
+ * trait which mixes in the functionality necessary to embed
+ * cassandra into a unit test
+ */
+trait CassandraServerMixin extends BeforeAndAfterAll {
+this: Suite =>
+
+ private var basedir = "target"
+
+ /**
+  * Starts the embedded Cassandra server before the suite's own set-up runs.
+  * A "basedir" entry in the config map replaces the default base directory.
+  */
+ override protected def beforeAll(configMap: Map[String, Any]): Unit = {
+   configMap.get("basedir").foreach { dir =>
+     basedir = dir.toString
+   }
+   startCassandra
+   super.beforeAll(configMap)
+ }
+
+ override protected def afterAll(configMap: Map[String, Any]) = {
+ super.afterAll(configMap)
+ stopCassandra
+ }
+
+
+ import Utils._
+
+ protected def cassandraHomeDirectory = new File(new File(basedir),"cassandra.home.unit-tests")
+ protected def resource(str:String) = getClass.getResourceAsStream(str)
+
+ private var daemon:CassandraDaemon = null
+ var pool:SessionPool = null
+ var daemonThread:Thread = null
+
+ /**
+  * Boots an embedded Cassandra daemon for the tests.
+  *
+  * Creates the session pool, recreates the cassandra home directory, copies
+  * storage-conf.xml (with %temp-dir% replaced) and log4j.properties into it,
+  * starts the daemon on its own thread and then waits until the thrift port
+  * accepts connections.
+  *
+  * Throws IllegalStateException if the port has not opened within 60 seconds.
+  */
+ private def startCassandra = synchronized {
+   val hosts = Host("localhost", 9160, 250) :: Nil
+   val params = new PoolParams(10, ExhaustionPolicy.Fail, 500L, 6, 2)
+   pool = new SessionPool(hosts, params, Consistency.One)
+
+   val home = cassandraHomeDirectory
+   delete(home)
+   home.mkdirs
+
+   val fileSep = System.getProperty("file.separator")
+   val storageFile = new File(cassandraHomeDirectory, "storage-conf.xml")
+   val logFile = new File(cassandraHomeDirectory, "log4j.properties")
+
+   replace(copy(resource("storage-conf.xml"), storageFile), ("%temp-dir%" -> (cassandraHomeDirectory.getCanonicalPath + fileSep)))
+   copy(resource("log4j.properties"), logFile)
+
+   System.setProperty("storage-config", cassandraHomeDirectory.getCanonicalPath)
+
+   try {
+     DatabaseDescriptor.getAllDataFileLocations.foreach {
+       (file) => new File(file).mkdirs
+     }
+     new File(DatabaseDescriptor.getLogFileLocation).mkdirs
+
+     daemon = new CassandraDaemon
+     daemon.init(new Array[String](0))
+
+     daemonThread = new Thread("Cassandra Daemon") {
+       override def run = {
+         daemon.start
+       }
+     }
+     daemonThread.start
+
+   } catch {
+     case e:Throwable =>
+       e.printStackTrace
+       throw e
+   }
+
+   // try to make sockets until the server opens up - there has to be a better
+   // way - just not sure what it is.  Pause between attempts instead of
+   // spinning, and give up eventually so a daemon that failed to start does
+   // not hang the build forever.
+   val socket = new TSocket("localhost", 9160);
+   val deadline = System.currentTimeMillis + 60 * 1000L
+   var opened = false
+   while (!opened) {
+     try {
+       socket.open()
+       opened = true
+       socket.close()
+     } catch {
+       case e:TTransportException => /* ignore */
+       case e:ConnectException => /* ignore */
+     }
+     if (!opened) {
+       if (System.currentTimeMillis > deadline) {
+         throw new IllegalStateException("Cassandra did not start listening on localhost:9160 within 60 seconds")
+       }
+       Thread.sleep(100)
+     }
+   }
+ }
+
+ private def stopCassandra() = { // releases what startCassandra created: the session pool, the daemon and its thread
+ pool.close
+ daemon.stop
+ daemonThread.join // wait for the thread that ran daemon.start to finish
+ daemonThread = null
+
+ daemon.destroy
+ daemon = null
+ }
+
+}
+
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala?rev=961123&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala Wed Jul 7 04:04:25 2010
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.scalatest.BeforeAndAfterAll
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.TaskTracker
+import org.apache.activemq.apollo.broker.{LoggingTracker, FunSuiteSupport}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CassandraStoreTest extends FunSuiteSupport with CassandraServerMixin {
+
+ def CB[T](func: (T=>Unit)=>Unit ) = { // makes a callback-style call blocking: returns the value that was passed to the callback
+ class X {
+ var value:T = _
+ }
+ val rc = new X
+ val cd = new CountDownLatch(1)
+ def cb(x:T) = { // records the result and releases the waiting caller
+ rc.value = x
+ cd.countDown
+ }
+ func(cb)
+ cd.await // waits without a timeout until cb has been called
+ rc.value
+ }
+
+ var store:CassandraStore=null
+
+ override protected def beforeAll() = {
+ store = new CassandraStore()
+ val tracker = new LoggingTracker("store startup")
+ tracker.start(store)
+ tracker.await
+ }
+
+ override protected def afterAll() = {
+ val tracker = new LoggingTracker("store stop")
+ tracker.stop(store)
+ tracker.await
+ }
+
+
+ def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {
+ expect(expected) {
+ CB(func)
+ }
+ }
+
+
+ test("add message") {
+ addMessage
+ }
+
+ /**
+  * Adds a queue and two messages to the store and checks that
+  *  - a disposed batch is held back until it is flushed,
+  *  - flushMessage completes the batch promptly,
+  *  - the queue status reports both entries afterwards.
+  */
+ def addMessage() {
+   var queueA = new QueueRecord
+   queueA.id =1
+   queueA.name = ascii("queue:1")
+
+   val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
+   queueA.id = rc.get
+
+   val expected:Seq[Long] = List(queueA.id)
+   expectCB(expected) { cb=>
+     store.listQueues(cb)
+   }
+
+   var tx = store.createStoreBatch
+   var message = new MessageRecord
+   message.id = 35
+   message.messageId = ascii("msg-35")
+   message.protocol = ascii("test-protocol")
+   message.value = ascii("test content").buffer
+   message.size = message.value.length
+   tx.store(message)
+
+
+   val disposed = new CountDownLatch(1)
+
+   var queueEntry = new QueueEntryRecord
+   queueEntry.queueKey = queueA.id
+   queueEntry.messageKey = message.id
+   queueEntry.queueSeq = 1
+
+   tx.enqueue(queueEntry)
+   tx.setDisposer(^{ disposed.countDown })
+   tx.dispose
+
+   // It should not finish disposing right away...
+   expect(false) {
+     disposed.await(5, TimeUnit.SECONDS)
+   }
+
+   var flushed = new CountDownLatch(1)
+   store.flushMessage(message.id) {
+     flushed.countDown
+   }
+
+   // Should flush quickly now..
+   expect(true) {
+     flushed.await(1, TimeUnit.SECONDS)
+   }
+   // Flushing triggers the tx to finish disposing.
+   expect(true) {
+     disposed.await(1, TimeUnit.SECONDS)
+   }
+
+   // add another message to the queue..
+   tx = store.createStoreBatch
+   message = new MessageRecord
+   message.id = 36
+   message.messageId = ascii("msg-36")
+   message.protocol = ascii("test-protocol")
+   message.value = ascii("test content").buffer
+   message.size = message.value.length
+   tx.store(message)
+
+   queueEntry = new QueueEntryRecord
+   queueEntry.queueKey = queueA.id
+   queueEntry.messageKey = message.id
+   queueEntry.queueSeq = 2
+
+   tx.enqueue(queueEntry)
+
+   // A batch only reaches the store once it is disposed; track its disposer
+   // so the status check below runs after the batch has been written.
+   val stored = new CountDownLatch(1)
+   tx.setDisposer(^{ stored.countDown })
+   tx.dispose
+
+   flushed = new CountDownLatch(1)
+   store.flushMessage(message.id) {
+     flushed.countDown
+   }
+   flushed.await
+
+   // flushMessage may be processed before the store has picked the batch up;
+   // in that case the store's own 30 second delayed flush writes it.
+   expect(true) {
+     stored.await(60, TimeUnit.SECONDS)
+   }
+
+   val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.id)(cb) )
+   expect(ascii("queue:1")) {
+     qso.get.record.name
+   }
+   expect(2) {
+     qso.get.count
+   }
+
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:04:25 2010
@@ -30,8 +30,8 @@ import org.apache.activemq.apollo.dto.Ha
import org.apache.activemq.apollo.broker._
import ReporterLevel._
import store.HawtDBManager
-import org.apache.activemq.broker.store.{Store, StoreTransaction}
-import org.apache.activemq.apollo.store.{QueueStatus, MessageRecord, QueueRecord}
+import org.apache.activemq.broker.store.{Store, StoreBatch}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -75,6 +75,8 @@ class HawtDBStore extends BaseService wi
/**
* Validates and then applies the configuration.
*/
+ def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {}
+
def configure(config: HawtDBStoreDTO, reporter:Reporter) = ^{
if ( validate(config, reporter) < ERROR ) {
this.config = config
@@ -117,7 +119,7 @@ class HawtDBStore extends BaseService wi
def flushMessage(id: Long)(cb: => Unit) = {}
- def createStoreTransaction() = new HawtDBStoreTransaction
+ def createStoreBatch() = new HawtDBStoreBatch
/////////////////////////////////////////////////////////////////////
@@ -125,16 +127,15 @@ class HawtDBStore extends BaseService wi
// Implementation of the StoreTransaction interface
//
/////////////////////////////////////////////////////////////////////
- class HawtDBStoreTransaction extends BaseRetained with StoreTransaction {
+ class HawtDBStoreBatch extends BaseRetained with StoreBatch {
def store(delivery: MessageRecord) = {
}
- def enqueue(queue: Long, seq: Long, msg: Long) = {}
-
- def dequeue(queue: Long, seq: Long, msg: Long) = {}
+ def dequeue(entry: QueueEntryRecord) = {}
+ def enqueue(entry: QueueEntryRecord) = {}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java Wed Jul 7 04:04:25 2010
@@ -24,6 +24,7 @@ import org.fusesource.hawtbuf.Buffer;
public class QueueEntryRecord {
public long queueKey;
+ public long queueSeq;
public long messageKey;
public Buffer attachment;
public int size;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:04:25 2010
@@ -32,7 +32,7 @@ import org.apache.activemq.apollo.store.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait StoreTransaction extends Retained {
+trait StoreBatch extends Retained {
/**
* Assigns the delivery a store id if it did not already
@@ -43,12 +43,12 @@ trait StoreTransaction extends Retained
/**
* Adds a delivery to a specified queue at a the specified position in the queue.
*/
- def enqueue(queue:Long, seq:Long, msg:Long)
+ def enqueue(entry:QueueEntryRecord)
/**
* Removes a delivery from a specified queue at a the specified position in the queue.
*/
- def dequeue(queue:Long, seq:Long, msg:Long)
+ def dequeue(entry:QueueEntryRecord)
}
@@ -74,6 +74,11 @@ trait Store extends Service {
def listQueues(cb: (Seq[Long])=>Unit )
/**
+ * Loads the queue entries stored for the given queue id.
+ */
+ def getQueueEntries(id:Long)(cb:(Seq[QueueEntryRecord])=>Unit )
+
+ /**
* Removes a the delivery associated with the provided from any
* internal buffers/caches. The callback is executed once, the message is
* no longer buffered.
@@ -86,10 +91,10 @@ trait Store extends Service {
def loadMessage(id:Long)(cb:(Option[MessageRecord])=>Unit )
/**
- * Creates a StoreTransaction which is used to perform persistent
+ * Creates a StoreBatch which is used to perform persistent
* operations as unit of work.
*/
- def createStoreTransaction():StoreTransaction
+ def createStoreBatch():StoreBatch
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java&r1=961122&r2=961123&rev=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala Wed Jul 7 04:04:25 2010
@@ -14,19 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.store;
-
-import org.fusesource.hawtbuf.Buffer;
+package org.apache.activemq.apollo.util
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueEntryRecord {
+class IntCounter(private var value:Int = 0) {
+
+ def get() = value
+
+ def incrementAndGet() = addAndGet(1)
+ def decrementAndGet() = addAndGet(-1)
+ def addAndGet(amount:Int) = {
+ value+=amount
+ value
+ }
- public long queueKey;
- public long messageKey;
- public Buffer attachment;
- public int size;
- public short redeliveries;
+ def getAndIncrement() = getAndAdd(1)
+ def getAndDecrement() = getAndAdd(-1) // returns the current value, then lowers it by one (was -11)
+ def getAndAdd(amount:Int) = {
+ val rc = value
+ value+=amount
+ rc
+ }
-}
+}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (from r961122, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java&r1=961122&r2=961123&rev=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Wed Jul 7 04:04:25 2010
@@ -14,19 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.store;
-
-import org.fusesource.hawtbuf.Buffer;
+package org.apache.activemq.apollo.util
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueEntryRecord {
+class LongCounter(private var value:Long = 0) {
+
+ def get() = value
+
+ def incrementAndGet() = addAndGet(1)
+ def decrementAndGet() = addAndGet(-1)
+ def addAndGet(amount:Long) = {
+ value+=amount
+ value
+ }
- public long queueKey;
- public long messageKey;
- public Buffer attachment;
- public int size;
- public short redeliveries;
+ def getAndIncrement() = getAndAdd(1)
+ def getAndDecrement() = getAndAdd(-1) // post-decrement: yields the value held before 1 is subtracted
+ def getAndAdd(amount:Long) = {
+ val rc = value
+ value+=amount
+ rc
+ }
-}
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/broker/FunSuiteSupport.scala Wed Jul 7 04:04:25 2010
@@ -30,6 +30,7 @@ abstract class FunSuiteSupport extends F
case _ => System.getProperty("basedir", ".")
}
debug("using basedir: " + _basedir)
+ super.beforeAll(map)
}
//
Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961123&r1=961122&r2=961123&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul 7 04:04:25 2010
@@ -137,6 +137,7 @@
<module>activemq-util</module>
<module>activemq-transport</module>
<module>activemq-store</module>
+ <module>activemq-cassandra</module>
<module>activemq-broker</module>
<module>activemq-selector</module>
<module>activemq-tcp</module>