You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:07:50 UTC
svn commit: r961132 - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-hawtdb/src/main/scala/org/apac...
Author: chirino
Date: Wed Jul 7 04:07:49 2010
New Revision: 961132
URL: http://svn.apache.org/viewvc?rev=961132&view=rev
Log:
- update to scala 2.8.0 RC3
- cassandra store now limits concurrent cassandra requests to 20
- better logging when the store takes a long time to start up.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml
activemq/sandbox/activemq-apollo-actor/pom.xml
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:07:49 2010
@@ -52,8 +52,13 @@ object VirtualHost extends Log {
rc.id = "default"
rc.enabled = true
rc.hostNames.add("localhost")
- val store = new HawtDBStoreDTO
- store.directory = new File("activemq-data")
+
+ val store = new CassandraStoreDTO
+ store.hosts.add("localhost:9160")
+
+// val store = new HawtDBStoreDTO
+// store.directory = new File("activemq-data")
+
rc.store = store
rc
}
@@ -125,10 +130,12 @@ class VirtualHost(val broker: Broker) ex
val task = tracker.task("store startup")
store.start(^{
if( config.purgeOnStartup ) {
+ task.name = "store purge"
store.purge {
task.run
}
} else {
+ task.name = "store recover queues"
store.listQueues { queueKeys =>
for( queueKey <- queueKeys) {
val task = tracker.task("store load queue key: "+queueKey)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:07:49 2010
@@ -22,7 +22,6 @@ import com.shorrockin.cascal.session._
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
import java.util.HashMap
-import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
import org.apache.activemq.apollo.util.IntCounter
import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
@@ -32,6 +31,7 @@ import org.apache.activemq.apollo.broker
import com.shorrockin.cascal.utils.Conversions._
import org.fusesource.hawtdispatch.ScalaDispatch._
import ReporterLevel._
+import java.util.concurrent.{ThreadFactory, TimeUnit, Executors, ExecutorService}
object CassandraStore extends Log {
@@ -75,8 +75,8 @@ class CassandraStore extends Store with
var next_msg_key = new AtomicLong(0)
val client = new CassandraClient()
- protected var executor_pool:ExecutorService = _
var config:CassandraStoreDTO = defaultConfig
+ var blocking:BlockingSupport = null
def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
@@ -91,8 +91,10 @@ class CassandraStore extends Store with
}
}
+
protected def _start(onCompleted: Runnable) = {
- executor_pool = Executors.newFixedThreadPool(20)
+
+ blocking = new BlockingSupport
client.schema = Schema(config.keyspace)
// TODO: move some of this parsing code into validation too.
@@ -111,17 +113,81 @@ class CassandraStore extends Store with
}
protected def _stop(onCompleted: Runnable) = {
- new Thread() {
- override def run = {
+ blocking.setDisposer(^{
+ onCompleted
+ })
+ blocking.release
+ }
+
+
+ class BlockingSupport extends BaseRetained {
+
+ val name = "cassandra store worker"
+ var workerStackSize = 0
+ val max_workers = 20
+ var executing_workers=0
+
+ val dispatchQueue = createQueue()
+ val queued_executions = ListBuffer[()=>Unit]()
+ var executor_pool = Executors.newCachedThreadPool(new ThreadFactory {
+ def newThread(r: Runnable) = {
+ val thread = new Thread(null, r, name, workerStackSize)
+ thread.setDaemon(true)
+ thread
+ }
+ })
+
+ /**
+ * executes a blocking function in an async thread.
+ */
+ def apply( func: =>Unit ):Unit = {
+ assertRetained
+ dispatchQueue {
+ if( executing_workers >= max_workers ) {
+ queued_executions += func _
+ } else {
+ executing_workers += 1
+ execute(func _)
+ }
+ }
+ }
+
+ private def execute(func: ()=>Unit):Unit = {
+ executor_pool {
+ try {
+ func()
+ } finally {
+ execute_done
+ }
+ }
+ }
+
+ private def execute_done() = ^{
+ if ( queued_executions.isEmpty ) {
+ executing_workers -= 1
+ } else {
+ execute(queued_executions.head)
+ queued_executions.drop(1)
+ }
+ if( retained < 1 ) {
+ check_pool_disposed
+ }
+ } >>: dispatchQueue
+
+ override def dispose = ^{
+ check_pool_disposed
+ } >>: dispatchQueue
+
+ private def check_pool_disposed = {
+ if( executing_workers == 0 ) {
executor_pool.shutdown
- executor_pool.awaitTermination(1, TimeUnit.DAYS)
- executor_pool = null
- client.stop
- onCompleted.run
+ super.dispose
}
- }.start
+ }
+
}
+
/////////////////////////////////////////////////////////////////////
//
// Implementation of the BrokerDatabase interface
@@ -132,7 +198,7 @@ class CassandraStore extends Store with
* Deletes all stored data from the store.
*/
def purge(callback: =>Unit) = {
- executor_pool ^{
+ blocking {
client.purge
callback
}
@@ -141,39 +207,39 @@ class CassandraStore extends Store with
def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
val key = next_queue_key.incrementAndGet
record.key = key
- executor_pool ^{
+ blocking {
client.addQueue(record)
callback(Some(key))
}
}
def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
- executor_pool ^{
+ blocking {
callback(client.removeQueue(queueKey))
}
}
def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
- executor_pool ^{
+ blocking {
callback( client.getQueueStatus(id) )
}
}
def listQueues(callback: (Seq[Long]) => Unit) = {
- executor_pool ^{
+ blocking {
callback( client.listQueues )
}
}
def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
- executor_pool ^{
+ blocking {
callback( client.loadMessage(id) )
}
}
def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
- executor_pool ^{
+ blocking {
callback( client.getQueueEntries(id) )
}
}
@@ -384,7 +450,7 @@ class CassandraStore extends Store with
// suspend so that we don't process more flush requests while
// we are concurrently executing a flush
flush_source.suspend
- executor_pool ^{
+ blocking {
client.store(txs)
txs.foreach { x=>
x.onPerformed
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:07:49 2010
@@ -535,16 +535,26 @@ class HawtDBClient(hawtDBStore: HawtDBSt
/////////////////////////////////////////////////////////////////////
private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
- val start = System.currentTimeMillis()
- journal.write(data, new JournalCallback() {
- def success(location: Location) = {
- var end = System.currentTimeMillis()
- warn("Journal append latencey: %,.3f seconds", ((end - start) / 1000.0f))
- cb(location)
- var end2 = System.currentTimeMillis()
- warn("Index latencey: %,.3f seconds", ((end2 - end) / 1000.0f))
- }
- })
+ benchmarkLatency { done =>
+ journal.write(data, new JournalCallback() {
+ def success(location: Location) = {
+ done("journal append")
+ cb(location)
+ done("journal append + index update")
+ }
+ })
+ done("journal enqueue")
+ }
+ }
+
+ /**
+ */
+ def benchmarkLatency[R](func: (String=>Unit)=>R ):R = {
+ val start = System.nanoTime
+ func { label=>
+ var end = System.nanoTime
+ warn("latencey: %s is %,.3f ms", label, ( (end - start).toFloat / TimeUnit.SECONDS.toMillis(1)))
+ }
}
def read(location: Location) = journal.read(location)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml Wed Jul 7 04:07:49 2010
@@ -74,7 +74,6 @@
</jvmArgs>
<args>
<arg>-deprecation</arg>
- <arg>-Xno-varargs-conversion</arg>
</args>
<scalaVersion>${scala-version}</scalaVersion>
</configuration>
Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul 7 04:07:49 2010
@@ -95,12 +95,12 @@
<jetty-version>6.1.22</jetty-version>
<jetty-plugin-version>7.0.1.v20091125</jetty-plugin-version>
- <scalate-version>1.1</scalate-version>
+ <scalate-version>1.2-SNAPSHOT</scalate-version>
<servlet-api-version>2.5</servlet-api-version>
<jackson-version>1.5.2</jackson-version>
- <scala-version>2.8.0.Beta1</scala-version>
- <scalatest-version>1.0.1-for-scala-2.8.0.Beta1-RC7-with-test-interfaces-0.3-SNAPSHOT</scalatest-version>
+ <scala-version>2.8.0.RC3</scala-version>
+ <scalatest-version>1.2-for-scala-2.8.0.RC3-SNAPSHOT</scalatest-version>
<maven-scala-plugin-version>2.13.1</maven-scala-plugin-version>
<maven-surefire-plugin-version>2.4.3</maven-surefire-plugin-version>