You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:05:07 UTC
svn commit: r961124 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
activemq-cassandra/ activemq-cassandra/src/ma...
Author: chirino
Date: Wed Jul 7 04:05:05 2010
New Revision: 961124
URL: http://svn.apache.org/viewvc?rev=961124&view=rev
Log:
- More consistent naming of persistent key feilds
- Defined factory interfaces for the stores
- Added initial bits needed to test persistance in the stomp load client
- bug fixes
Added:
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala
- copied, changed from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java
- copied, changed from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:05:05 2010
@@ -21,6 +21,8 @@ import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
import org.apache.activemq.broker.store.StoreBatch
+import org.apache.activemq.apollo.store.MessageRecord
+import protocol.ProtocolFactory
/**
* A producer which sends Delivery objects to a delivery consumer.
@@ -146,7 +148,7 @@ class Delivery extends BaseRetained {
/**
* A reference to the stored version of the message.
*/
- var storeId:Long = -1
+ var storeKey:Long = -1
/**
* The transaction the delivery is participating in.
@@ -164,8 +166,16 @@ class Delivery extends BaseRetained {
def set(other:Delivery) = {
size = other.size
message = other.message
- storeId = other.storeId
+ storeKey = other.storeKey
this
}
+ def createMessageRecord() = {
+ val sm = new MessageRecord
+ sm.protocol = message.protocol
+ sm.value = ProtocolFactory.get(message.protocol).encode(message)
+ sm.size = size
+ sm
+ }
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:05:05 2010
@@ -135,7 +135,7 @@ class Queue(val host: VirtualHost, val d
if( delivery.ack!=null ) {
delivery.ack(delivery.storeBatch)
}
- if (delivery.storeId != -1) {
+ if (delivery.storeKey != -1) {
delivery.storeBatch.enqueue(entry.createQueueEntryRecord)
delivery.storeBatch.release
}
@@ -229,7 +229,7 @@ class Queue(val host: VirtualHost, val d
// Called from the producer thread before the delivery is
// processed by the queue's thread.. We don't
// yet know the order of the delivery in the queue.
- if (delivery.storeId != -1) {
+ if (delivery.storeKey != -1) {
// If the message has a store id, then this delivery will
// need a tx to track the store changes.
if( delivery.storeBatch == null ) {
@@ -371,19 +371,10 @@ class Queue(val host: VirtualHost, val d
// Chuck the reset out...
val loaded = entry.value.asLoaded
if( loaded!=null ) {
- var ref = loaded.delivery.storeId
+ var ref = loaded.delivery.storeKey
if( ref == -1 ) {
val tx = host.store.createStoreBatch
-
- val message = loaded.delivery.message
- val sm = new MessageRecord
- sm.protocol = message.protocol
- sm.value = ProtocolFactory.get(message.protocol).encode(message)
- sm.size = loaded.size
-
- tx.store(sm)
- loaded.delivery.storeId = sm.id
-
+ loaded.delivery.storeKey = tx.store(loaded.delivery.createMessageRecord)
tx.enqueue(entry.createQueueEntryRecord)
tx.release
}
@@ -408,7 +399,7 @@ class Queue(val host: VirtualHost, val d
val delivery = new Delivery()
delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
delivery.size = stored.size
- delivery.storeId = stored.id
+ delivery.storeKey = stored.key
entry.loaded(delivery)
@@ -478,7 +469,7 @@ class QueueEntry(val queue:Queue) extend
def stored() = {
val loaded = value.asLoaded
- this.value = new Stored(loaded.delivery.storeId, loaded.size)
+ this.value = new Stored(loaded.delivery.storeKey, loaded.size)
this
}
@@ -620,7 +611,7 @@ class QueueEntry(val queue:Queue) extend
class Loaded(val delivery: Delivery) extends EntryType {
var aquired = false
- def ref = delivery.storeId
+ def ref = delivery.storeKey
def size = delivery.size
def flushing = false
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:05:05 2010
@@ -24,13 +24,13 @@ import _root_.scala.collection.JavaConve
import _root_.scala.reflect.BeanProperty
import path.PathFilter
import org.fusesource.hawtbuf.AsciiBuffer
-import org.apache.activemq.apollo.dto.VirtualHostDTO
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import ReporterLevel._
import org.apache.activemq.broker.store.{Store}
import org.fusesource.hawtbuf.proto.WireFormat
-import org.apache.activemq.apollo.store.QueueRecord
+import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, VirtualHostDTO}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -51,6 +51,9 @@ object VirtualHost extends Log {
rc.id = "default"
rc.enabled = true
rc.hostNames.add("localhost")
+ val store = new CassandraStoreDTO
+ store.hosts.add("127.0.0.1:9160")
+ rc.store = store
rc
}
@@ -59,9 +62,13 @@ object VirtualHost extends Log {
*/
def validate(config: VirtualHostDTO, reporter:Reporter):ReporterLevel = {
new Reporting(reporter) {
+
if( config.hostNames.isEmpty ) {
error("Virtual host must be configured with at least one host name.")
}
+
+ result |= StoreFactory.validate(config.store, reporter)
+
}.result
}
@@ -111,16 +118,19 @@ class VirtualHost(val broker: Broker) ex
override protected def _start(onCompleted:Runnable):Unit = {
+ val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
+ store = StoreFactory.create(config.store)
if( store!=null ) {
- store.start();
- store.listQueues { ids =>
- for( id <- ids) {
- store.getQueueStatus(id) { x =>
- x match {
- case Some(info)=>
- dispatchQueue ^{
- val dest = DestinationParser.parse(info.record.name , destination_parser_options)
- if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
+ val task = tracker.task("store startup")
+
+ store.start(^{
+ store.listQueues { ids =>
+ for( id <- ids) {
+ store.getQueueStatus(id) { x =>
+ x match {
+ case Some(info)=>
+ dispatchQueue ^{
+ val dest = new SingleDestination(Domain.QUEUE_DOMAIN, info.record.name)
val queue = new Queue(this, dest, id)
queue.first_seq = info.first
@@ -130,19 +140,22 @@ class VirtualHost(val broker: Broker) ex
queues.put(info.record.name, queue)
}
+ case _ =>
}
- case _ =>
}
}
}
- }
+ task.run
+
+ });
}
//Recover transactions:
transactionManager.virtualHost = this
transactionManager.loadTransactions();
- onCompleted.run
+
+ tracker.callback(onCompleted)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Wed Jul 7 04:05:05 2010
@@ -16,72 +16,16 @@
*/
package org.apache.activemq.apollo.broker.protocol
-import java.util.Properties
-import java.net.{URLClassLoader, URL}
import org.apache.activemq.transport.DefaultTransportListener
-import java.io.{IOException, File, InputStream}
+import java.io.{IOException}
import org.apache.activemq.apollo.broker.{Message, BrokerConnection}
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
import org.apache.activemq.wireformat.WireFormat
+import org.apache.activemq.apollo.util.ClassFinder
/**
* <p>
- * Used to discover classes using the META-INF discovery trick.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-case class ClassFinder[T](path:String, loaders:Seq[ClassLoader]=Thread.currentThread.getContextClassLoader::Nil) {
-
- def find(): List[Class[T]] = {
- var classes = List[Class[T]]()
- loaders.foreach { loader=>
-
- val resources = loader.getResources(path)
- var classNames: List[String] = Nil
- while(resources.hasMoreElements) {
- val url = resources.nextElement;
- val p = loadProperties(url.openStream)
- val enum = p.keys
- while (enum.hasMoreElements) {
- classNames = classNames ::: enum.nextElement.asInstanceOf[String] :: Nil
- }
- }
- classNames = classNames.removeDuplicates
-
- classes :::= classNames.map { name=>
- loader.loadClass(name).asInstanceOf[Class[T]]
- }
-
- }
-
- return classes.removeDuplicates
- }
-
- private def loadProperties(is:InputStream):Properties = {
- if( is==null ) {
- return null;
- }
- try {
- val p = new Properties()
- p.load(is);
- return p
- } catch {
- case e:Exception =>
- return null
- } finally {
- try {
- is.close()
- } catch {
- case _ =>
- }
- }
- }
-}
-
-/**
- * <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/pom.xml Wed Jul 7 04:05:05 2010
@@ -96,6 +96,14 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-util</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/proto/data.proto Wed Jul 7 04:05:05 2010
@@ -19,12 +19,11 @@ package org.apache.activemq.broker.store
option java_multiple_files = true;
message PBMessageRecord {
- required bytes messageId = 2 [java_override_type = "AsciiBuffer"];
- required bytes protocol = 3 [java_override_type = "AsciiBuffer"];
- required int32 size = 4;
- optional bytes value = 5;
- optional int64 stream = 6;
- optional int64 expiration = 7;
+ required bytes protocol = 1 [java_override_type = "AsciiBuffer"];
+ required int32 size = 2;
+ optional bytes value = 3;
+ optional int64 stream = 4;
+ optional int64 expiration = 5;
}
message PBQueueEntryRecord {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:05:05 2010
@@ -58,7 +58,6 @@ class CassandraClient() {
import PBMessageRecord._
val pb = PBMessageRecordBuffer.parseUnframed(v)
val rc = new MessageRecord
- rc.messageId = pb.getMessageId
rc.protocol = pb.getProtocol
rc.size = pb.getSize
rc.value = pb.getValue
@@ -70,7 +69,6 @@ class CassandraClient() {
implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
import PBMessageRecord._
val pb = new PBMessageRecordBean
- pb.setMessageId(v.messageId)
pb.setProtocol(v.protocol)
pb.setSize(v.size)
pb.setValue(v.value)
@@ -103,7 +101,7 @@ class CassandraClient() {
def addQueue(record: QueueRecord) = {
withSession {
session =>
- session.insert(schema.queue_name \ (record.id, record.name))
+ session.insert(schema.queue_name \ (record.key, record.name))
}
}
@@ -126,7 +124,7 @@ class CassandraClient() {
val rc = new QueueStatus
rc.record = new QueueRecord
- rc.record.id = id
+ rc.record.key = id
rc.record.name = new AsciiBuffer(x)
rc.count = session.count( schema.entries \ id )
@@ -180,7 +178,7 @@ class CassandraClient() {
session.get(schema.message_data \ id) match {
case Some(x) =>
val rc: MessageRecord = x.value
- rc.id = id
+ rc.key = id
Some(rc)
case None =>
None
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:05:05 2010
@@ -18,28 +18,52 @@ package org.apache.activemq.broker.store
import org.apache.activemq.broker.store.{StoreBatch, Store}
import org.fusesource.hawtdispatch.BaseRetained
-import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
import com.shorrockin.cascal.session._
-import org.fusesource.hawtdispatch.ScalaDispatch._
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
-import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import com.shorrockin.cascal.model.Key
-import org.apache.log.output.db.ColumnType
-import java.util.{HashSet, HashMap}
+import java.util.HashMap
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
import org.apache.activemq.apollo.util.IntCounter
-import com.shorrockin.cascal.utils.Conversions._
import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
-import collection.Seq
+import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
+import collection.{JavaConversions, Seq}
+import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
+import com.shorrockin.cascal.utils.Conversions._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import ReporterLevel._
+
+object CassandraStore extends Log {
+ val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+ /**
+ * Creates a default a configuration object.
+ */
+ def default() = {
+ val rc = new CassandraStoreDTO
+ rc.hosts.add("localhost:9160")
+ rc
+ }
+
+ /**
+ * Validates a configuration object.
+ */
+ def validate(config: CassandraStoreDTO, reporter:Reporter):ReporterLevel = {
+ new Reporting(reporter) {
+ if( config.hosts.isEmpty ) {
+ error("At least one cassandra host must be configured.")
+ }
+ }.result
+ }
+}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class CassandraStore extends Store with BaseService with Logging {
- import CassandraStoreHelper._
- override protected def log = CassandraStoreHelper
+ import CassandraStore._
+ override protected def log = CassandraStore
/////////////////////////////////////////////////////////////////////
//
@@ -53,10 +77,36 @@ class CassandraStore extends Store with
val client = new CassandraClient()
protected var executor_pool:ExecutorService = _
+ var config:CassandraStoreDTO = default
+
+ def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
+
+ def configure(config: CassandraStoreDTO, reporter: Reporter) = {
+ if ( CassandraStore.validate(config, reporter) < ERROR ) {
+ if( serviceState.isStarted ) {
+ // TODO: apply changes while he broker is running.
+ reporter.report(WARN, "Updating cassandra store configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
+ } else {
+ this.config = config
+ }
+ }
+ }
protected def _start(onCompleted: Runnable) = {
executor_pool = Executors.newCachedThreadPool
- client.schema = Schema("ActiveMQ")
+ client.schema = Schema(config.keyspace)
+
+ // TODO: move some of this parsing code into validation too.
+ val HostPort = """([^:]+)(:(\d+))?""".r
+ import JavaConversions._
+ client.hosts = config.hosts.flatMap { x=>
+ x match {
+ case HostPort(host,_,port)=>
+ Some(Host(host, port.toInt, 3000))
+ case _=> None
+ }
+ }.toList
+
client.start
onCompleted.run
}
@@ -169,14 +219,15 @@ class CassandraStore extends Store with
onPerformed
}
- def store(record: MessageRecord) = {
- record.id = next_msg_key.incrementAndGet
+ def store(record: MessageRecord):Long = {
+ record.key = next_msg_key.incrementAndGet
val action = new MessageAction
- action.msg = record.id
+ action.msg = record.key
action.store = record
this.synchronized {
- actions += record.id -> action
+ actions += record.key -> action
}
+ record.key
}
def action(msg:Long) = {
@@ -229,7 +280,8 @@ class CassandraStore extends Store with
val tx_id = next_tx_id.incrementAndGet
tx.txid = tx_id
delayedTransactions.put(tx_id, tx)
- dispatchQueue.dispatchAfter(30, TimeUnit.SECONDS, ^{flush(tx_id)})
+ dispatchQueue.dispatchAsync(^{flush(tx_id)})
+ dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
tx.actions.foreach { case (msg, action) =>
if( action.store!=null ) {
@@ -257,11 +309,11 @@ class CassandraStore extends Store with
// Cancel the action if it's now empty
if( prevAction.isEmpty ) {
- action.cancel()
+ prevAction.cancel()
}
// since we canceled out the previous enqueue.. now cancel out the action
- action.dequeues = action.dequeues.filterNot( x=> key(x) == currentDequeue)
+ action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
if( action.isEmpty ) {
action.cancel()
}
@@ -292,7 +344,8 @@ class CassandraStore extends Store with
pendingStores.remove(msg)
}
action.enqueues.foreach { queueEntry=>
- pendingEnqueues.remove(key(queueEntry), action)
+ val k = key(queueEntry)
+ pendingEnqueues.remove(k)
}
}
@@ -317,27 +370,3 @@ class CassandraStore extends Store with
}
}
-
-object CassandraStoreHelper extends Log {
- val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
-
-// /**
-// * Creates a default a configuration object.
-// */
-// def default() = {
-// val rc = new HawtDBStoreDTO
-// rc.directory = new File("activemq-data")
-// rc
-// }
-//
-// /**
-// * Validates a configuration object.
-// */
-// def validate(config: HawtDBStoreDTO, reporter:Reporter):ReporterLevel = {
-// new Reporting(reporter) {
-// if( config.directory == null ) {
-// error("hawtdb store must be configured with a directroy.")
-// }
-// }.result
-// }
-}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala (from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java&r1=961123&r2=961124&rev=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreSPI.scala Wed Jul 7 04:05:05 2010
@@ -14,20 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.dto;
+package org.apache.activemq.broker.store.cassandra
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.XmlSeeAlso;
-import javax.xml.bind.annotation.XmlType;
+import org.apache.activemq.apollo.store.StoreFactory
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
+import org.apache.activemq.apollo.broker.{Reporting, ReporterLevel, Reporter}
+import ReporterLevel._
/**
+ * <p>
+ * Hook to use a CassandraStore when a CassandraStoreDTO is
+ * used in a broker configuration.
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlType(name = "store-type")
-@XmlSeeAlso({MemoryStoreDTO.class, HawtDBStoreDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
-public abstract class StoreDTO {
+class CassandraStoreSPI extends StoreFactory.SPI {
+
+ def create(config: StoreDTO) = {
+ if( config.isInstanceOf[CassandraStoreDTO]) {
+ new CassandraStore
+ } else {
+ null
+ }
+ }
-}
+ def validate(config: StoreDTO, reporter:Reporter):ReporterLevel = {
+ if( config.isInstanceOf[CassandraStoreDTO]) {
+ CassandraStore.validate(config.asInstanceOf[CassandraStoreDTO], reporter)
+ } else {
+ null
+ }
+ }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/test/scala/org/apache/activemq/broker/store/cassandra/CassandraStoreTest.scala Wed Jul 7 04:05:05 2010
@@ -73,21 +73,20 @@ class CassandraStoreTest extends FunSuit
def addMessage() {
var queueA = new QueueRecord
- queueA.id =1
+ queueA.key =1
queueA.name = ascii("queue:1")
val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
- queueA.id = rc.get
+ queueA.key = rc.get
- val expected:Seq[Long] = List(queueA.id)
+ val expected:Seq[Long] = List(queueA.key)
expectCB(expected) { cb=>
store.listQueues(cb)
}
var tx = store.createStoreBatch
var message = new MessageRecord
- message.id = 35
- message.messageId = ascii("msg-35")
+ message.key = 35
message.protocol = ascii("test-protocol")
message.value = ascii("test content").buffer
message.size = message.value.length
@@ -97,8 +96,8 @@ class CassandraStoreTest extends FunSuit
val disposed = new CountDownLatch(1)
var queueEntry = new QueueEntryRecord
- queueEntry.queueKey = queueA.id
- queueEntry.messageKey = message.id
+ queueEntry.queueKey = queueA.key
+ queueEntry.messageKey = message.key
queueEntry.queueSeq = 1
tx.enqueue(queueEntry)
@@ -111,7 +110,7 @@ class CassandraStoreTest extends FunSuit
}
var flushed = new CountDownLatch(1)
- store.flushMessage(message.id) {
+ store.flushMessage(message.key) {
flushed.countDown
}
@@ -127,27 +126,26 @@ class CassandraStoreTest extends FunSuit
// add another message to the queue..
tx = store.createStoreBatch
message = new MessageRecord
- message.id = 36
- message.messageId = ascii("msg-35")
+ message.key = 36
message.protocol = ascii("test-protocol")
message.value = ascii("test content").buffer
message.size = message.value.length
tx.store(message)
queueEntry = new QueueEntryRecord
- queueEntry.queueKey = queueA.id
- queueEntry.messageKey = message.id
+ queueEntry.queueKey = queueA.key
+ queueEntry.messageKey = message.key
queueEntry.queueSeq = 2
tx.enqueue(queueEntry)
flushed = new CountDownLatch(1)
- store.flushMessage(message.id) {
+ store.flushMessage(message.key) {
flushed.countDown
}
flushed.await
- val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.id)(cb) )
+ val qso:Option[QueueStatus] = CB( cb=> store.getQueueStatus(queueA.key)(cb) )
expect(ascii("queue:1")) {
qso.get.record.name
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java (from r961123, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java&r1=961123&r2=961124&rev=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/MemoryStoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/CassandraStoreDTO.java Wed Jul 7 04:05:05 2010
@@ -16,16 +16,20 @@
*/
package org.apache.activemq.apollo.dto;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="memory-store")
+@XmlRootElement(name="cassandra-store")
@XmlAccessorType(XmlAccessType.FIELD)
-public class MemoryStoreDTO extends StoreDTO {
+public class CassandraStoreDTO extends StoreDTO {
+ @XmlAttribute(name="keyspace")
+ public String keyspace="ActiveMQ";
+
+ @XmlElement(name="host", required=true)
+ public ArrayList<String> hosts = new ArrayList<String>();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StoreDTO.java Wed Jul 7 04:05:05 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.dto;
import org.codehaus.jackson.annotate.JsonTypeInfo;
-import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.bind.annotation.XmlType;
@@ -26,7 +25,7 @@ import javax.xml.bind.annotation.XmlType
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@XmlType(name = "store-type")
-@XmlSeeAlso({MemoryStoreDTO.class, HawtDBStoreDTO.class})
+@XmlSeeAlso({CassandraStoreDTO.class, HawtDBStoreDTO.class})
@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
public abstract class StoreDTO {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/hawtdb-data.proto Wed Jul 7 04:05:05 2010
@@ -55,11 +55,10 @@ message Trace {
message MessageAdd {
optional int64 messageKey=1;
- optional bytes messageId = 2 [java_override_type = "AsciiBuffer"];
- optional bytes protocol = 3 [java_override_type = "AsciiBuffer"];
- optional bytes value = 4;
- optional int64 streamKey=5;
- optional int32 messageSize=6;
+ optional bytes protocol = 2 [java_override_type = "AsciiBuffer"];
+ optional bytes value = 3;
+ optional int64 streamKey=4;
+ optional int32 messageSize=5;
}
///////////////////////////////////////////////////////////////
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:05:05 2010
@@ -26,12 +26,12 @@ import java.util.HashSet
import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.activemq.apollo.dto.HawtDBStoreDTO
import org.apache.activemq.apollo.broker._
import ReporterLevel._
import store.HawtDBManager
import org.apache.activemq.broker.store.{Store, StoreBatch}
import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, MessageRecord, QueueRecord}
+import org.apache.activemq.apollo.dto.{StoreDTO, HawtDBStoreDTO}
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -72,9 +72,15 @@ class HawtDBStore extends BaseService wi
var config: HawtDBStoreDTO = default
var manager:HawtDBManager = null
+
+
/**
* Validates and then applies the configuration.
*/
+ def configure(config: StoreDTO, reporter: Reporter) = {
+ //TODO:
+ }
+
def getQueueEntries(id: Long)(cb: (Seq[QueueEntryRecord]) => Unit) = {}
def configure(config: HawtDBStoreDTO, reporter:Reporter) = ^{
@@ -129,8 +135,8 @@ class HawtDBStore extends BaseService wi
/////////////////////////////////////////////////////////////////////
class HawtDBStoreBatch extends BaseRetained with StoreBatch {
- def store(delivery: MessageRecord) = {
-
+ def store(delivery: MessageRecord):Long = {
+ -1L
}
def dequeue(entry: QueueEntryRecord) = {}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java Wed Jul 7 04:05:05 2010
@@ -143,12 +143,11 @@ class HawtDBSession {
// /////////////////////////////////////////////////////////////
public void messageAdd(MessageRecord message) {
- if (message.id < 0) {
+ if (message.key < 0) {
throw new IllegalArgumentException("Key not set");
}
Data.MessageAdd.MessageAddBean bean = new Data.MessageAdd.MessageAddBean();
- bean.setMessageKey(message.id);
- bean.setMessageId(message.messageId);
+ bean.setMessageKey(message.key);
bean.setProtocol(message.protocol);
bean.setMessageSize(message.size);
Buffer buffer = message.value;
@@ -172,8 +171,7 @@ class HawtDBSession {
try {
Data.MessageAdd bean = (Data.MessageAdd) store.load(location);
MessageRecord rc = new MessageRecord();
- rc.id = bean.getMessageKey();
- rc.messageId = bean.getMessageId();
+ rc.key = bean.getMessageKey();
rc.protocol = bean.getProtocol();
rc.size = bean.getMessageSize();
if (bean.hasValue()) {
@@ -204,7 +202,7 @@ class HawtDBSession {
}
public void queueRemove(QueueRecord record) {
- addUpdate(new Data.QueueRemove.QueueRemoveBean().setKey(record.id));
+ addUpdate(new Data.QueueRemove.QueueRemoveBean().setKey(record.key));
}
public Iterator<QueueStatus> queueListByType(AsciiBuffer type, QueueRecord firstQueue, int max) {
@@ -227,7 +225,7 @@ class HawtDBSession {
public void queueAddMessage(QueueRecord queue, QueueEntryRecord entryRecord) throws KeyNotFoundException {
Data.QueueAddMessage.QueueAddMessageBean bean = new Data.QueueAddMessage.QueueAddMessageBean();
- bean.setQueueKey(queue.id);
+ bean.setQueueKey(queue.key);
bean.setQueueKey(entryRecord.queueKey);
bean.setMessageKey(entryRecord.messageKey);
bean.setMessageSize(entryRecord.size);
@@ -246,7 +244,7 @@ class HawtDBSession {
public Iterator<QueueEntryRecord> queueListMessagesQueue(QueueRecord queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
storeAtomic();
- DestinationEntity destination = store.rootEntity.getDestination(queue.id);
+ DestinationEntity destination = store.rootEntity.getDestination(queue.key);
if (destination == null) {
throw new KeyNotFoundException("queue key: " + queue);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java Wed Jul 7 04:05:05 2010
@@ -374,11 +374,11 @@ public class RootEntity {
// Queue Methods.
// /////////////////////////////////////////////////////////////////
public void queueAdd(Transaction tx, QueueRecord queue) throws IOException {
- if (data.destinationIndex.get(queue.id) == null) {
+ if (data.destinationIndex.get(queue.key) == null) {
DestinationEntity rc = new DestinationEntity();
rc.setQueueDescriptor(queue);
rc.allocate(tx);
- data.destinationIndex.put(queue.id, rc);
+ data.destinationIndex.put(queue.key, rc);
}
}
@@ -405,7 +405,7 @@ public class RootEntity {
LinkedList<org.apache.activemq.apollo.store.QueueStatus> results = new LinkedList<org.apache.activemq.apollo.store.QueueStatus>();
final Iterator<Entry<Long, DestinationEntity>> i;
- Long x = firstQueue==null? null : (Long)firstQueue.id;
+ Long x = firstQueue==null? null : (Long)firstQueue.key;
i = data.destinationIndex.iterator(x);
while (i.hasNext()) {
Entry<Long, DestinationEntity> entry = i.next();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java Wed Jul 7 04:05:05 2010
@@ -235,8 +235,7 @@ public class HawtDBManagerBenchmark exte
enqueuePermits.acquire();
final MessageRecord messageRecord = new MessageRecord();
- messageRecord.id = store.allocateStoreTracking();
- messageRecord.messageId = new AsciiBuffer("" + i);
+ messageRecord.key = store.allocateStoreTracking();
messageRecord.protocol = new AsciiBuffer("encoding");
messageRecord.value = buffer;
messageRecord.size = buffer.getLength();
@@ -252,7 +251,7 @@ public class HawtDBManagerBenchmark exte
public void run(HawtDBSession session) throws Exception {
session.messageAdd(messageRecord);
QueueEntryRecord queueEntryRecord = new QueueEntryRecord();
- queueEntryRecord.messageKey = messageRecord.id;
+ queueEntryRecord.messageKey = messageRecord.key;
queueEntryRecord.queueKey = queueKey.incrementAndGet();
queueEntryRecord.size = messageRecord.size;
session.queueAddMessage(queueId, queueEntryRecord);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java Wed Jul 7 04:05:05 2010
@@ -66,8 +66,7 @@ public class HawtDBManagerTest extends T
final MessageRecord expected = new MessageRecord();
expected.value = new AsciiBuffer("buffer").buffer();
expected.protocol = new AsciiBuffer("encoding");
- expected.messageId = new AsciiBuffer("1000");
- expected.id = store.allocateStoreTracking();
+ expected.key = store.allocateStoreTracking();
expected.size = expected.value.getLength();
store.execute(new VoidCallback<Exception>() {
@@ -79,7 +78,7 @@ public class HawtDBManagerTest extends T
store.execute(new VoidCallback<Exception>() {
@Override
public void run(HawtDBSession session) throws Exception {
- MessageRecord actual = session.messageGetRecord(expected.id);
+ MessageRecord actual = session.messageGetRecord(expected.key);
assertEquals(expected, actual);
}
}, null);
@@ -119,12 +118,11 @@ public class HawtDBManagerTest extends T
final MessageRecord message = new MessageRecord();
message.value = new AsciiBuffer("buffer").buffer();
message.protocol = new AsciiBuffer("encoding");
- message.messageId = new AsciiBuffer("1000");
- message.id = store.allocateStoreTracking();
+ message.key = store.allocateStoreTracking();
message.size = message.value.getLength();
final QueueEntryRecord qEntryRecord = new QueueEntryRecord();
- qEntryRecord.messageKey = message.id;
+ qEntryRecord.messageKey = message.key;
qEntryRecord.queueKey = 1L;
qEntryRecord.size = message.size;
@@ -377,7 +375,7 @@ public class HawtDBManagerTest extends T
Assert.assertTrue(qRecords.hasNext());
QueueEntryRecord qr = qRecords.next();
Assert.assertEquals(qEntryRecord.queueKey, qr.queueKey);
- Assert.assertEquals(qEntryRecord.messageKey, message.id);
+ Assert.assertEquals(qEntryRecord.messageKey, message.key);
MessageRecord record = session.messageGetRecord(qr.messageKey);
assertEquals(record, message);
}
@@ -416,7 +414,6 @@ public class HawtDBManagerTest extends T
static void assertEquals(MessageRecord expected, MessageRecord actual) {
Assert.assertEquals(expected.value, actual.value);
Assert.assertEquals(expected.protocol, actual.protocol);
- Assert.assertEquals(expected.messageId, actual.messageId);
Assert.assertEquals(expected.stream, actual.stream);
Assert.assertEquals(expected.size, actual.size);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul 7 04:05:05 2010
@@ -43,6 +43,11 @@
<artifactId>activemq-tcp</artifactId>
<version>6.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-cassandra</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ </dependency>
<!-- Scala Support -->
<dependency>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:05:05 2010
@@ -29,6 +29,7 @@ import Stomp._
import BufferConversions._
import StompFrameConstants._
import java.io.IOException
+import org.apache.activemq.broker.store.StoreBatch
object StompConstants {
@@ -263,6 +264,7 @@ class StompProtocolHandler extends Proto
}
def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
+ var storeBatch:StoreBatch=null
if( !route.targets.isEmpty ) {
// We may need to add some headers..
@@ -277,13 +279,14 @@ class StompProtocolHandler extends Proto
val delivery = new Delivery
delivery.message = message
delivery.size = message.frame.size
- if( message.persistent ) {
- // TODO:
-// val content = ascii("todo")
-// delivery.ref = host.database.createMessageRecord(message.id, content, PROTOCOL)
+
+ if( message.persistent && host.store!=null ) {
+ storeBatch = host.store.createStoreBatch
+ delivery.storeBatch = storeBatch
+ delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
}
- // routes can allways accept at least 1 delivery...
+ // routes can always accept at least 1 delivery...
assert( !route.full )
route.offer(delivery)
if( route.full ) {
@@ -291,9 +294,29 @@ class StompProtocolHandler extends Proto
// until it's not full anymore.
connection.transport.suspendRead
}
+
} else {
// info("Dropping message. No consumers interested in message.")
}
+
+ // User might be asking for ack that we have prcoessed the message..
+ val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+ if( receipt!=null ) {
+ if( storeBatch==null ) {
+ // message was not persistent we can ack back right away..
+ connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ } else {
+ // else lets ack back once the persistent operations are processed.
+ storeBatch.setDisposer(^{
+ connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ })
+ }
+ }
+
+ if( storeBatch!=null ) {
+ // We can now release the batch as we are done using it..
+ storeBatch.release
+ }
}
def on_stomp_subscribe(headers:HeaderMap) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 04:05:05 2010
@@ -45,6 +45,8 @@ object StompLoadClient {
var bufferSize = 64*1204
var messageSize = 1024;
var useContentLength=true
+ var persistent = false;
+ var syncProducer = false;
var destinationType = "topic";
var destinationCount = 1;
@@ -131,6 +133,8 @@ object StompLoadClient {
"destinationType = "+destinationType+"\n"+
"destinationCount = "+destinationCount+"\n" +
"messageSize = "+messageSize+"\n"+
+ "persistent = "+persistent+"\n"+
+ "syncProducer = "+syncProducer+"\n"+
"producerSleep = "+producerSleep+"\n"+
"consumerSleep = "+consumerSleep+"\n"+
"bufferSize = "+bufferSize+"\n"+
@@ -254,6 +258,8 @@ object StompLoadClient {
var client:StompClient=null
val content = ("SEND\n" +
"destination:"+destination(id)+"\n"+
+ { if(persistent) "persistent:true\n" else "" } +
+ { if(syncProducer) "receipt:xxx\n" else "" } +
{ if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
"\n"+message(name)).getBytes("UTF-8")
@@ -264,6 +270,11 @@ object StompLoadClient {
var i =0;
while (!done.get) {
client.send(content)
+ if( syncProducer ) {
+ // waits for the reply..
+ client.flush
+ client.skip
+ }
producerCounter.incrementAndGet();
if(producerSleep > 0) {
client.flush
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul 7 04:05:05 2010
@@ -35,6 +35,11 @@
<dependency>
<groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-dto</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
<artifactId>activemq-util</artifactId>
<version>6.0-SNAPSHOT</version>
</dependency>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java Wed Jul 7 04:05:05 2010
@@ -24,8 +24,7 @@ import org.fusesource.hawtbuf.Buffer;
*/
public class MessageRecord {
- public long id = -1;
- public AsciiBuffer messageId;
+ public long key = -1;
public AsciiBuffer protocol;
public int size;
public Buffer value;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java Wed Jul 7 04:05:05 2010
@@ -24,7 +24,7 @@ import org.fusesource.hawtbuf.AsciiBuffe
*/
public class QueueRecord {
- public long id = -1;
+ public long key = -1;
public AsciiBuffer name;
public AsciiBuffer queueType;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961124&r1=961123&r2=961124&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:05:05 2010
@@ -21,6 +21,8 @@ import org.fusesource.hawtbuf._
import org.apache.activemq.Service
import org.fusesource.hawtdispatch.{Retained}
import org.apache.activemq.apollo.store._
+import org.apache.activemq.apollo.broker.Reporter
+import org.apache.activemq.apollo.dto.StoreDTO
/**
* A StoreTransaction is used to perform persistent
@@ -38,7 +40,7 @@ trait StoreBatch extends Retained {
* Assigns the delivery a store id if it did not already
* have one assigned.
*/
- def store(delivery:MessageRecord)
+ def store(delivery:MessageRecord):Long
/**
* Adds a delivery to a specified queue at a the specified position in the queue.
@@ -57,6 +59,7 @@ trait StoreBatch extends Retained {
*/
trait Store extends Service {
+ def configure(config: StoreDTO, reporter:Reporter):Unit
/**
* Stores a queue, calls back with a unquie id for the stored queue.
Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala?rev=961124&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala Wed Jul 7 04:05:05 2010
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.store
+
+import org.apache.activemq.apollo.util.ClassFinder
+import org.apache.activemq.broker.store.Store
+import org.apache.activemq.apollo.dto.StoreDTO
+import org.apache.activemq.apollo.broker.{ReporterLevel, Reporter}
+import ReporterLevel._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StoreFactory
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object StoreFactory {
+
+ val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/stores")
+ var storesSPI = List[SPI]()
+
+ trait SPI {
+ def create(config:StoreDTO):Store
+ def validate(config: StoreDTO, reporter:Reporter):ReporterLevel
+ }
+
+ finder.find.foreach{ clazz =>
+ try {
+ val SPI = clazz.newInstance.asInstanceOf[SPI]
+ storesSPI ::= SPI
+ } catch {
+ case e:Throwable =>
+ e.printStackTrace
+ }
+ }
+
+ def create(config:StoreDTO):Store = {
+ if( config == null ) {
+ return null
+ }
+ storesSPI.foreach { spi=>
+ val rc = spi.create(config)
+ if( rc!=null ) {
+ return rc
+ }
+ }
+ throw new IllegalArgumentException("Uknonwn store configuration type: "+config.getClass)
+ }
+
+
+ def validate(config: StoreDTO, reporter:Reporter):ReporterLevel = {
+ if( config == null ) {
+ return INFO
+ } else {
+ storesSPI.foreach { spi=>
+ val rc = spi.validate(config, reporter)
+ if( rc!=null ) {
+ return rc
+ }
+ }
+ }
+ reporter.report(ERROR, "Uknonwn store configuration type: "+config.getClass)
+ ERROR
+ }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala?rev=961124&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala Wed Jul 7 04:05:05 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import java.io.InputStream
+import java.util.Properties
+
+/**
+ * <p>
+ * Used to discover classes using the META-INF discovery trick.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class ClassFinder[T](path:String, loaders:Seq[ClassLoader]=Thread.currentThread.getContextClassLoader::Nil) {
+
+ def find(): List[Class[T]] = {
+ var classes = List[Class[T]]()
+ loaders.foreach { loader=>
+
+ val resources = loader.getResources(path)
+ var classNames: List[String] = Nil
+ while(resources.hasMoreElements) {
+ val url = resources.nextElement;
+ val p = loadProperties(url.openStream)
+ val enum = p.keys
+ while (enum.hasMoreElements) {
+ classNames = classNames ::: enum.nextElement.asInstanceOf[String] :: Nil
+ }
+ }
+ classNames = classNames.removeDuplicates
+
+ classes :::= classNames.map { name=>
+ loader.loadClass(name).asInstanceOf[Class[T]]
+ }
+
+ }
+
+ return classes.removeDuplicates
+ }
+
+ private def loadProperties(is:InputStream):Properties = {
+ if( is==null ) {
+ return null;
+ }
+ try {
+ val p = new Properties()
+ p.load(is);
+ return p
+ } catch {
+ case e:Exception =>
+ return null
+ } finally {
+ try {
+ is.close()
+ } catch {
+ case _ =>
+ }
+ }
+ }
+}
\ No newline at end of file