You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:05:54 UTC
svn commit: r961126 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq-cassandra/src/main/resources/ activemq-c...
Author: chirino
Date: Wed Jul 7 04:05:53 2010
New Revision: 961126
URL: http://svn.apache.org/viewvc?rev=961126&view=rev
Log:
- swapping is enabled now
- store can be purged on startup
- couple of shutdown bug fixes in TcpTransport
Added:
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:05:53 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.util.list.{Li
import org.apache.activemq.broker.store.{StoreBatch}
import protocol.ProtocolFactory
import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord}
+import java.util.concurrent.TimeUnit
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -99,6 +100,7 @@ class Queue(val host: VirtualHost, val d
val entries = new LinkedNodeList[QueueEntry]()
entries.addFirst(headEntry)
+ var loadingSize = 0
var flushingSize = 0
var storeId: Long = -1L
@@ -108,18 +110,26 @@ class Queue(val host: VirtualHost, val d
var tune_max_size = 1024 * 32
var tune_subscription_prefetch = 1024*32
var tune_max_outbound_size = 1024 * 1204 * 5
+ var tune_swap_delay = 100L
+
+ var enqueue_counter = 0L
+ var dequeue_counter = 0L
+ var enqueue_size = 0L
+ var dequeue_size = 0L
private var size = 0
def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
this.storeId = storeId
records.foreach { qer =>
- val entry = new QueueEntry(Queue.this).stored(qer)
+ val entry = new QueueEntry(Queue.this).flushed(qer)
entries.addLast(entry)
}
if( !entries.isEmpty ) {
message_seq_counter = entries.getTail.seq
}
+ counter = records.size
+ enqueue_counter += records.size
debug("restored: "+records.size )
} >>: dispatchQueue
@@ -149,24 +159,46 @@ class Queue(val host: VirtualHost, val d
delivery.storeBatch.release
}
- size += entry.value.size
+ size += entry.size
entries.addLast(entry)
counter += 1;
+ enqueue_counter += 1
+ enqueue_size += entry.size
- if( full && host.store!=null ) {
-// swap
+ var swap_check = false
+ if( !entry.hasSubs ) {
+ // we flush the entry out right away if it looks like
+ // it won't be needed.
+ if( entry.getPrevious.isFlushedOrFlushing ) {
+ flushingSize += entry.flush
+ } else {
+ swap_check=true
+ }
+ } else {
+ // entry.dispatch==null if the entry was fully dispatched
+ swap_check = entry.dispatch!=null
}
- if( entry.hasSubs ) {
- entry.dispatch
+ // Does it look like we need to start swapping to make room
+ // for more messages?
+ if( swap_check && host.store!=null && full ) {
+ val wasAt = dequeue_size
+ dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
+ // start swapping if we are still blocked after a short delay
+ if( dequeue_size == wasAt && full ) {
+ println("swapping...")
+ swap
+ }
+ })
}
+
true
}
}
}
def ack(entry: QueueEntry, sb:StoreBatch) = {
- if (entry.value.ref != -1) {
+ if (entry.ref != -1) {
val storeBatch = if( sb == null ) {
host.store.createStoreBatch
} else {
@@ -181,13 +213,13 @@ class Queue(val host: VirtualHost, val d
sb.release
}
+ dequeue_counter += 1
counter -= 1
- size -= entry.value.size
+ dequeue_size += entry.size
+ size -= entry.size
entry.tombstone
- if (counter == 0) {
- messages.refiller.run
- }
+ messages.refiller.run
}
@@ -301,7 +333,18 @@ class Queue(val host: VirtualHost, val d
rc
}
- def swap() = {
+ /**
+ * Prioritizes all the queue entries so that entries most likely to be consumed
+ * next are a higher priority. All messages with the highest priority are loaded
+ * and messages with the lowest priority are flushed to make room to accept more
+ * messages from the producer.
+ */
+ def swap():Unit = {
+
+ if( !host.serviceState.isStarted ) {
+ return
+ }
+
class Prio(val entry:QueueEntry) extends Comparable[Prio] {
var value = 0
def compareTo(o: Prio) = o.value - value
@@ -311,7 +354,7 @@ class Queue(val host: VirtualHost, val d
var entry = entries.getHead
while( entry!=null ) {
- if( entry.value.asTombstone == null ) {
+ if( entry.asTombstone == null ) {
prios.add(new Prio(entry))
}
entry = entry.getNext
@@ -325,7 +368,7 @@ class Queue(val host: VirtualHost, val d
def prioritize(i:Int, size:Int, p:Int):Unit = {
val prio = prios.get(i)
prio.value += p
- val remainingSize = size - prio.entry.value.size
+ val remainingSize = size - prio.entry.size
if( remainingSize > 0 ) {
val next = i + 1
if( next < prios.size ) {
@@ -363,27 +406,10 @@ class Queue(val host: VirtualHost, val d
val prio = prios.get(i)
val entry = prio.entry
if( remaining > 0 ) {
- remaining -= entry.value.size
- val stored = entry.value.asStored
- if( stored!=null && !stored.loading) {
- stored.load
- }
+ loadingSize += entry.load
+ remaining -= entry.size
} else {
- // Chuck the reset out...
- val loaded = entry.value.asLoaded
- if( loaded!=null ) {
- var ref = loaded.delivery.storeKey
- if( ref == -1 ) {
- val tx = host.store.createStoreBatch
- loaded.delivery.storeKey = tx.store(loaded.delivery.createMessageRecord)
- tx.enqueue(entry.createQueueEntryRecord)
- tx.release
- }
- flushingSize += entry.value.size
- host.store.flushMessage(ref) {
- store_flush_source.merge(entry)
- }
- }
+ flushingSize += entry.flush
}
i += 1
}
@@ -391,20 +417,22 @@ class Queue(val host: VirtualHost, val d
def drain_store_loads() = {
val data = store_load_source.getData
- data.foreach { case (entry,stored) =>
+ data.foreach { case (entry,flushed) =>
+
+ loadingSize -= entry.size
val delivery = new Delivery()
- delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
- delivery.size = stored.size
- delivery.storeKey = stored.key
+ delivery.message = ProtocolFactory.get(flushed.protocol).decode(flushed.value)
+ delivery.size = flushed.size
+ delivery.storeKey = flushed.key
entry.loaded(delivery)
- size += entry.value.size
+ size += entry.size
}
- data.foreach { case (entry,stored) =>
+ data.foreach { case (entry,_) =>
if( entry.hasSubs ) {
entry.run
}
@@ -413,22 +441,25 @@ class Queue(val host: VirtualHost, val d
def drain_store_flushes() = {
store_flush_source.getData.foreach { entry =>
- flushingSize -= entry.value.size
+ flushingSize -= entry.size
// by the time we get called back, subs my be interested in the entry
// or it may have been acked.
- if( !entry.hasSubs && entry.value.asLoaded!=null ) {
- size += entry.value.size
- entry.stored
+ if( !entry.hasSubs && entry.asLoaded!=null ) {
+ size -= entry.size
+ entry.flushed
}
}
+
+ messages.refiller.run
+
}
}
object QueueEntry extends Sizer[QueueEntry] {
- def size(value: QueueEntry): Int = value.value.size
+ def size(value: QueueEntry): Int = value.size
}
class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
@@ -463,15 +494,15 @@ class QueueEntry(val queue:Queue) extend
this
}
- def stored() = {
+ def flushed() = {
val loaded = value.asLoaded
- this.value = new Stored(loaded.delivery.storeKey, loaded.size)
+ this.value = new Flushed(loaded.delivery.storeKey, loaded.size)
this
}
- def stored(qer:QueueEntryRecord) = {
+ def flushed(qer:QueueEntryRecord) = {
this.seq = qer.queueSeq
- this.value = new Stored(qer.messageKey, qer.size)
+ this.value = new Flushed(qer.messageKey, qer.size)
this
}
@@ -519,7 +550,6 @@ class QueueEntry(val queue:Queue) extend
this
}
-
def hasSubs = !(competing == Nil && browsing == Nil)
def run() = {
@@ -529,15 +559,6 @@ class QueueEntry(val queue:Queue) extend
}
}
- def dispatch():QueueEntry = {
- if( value == null ) {
- // tail entry can't be dispatched.
- null
- } else {
- value.dispatch
- }
- }
-
def addBrowsing(l:List[Subscription]) = {
l.foreach(x=>x.position(this))
browsing :::= l
@@ -566,6 +587,25 @@ class QueueEntry(val queue:Queue) extend
entry
}
+ def size = this.value.size
+ def flush = this.value.flush
+ def load = this.value.load
+ def ref = this.value.ref
+
+ def asTombstone = this.value.asTombstone
+ def asFlushed = this.value.asFlushed
+ def asLoaded = this.value.asLoaded
+ def isFlushedOrFlushing = value.isFlushedOrFlushing
+
+ def dispatch():QueueEntry = {
+ if( value == null ) {
+ // tail entry can't be dispatched.
+ null
+ } else {
+ value.dispatch
+ }
+ }
+
trait EntryType {
def size:Int
@@ -573,8 +613,12 @@ class QueueEntry(val queue:Queue) extend
def ref:Long
def asTombstone:Tombstone = null
- def asStored:Stored = null
+ def asFlushed:Flushed = null
def asLoaded:Loaded = null
+
+ def flush:Int = 0
+ def load:Int = 0
+ def isFlushedOrFlushing = false
}
class Tombstone extends EntryType {
@@ -597,13 +641,15 @@ class QueueEntry(val queue:Queue) extend
}
- class Stored(val ref:Long, val size:Int) extends EntryType {
+ class Flushed(val ref:Long, val size:Int) extends EntryType {
var loading = false
- override def asStored = this
+ override def asFlushed = this
+
+ override def isFlushedOrFlushing = true
- // Stored entries can't be dispatched until
+ // Flushed entries can't be dispatched until
// they get loaded.
def dispatch():QueueEntry = {
if( !loading ) {
@@ -613,10 +659,10 @@ class QueueEntry(val queue:Queue) extend
// make sure the next few entries are loaded too..
var cur = getNext
while( remaining>0 && cur!=null ) {
- remaining -= cur.value.size
- val stored = cur.value.asStored
- if( stored!=null && !stored.loading) {
- stored.load
+ remaining -= cur.size
+ val flushed = cur.asFlushed
+ if( flushed!=null && !flushed.loading) {
+ flushed.load
}
cur = getNext
}
@@ -625,15 +671,20 @@ class QueueEntry(val queue:Queue) extend
null
}
- def load() = {
- // start loading it back...
- loading = true
- queue.host.store.loadMessage(ref) { delivery =>
- // pass off to a source so it can aggregate multiple
- // loads to reduce cross thread synchronization
- if( delivery.isDefined ) {
- queue.store_load_source.merge((QueueEntry.this, delivery.get))
+ override def load():Int = {
+ if( loading ) {
+ 0
+ } else {
+ // start loading it back...
+ loading = true
+ queue.host.store.loadMessage(ref) { delivery =>
+ // pass off to a source so it can aggregate multiple
+ // loads to reduce cross thread synchronization
+ if( delivery.isDefined ) {
+ queue.store_load_source.merge((QueueEntry.this, delivery.get))
+ }
}
+ size
}
}
}
@@ -643,10 +694,39 @@ class QueueEntry(val queue:Queue) extend
var aquired = false
def ref = delivery.storeKey
def size = delivery.size
- def flushing = false
+ var flushing = false
+ override def isFlushedOrFlushing = {
+ flushing
+ }
+
override def asLoaded = this
+ def store() = {
+ if( delivery.storeKey == -1 ) {
+ val tx = queue.host.store.createStoreBatch
+ delivery.storeKey = tx.store(delivery.createMessageRecord)
+ tx.enqueue(createQueueEntryRecord)
+ tx.release
+ true
+ } else {
+ false
+ }
+ }
+
+ override def flush():Int = {
+ if( flushing ) {
+ 0
+ } else {
+ flushing=true
+ store
+ queue.host.store.flushMessage(ref) {
+ queue.store_flush_source.merge(QueueEntry.this)
+ }
+ size
+ }
+ }
+
def dispatch():QueueEntry = {
if( delivery==null ) {
// can't dispatch untill the delivery is set.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:05:53 2010
@@ -121,33 +121,39 @@ class VirtualHost(val broker: Broker) ex
val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
store = StoreFactory.create(config.store)
if( store!=null ) {
- val task = tracker.task("store list queue keys")
+ val task = tracker.task("store startup")
store.start(^{
- store.listQueues { queueKeys =>
- for( queueKey <- queueKeys) {
- val task = tracker.task("store load queue key: "+queueKey)
- // Use a global queue to so we concurrently restore
- // the queues.
- globalQueue {
- store.getQueueStatus(queueKey) { x =>
- x match {
- case Some(info)=>
- store.getQueueEntries(queueKey) { entries=>
- dispatchQueue ^{
- val dest = DestinationParser.parse(info.record.name, destination_parser_options)
- val queue = new Queue(this, dest)
- queue.restore(queueKey, entries)
- queues.put(dest.getName, queue)
- task.run
+ if( config.purgeOnStartup ) {
+ store.purge {
+ task.run
+ }
+ } else {
+ store.listQueues { queueKeys =>
+ for( queueKey <- queueKeys) {
+ val task = tracker.task("store load queue key: "+queueKey)
+ // Use a global queue so that we concurrently restore
+ // the queues.
+ globalQueue {
+ store.getQueueStatus(queueKey) { x =>
+ x match {
+ case Some(info)=>
+ store.getQueueEntries(queueKey) { entries=>
+ dispatchQueue ^{
+ val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+ val queue = new Queue(this, dest)
+ queue.restore(queueKey, entries)
+ queues.put(dest.getName, queue)
+ task.run
+ }
}
+ case _ =>
+ task.run
}
- case _ =>
- task.run
}
}
}
+ task.run
}
- task.run
}
});
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 04:05:53 2010
@@ -35,6 +35,7 @@ import _root_.org.fusesource.hawtbuf._
import java.io.{PrintStream, FileOutputStream, File, IOException}
import org.apache.activemq.util.{IOHelper, ProcessSupport}
import scala.util.matching.Regex
+import org.apache.activemq.apollo.dto.BrokerDTO
object BaseBrokerPerfSupport {
var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
@@ -457,25 +458,29 @@ abstract class BaseBrokerPerfSupport ext
}
def stopServices() = {
+ println("waiting for services to stop");
stopping.set(true);
- val tracker = new LoggingTracker("test shutdown")
+ var tracker = new LoggingTracker("broker shutdown")
for (broker <- brokers) {
- broker.stop(tracker.task("broker"));
+ tracker.stop(broker)
}
+ tracker.await
+ tracker = new LoggingTracker("producer shutdown")
for (connection <- producers) {
- connection.stop(tracker.task(connection.toString));
+ tracker.stop(connection)
}
+ tracker.await
+ tracker = new LoggingTracker("consumer shutdown")
for (connection <- consumers) {
- connection.stop(tracker.task(connection.toString));
+ tracker.stop(connection)
}
- println("waiting for services to stop");
tracker.await
}
def startBrokers() = {
val tracker = new LoggingTracker("test broker startup")
for (broker <- brokers) {
- broker.start(tracker.task("broker"));
+ tracker.start(broker)
}
tracker.await
}
@@ -484,12 +489,14 @@ abstract class BaseBrokerPerfSupport ext
def startClients() = {
var tracker = new LoggingTracker("test consumer startup")
for (connection <- consumers) {
- connection.start(tracker.task(connection.toString));
+ tracker.start(connection)
}
tracker.await
+ // let the consumers drain the destination for a bit...
+ Thread.sleep(1000)
tracker = new LoggingTracker("test producer startup")
for (connection <- producers) {
- connection.start(tracker.task(connection.toString));
+ tracker.start(connection)
}
tracker.await
}
@@ -555,12 +562,18 @@ abstract class BaseBrokerPerfSupport ext
def getRemoteWireFormat(): String
def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
- val broker = new Broker()
- broker.config = Broker.default
- val connector = broker.config.connectors.get(0)
+
+ val config = Broker.default
+ val connector = config.connectors.get(0)
connector.bind = bindURI
connector.advertise = connectUri
connector.protocol = getBrokerWireFormat
+
+ val host = config.virtualHosts.get(0)
+ host.purgeOnStartup = true
+
+ val broker = new Broker()
+ broker.config = config
broker
}
@@ -604,9 +617,12 @@ abstract class RemoteConsumer extends Co
}
override def onTransportFailure(error: IOException) = {
- if (!brokerPerfTest.stopping.get()) {
- System.err.println("Client Async Error:");
- error.printStackTrace();
+ if (!stopped) {
+ if(brokerPerfTest.stopping.get()) {
+ transport.stop
+ } else {
+ onFailure(error);
+ }
}
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores?rev=961126&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/resources/META-INF/services/org.apache.activemq.apollo/stores Wed Jul 7 04:05:53 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.broker.store.cassandra.CassandraStoreSPI
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:05:53 2010
@@ -37,7 +37,7 @@ class CassandraClient() {
protected var pool: SessionPool = null
def start() = {
- val params = new PoolParams(10, ExhaustionPolicy.Fail, 500L, 6, 2)
+ val params = new PoolParams(20, ExhaustionPolicy.Fail, 500L, 6, 2)
pool = new SessionPool(hosts, params, Consistency.One)
}
@@ -98,6 +98,18 @@ class CassandraClient() {
pb.freeze.toUnframedByteArray
}
+ def purge() = {
+ withSession {
+ session =>
+ session.list(schema.queue_name).map { x =>
+ val qid: Long = x.name
+ session.remove(schema.entries \ qid)
+ }
+ session.remove(schema.queue_name)
+ session.remove(schema.message_data)
+ }
+ }
+
def addQueue(record: QueueRecord) = {
withSession {
session =>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:05:53 2010
@@ -93,7 +93,7 @@ class CassandraStore extends Store with
}
protected def _start(onCompleted: Runnable) = {
- executor_pool = Executors.newCachedThreadPool
+ executor_pool = Executors.newFixedThreadPool(20)
client.schema = Schema(config.keyspace)
// TODO: move some of this parsing code into validation too.
@@ -112,12 +112,12 @@ class CassandraStore extends Store with
}
protected def _stop(onCompleted: Runnable) = {
- client.stop
new Thread() {
override def run = {
executor_pool.shutdown
executor_pool.awaitTermination(1, TimeUnit.DAYS)
executor_pool = null
+ client.stop
onCompleted.run
}
}.start
@@ -129,6 +129,16 @@ class CassandraStore extends Store with
//
/////////////////////////////////////////////////////////////////////
+ /**
+ * Deletes all stored data from the store.
+ */
+ def purge(cb: =>Unit) = {
+ executor_pool ^{
+ client.purge
+ cb
+ }
+ }
+
def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {
val key = next_queue_key.incrementAndGet
record.key = key
@@ -333,7 +343,12 @@ class CassandraStore extends Store with
flush_source.setEventHandler(^{drain_flushes});
flush_source.resume
- def drain_flushes = {
+ def drain_flushes:Unit = {
+
+ if( !serviceState.isStarted ) {
+ return
+ }
+
val txs = flush_source.getData.flatMap{ tx_id =>
val tx = delayedTransactions.remove(tx_id)
// Message may be flushed or canceled before the timeout flush event..
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul 7 04:05:53 2010
@@ -40,5 +40,9 @@ public class VirtualHostDTO extends Serv
@XmlAttribute(name="auto-create-queues")
public boolean autoCreateQueues = true;
-
+ /**
+ * Should queues be purged on startup?
+ */
+ @XmlAttribute(name="purge-on-startup")
+ public boolean purgeOnStartup = false;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:05:53 2010
@@ -115,6 +115,10 @@ class HawtDBStore extends BaseService wi
//
/////////////////////////////////////////////////////////////////////
+
+ def purge(cb: =>Unit) = {
+ }
+
def addQueue(record: QueueRecord)(cb: (Option[Long]) => Unit) = {}
def getQueueStatus(id: Long)(cb: (Option[QueueStatus]) => Unit) = {}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:05:53 2010
@@ -62,7 +62,12 @@ trait Store extends Service {
def configure(config: StoreDTO, reporter:Reporter):Unit
/**
- * Stores a queue, calls back with a unquie id for the stored queue.
+ * Deletes all stored data from the store.
+ */
+ def purge(cb: =>Unit):Unit
+
+ /**
+ * Stores a queue, calls back with a unique id for the stored queue.
*/
def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 04:05:53 2010
@@ -157,6 +157,7 @@ public class TcpTransport extends BaseSe
disposed = true;
dispose();
}
+ onCompleted.run();
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul 7 04:05:53 2010
@@ -108,39 +108,42 @@ trait BaseService extends Service with L
}
} |>>: dispatchQueue
- final def stop(onCompleted:Runnable) = ^{
- def done = {
- if( onCompleted!=null ) {
- onCompleted.run
- }
- }
- _serviceState match {
- case STARTED =>
- val state = new STOPPING
- state << onCompleted
- _serviceState = state
- try {
- _stop(^ {
- _serviceState = STOPPED
- state.done
- })
- }
- catch {
- case e:Exception =>
- error(e, "Stop failed due to: %s", e)
- _serviceFailure = e
- _serviceState = FAILED
- state.done
+ final def stop(onCompleted:Runnable) = {
+ def stop_task = {
+ def done = {
+ if( onCompleted!=null ) {
+ onCompleted.run
}
- case state:STOPPING =>
- state << onCompleted
- case STOPPED =>
- done
- case state =>
- done
- error("Stop should not be called from state: %s", state);
+ }
+ _serviceState match {
+ case STARTED =>
+ val state = new STOPPING
+ state << onCompleted
+ _serviceState = state
+ try {
+ _stop(^ {
+ _serviceState = STOPPED
+ state.done
+ })
+ }
+ catch {
+ case e:Exception =>
+ error(e, "Stop failed due to: %s", e)
+ _serviceFailure = e
+ _serviceState = FAILED
+ state.done
+ }
+ case state:STOPPING =>
+ state << onCompleted
+ case STOPPED =>
+ done
+ case state =>
+ done
+ error("Stop should not be called from state: %s", state);
+ }
}
- } |>>: dispatchQueue
+ ^{ stop_task } |>>: dispatchQueue
+ }
protected def _start(onCompleted:Runnable)
protected def _stop(onCompleted:Runnable)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java?rev=961126&r1=961125&r2=961126&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.java Wed Jul 7 04:05:53 2010
@@ -123,7 +123,7 @@ public abstract class BaseService implem
}
});
} else if (_serviceState instanceof STOPPING) {
- ((STARTING) _serviceState).add(onCompleted);
+ ((STOPPING) _serviceState).add(onCompleted);
} else if (_serviceState == STOPPED) {
if (onCompleted != null) {
onCompleted.run();