You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:09:06 UTC
svn commit: r961136 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
activemq-stomp/src/main/scala/org/apache/activem...
Author: chirino
Date: Wed Jul 7 04:09:06 2010
New Revision: 961136
URL: http://svn.apache.org/viewvc?rev=961136&view=rev
Log:
- fix deprecation warnnings.
- fix persistent slow down if queue mem limit was large
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:09:06 2010
@@ -107,7 +107,7 @@ class Queue(val host: VirtualHost, val d
/**
* Tunning options.
*/
- var tune_max_size = 1024 * 32
+ var tune_max_size = 1024 * 1024 * 4
var tune_subscription_prefetch = 1024*32
var tune_max_outbound_size = 1024 * 1204 * 5
var tune_swap_delay = 100L
@@ -170,12 +170,33 @@ class Queue(val host: VirtualHost, val d
if( !entry.hasSubs ) {
// we flush the entry out right away if it looks
// it wont be needed.
+
if( entry.getPrevious.isFlushedOrFlushing ) {
+ // in this case take it out of memory too...
flushingSize += entry.flush
} else {
+ if( slow_consumers ) {
+ if( delivery.storeBatch!=null ) {
+ // just make it hit the disk quick.. but keep it in memory.
+ delivery.storeBatch.eagerFlush(^{})
+ }
+ } else {
+ if( !checking_for_slow_consumers ) {
+ checking_for_slow_consumers=true
+ val tail_consumer_counter_copy = tail_consumer_counter
+ dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
+ if( tail_consumer_counter_copy == tail_consumer_counter ) {
+ slow_consumers = true
+ }
+ checking_for_slow_consumers = false
+ })
+ }
+ }
swap_check=true
}
} else {
+ slow_consumers = false
+ tail_consumer_counter += 1
// entry.dispatch==null if the entry was fully dispatched
swap_check = entry.dispatch!=null
}
@@ -204,6 +225,10 @@ class Queue(val host: VirtualHost, val d
}
}
+ var tail_consumer_counter = 0L
+ var checking_for_slow_consumers = false
+ var slow_consumers = false
+
def ack(entry: QueueEntry, sb:StoreBatch) = {
if (entry.ref != -1) {
val storeBatch = if( sb == null ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:09:06 2010
@@ -195,22 +195,21 @@ class HawtDBClient(hawtDBStore: HawtDBSt
flush
}
- def addQueue(record: QueueRecord) = {
+ def addQueue(record: QueueRecord, callback:Runnable) = {
val update = new AddQueue.Bean()
update.setKey(record.key)
update.setName(record.name)
update.setQueueType(record.queueType)
- store(update)
+ _store(update, callback)
}
- def removeQueue(queueKey: Long):Boolean = {
+ def removeQueue(queueKey: Long, callback:Runnable) = {
val update = new RemoveQueue.Bean()
update.setKey(queueKey)
- store(update)
- true
+ _store(update, callback)
}
- def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
+ def store(txs: Seq[HawtDBStore#HawtDBBatch], callback:Runnable) {
var batch = ListBuffer[TypeCreatable]()
txs.foreach {
tx =>
@@ -233,15 +232,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
}
- store(batch, ^{
- txs.foreach { tx=>
- tx.onPerformed
- }
- })
+ _store(batch, callback)
}
+
def purge(callback: Runnable) = {
- store(new Purge.Bean(), callback)
+ _store(new Purge.Bean(), callback)
}
def listQueues: Seq[Long] = {
@@ -344,29 +340,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
- private def store(updates: List[TypeCreatable]): Unit = {
- val tracker = new TaskTracker("storing")
- store(updates, tracker.task(updates))
- tracker.await
- }
-
- private def store(update: TypeCreatable): Unit = {
- val tracker = new TaskTracker("storing")
- store(update, tracker.task(update))
- tracker.await
- }
-
- private def store(updates: Seq[TypeCreatable], onComplete: Runnable): Unit = {
+ private def _store(updates: Seq[TypeCreatable], onComplete: Runnable): Unit = {
val batch = next_batch_id
begin(batch)
updates.foreach {
update =>
- store(batch, update, null)
+ _store(batch, update, null)
}
commit(batch, onComplete)
}
- private def store(update: TypeCreatable, onComplete: Runnable): Unit = store(-1, update, onComplete)
+ private def _store(update: TypeCreatable, onComplete: Runnable): Unit = _store(-1, update, onComplete)
/**
* All updated are are funneled through this method. The updates are logged to
@@ -375,7 +359,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
*
* @throws IOException
*/
- private def store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
+ private def _store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
val kind = update.asInstanceOf[TypeCreatable]
val frozen = update.freeze
val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
@@ -452,7 +436,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val start = System.currentTimeMillis()
incrementalRecover
- store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
+ _store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
// Rollback any batches that did not complete.
batches.keysIterator.foreach {
batch =>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:09:06 2010
@@ -21,8 +21,6 @@ import org.fusesource.hawtdispatch.BaseR
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
import java.util.HashMap
-import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
-import org.apache.activemq.apollo.util.IntCounter
import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, StoreDTO}
import collection.{JavaConversions, Seq}
@@ -30,6 +28,8 @@ import org.fusesource.hawtdispatch.Scala
import org.apache.activemq.apollo.broker._
import java.io.File
import ReporterLevel._
+import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
+import java.util.concurrent._
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -91,7 +91,13 @@ class HawtDBStore extends Store with Bas
}
protected def _start(onCompleted: Runnable) = {
- executor_pool = Executors.newFixedThreadPool(20)
+ executor_pool = new ThreadPoolExecutor(4, 20, 1, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory(){
+ def newThread(r: Runnable) = {
+ val rc = new Thread(r, "hawtdb store client")
+ rc.setDaemon(true)
+ rc
+ }
+ })
client.config = config
schedualDisplayStats
executor_pool {
@@ -135,16 +141,11 @@ class HawtDBStore extends Store with Bas
def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
val key = next_queue_key.getAndIncrement
record.key = key
- executor_pool ^{
- client.addQueue(record)
- callback(Some(key))
- }
+ client.addQueue(record, ^{ callback(Some(key)) })
}
def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
- executor_pool ^{
- callback(client.removeQueue(queueKey))
- }
+ client.removeQueue(queueKey,^{ callback(true) })
}
def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
@@ -318,8 +319,9 @@ class HawtDBStore extends Store with Bas
def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
- info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f",
- rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency)
+
+ info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
+ rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
schedualDisplayStats
}
}
@@ -390,6 +392,8 @@ class HawtDBStore extends Store with Bas
flush_source.setEventHandler(^{drain_flushes});
flush_source.resume
+ val storeLatency = new TimeCounter
+
def drain_flushes:Unit = {
if( !serviceState.isStarted ) {
@@ -420,7 +424,16 @@ class HawtDBStore extends Store with Bas
}
if( !txs.isEmpty ) {
- client.store(txs)
+ storeLatency.start { end=>
+ client.store(txs, ^{
+ dispatchQueue {
+ end()
+ txs.foreach { tx=>
+ tx.onPerformed
+ }
+ }
+ })
+ }
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul 7 04:09:06 2010
@@ -39,11 +39,11 @@ object StompBroker {
connector.protocol = "stomp"
connector.advertise = uri
- val store = new CassandraStoreDTO
- store.hosts.add("localhost:9160")
+// val store = new CassandraStoreDTO
+// store.hosts.add("localhost:9160")
-// val store = new HawtDBStoreDTO
-// store.directory = new File("activemq-data")
+ val store = new HawtDBStoreDTO
+ store.directory = new File("activemq-data")
broker.config.virtualHosts.get(0).store = store
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/ClassFinder.scala Wed Jul 7 04:09:06 2010
@@ -42,7 +42,7 @@ case class ClassFinder[T](path:String, l
classNames = classNames ::: enum.nextElement.asInstanceOf[String] :: Nil
}
}
- classNames = classNames.removeDuplicates
+ classNames = classNames.distinct
classes :::= classNames.map { name=>
loader.loadClass(name).asInstanceOf[Class[T]]
@@ -50,7 +50,7 @@ case class ClassFinder[T](path:String, l
}
- return classes.removeDuplicates
+ return classes.distinct
}
private def loadProperties(is:InputStream):Properties = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala Wed Jul 7 04:09:06 2010
@@ -19,14 +19,14 @@ package org.apache.activemq.apollo.util
import java.util.concurrent.TimeUnit
/**
- * <p>A Timer collects time durations and produces a TimingMetric.</p>
+ * <p>A Timer collects time durations and produces a TimeMetric.</p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class TimeCounter extends MetricProducer[TimeMetric] {
- private var maximum = Math.MIN_LONG
- private var minimum = Math.MAX_LONG
+ private var maximum = Long.MinValue
+ private var minimum = Long.MaxValue
private var total = 0L
private var count = 0
@@ -39,8 +39,8 @@ class TimeCounter extends MetricProducer
}
def clear() = {
- maximum = Math.MIN_INT
- minimum = Math.MAX_INT
+ maximum = Long.MinValue
+ minimum = Long.MaxValue
total = 0L
count = 0
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/IntrospectionSupport.java Wed Jul 7 04:09:06 2010
@@ -254,7 +254,7 @@ public final class IntrospectionSupport
}
public static String toString(Object target) {
- return toString(target, Object.class, null, null);
+ return toString(target, Object.class, null, (String[])null);
}
public static String toString(Object target, String...fields) {
@@ -262,7 +262,7 @@ public final class IntrospectionSupport
}
public static String toString(Object target, Class<?> stopClass) {
- return toString(target, stopClass, null, null);
+ return toString(target, stopClass, null, (String[])null);
}
public static String toString(Object target, Map<String, Object> overrideFields, String...fields) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java?rev=961136&r1=961135&r2=961136&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java Wed Jul 7 04:09:06 2010
@@ -115,16 +115,16 @@ public abstract class CombinationTestSup
private CombinationTestSupport[] getCombinations() {
try {
- Method method = getClass().getMethod("initCombos", null);
- method.invoke(this, null);
+ Method method = getClass().getMethod("initCombos", (Class[])null);
+ method.invoke(this, (Object[])null);
} catch (Throwable e) {
}
String name = getName().split(" ")[0];
String comboSetupMethodName = "initCombosFor" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
try {
- Method method = getClass().getMethod(comboSetupMethodName, null);
- method.invoke(this, null);
+ Method method = getClass().getMethod(comboSetupMethodName, (Class[])null);
+ method.invoke(this, (Object[])null);
} catch (Throwable e) {
}