You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:07:50 UTC

svn commit: r961132 - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-hawtdb/src/main/scala/org/apac...

Author: chirino
Date: Wed Jul  7 04:07:49 2010
New Revision: 961132

URL: http://svn.apache.org/viewvc?rev=961132&view=rev
Log:
- update to Scala 2.8.0 RC3
- Cassandra store now limits concurrent Cassandra requests to 20
- better logging when the store takes a long time to start up.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml
    activemq/sandbox/activemq-apollo-actor/pom.xml

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:07:49 2010
@@ -52,8 +52,13 @@ object VirtualHost extends Log {
     rc.id = "default"
     rc.enabled = true
     rc.hostNames.add("localhost")
-    val store = new HawtDBStoreDTO
-    store.directory = new File("activemq-data") 
+
+    val store = new CassandraStoreDTO
+    store.hosts.add("localhost:9160")
+
+//    val store = new HawtDBStoreDTO
+//    store.directory = new File("activemq-data")
+    
     rc.store = store
     rc
   }
@@ -125,10 +130,12 @@ class VirtualHost(val broker: Broker) ex
       val task = tracker.task("store startup")
       store.start(^{
         if( config.purgeOnStartup ) {
+          task.name = "store purge"
           store.purge {
             task.run
           }
         } else {
+          task.name = "store recover queues"
           store.listQueues { queueKeys =>
             for( queueKey <- queueKeys) {
               val task = tracker.task("store load queue key: "+queueKey)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:07:49 2010
@@ -22,7 +22,6 @@ import com.shorrockin.cascal.session._
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
 import java.util.HashMap
-import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
 import org.apache.activemq.apollo.util.IntCounter
 import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
 import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
@@ -32,6 +31,7 @@ import org.apache.activemq.apollo.broker
 import com.shorrockin.cascal.utils.Conversions._
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import ReporterLevel._
+import java.util.concurrent.{ThreadFactory, TimeUnit, Executors, ExecutorService}
 
 object CassandraStore extends Log {
 
@@ -75,8 +75,8 @@ class CassandraStore extends Store with 
   var next_msg_key = new AtomicLong(0)
 
   val client = new CassandraClient()
-  protected var executor_pool:ExecutorService = _
   var config:CassandraStoreDTO = defaultConfig
+  var blocking:BlockingSupport = null
 
   def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
 
@@ -91,8 +91,10 @@ class CassandraStore extends Store with 
     }
   }
 
+
   protected def _start(onCompleted: Runnable) = {
-    executor_pool = Executors.newFixedThreadPool(20)
+
+    blocking = new BlockingSupport
     client.schema = Schema(config.keyspace)
 
     // TODO: move some of this parsing code into validation too.
@@ -111,17 +113,81 @@ class CassandraStore extends Store with 
   }
 
   protected def _stop(onCompleted: Runnable) = {
-    new Thread() {
-      override def run = {
+    blocking.setDisposer(^{
+      onCompleted
+    })
+    blocking.release
+  }
+
+
+  class BlockingSupport extends BaseRetained {
+
+    val name = "cassandra store worker"
+    var workerStackSize = 0
+    val max_workers = 20
+    var executing_workers=0
+
+    val dispatchQueue = createQueue()
+    val queued_executions = ListBuffer[()=>Unit]()
+    var executor_pool = Executors.newCachedThreadPool(new ThreadFactory {
+      def newThread(r: Runnable) = {
+        val thread = new Thread(null, r, name, workerStackSize)
+        thread.setDaemon(true)
+        thread
+      }
+    })
+
+    /**
+     * executes a blocking function in an async thread.
+     */
+    def apply( func: =>Unit ):Unit = {
+      assertRetained
+      dispatchQueue {
+        if( executing_workers >= max_workers ) {
+          queued_executions += func _
+        } else {
+          executing_workers += 1
+          execute(func _)
+        }
+      }
+    }
+
+    private def execute(func: ()=>Unit):Unit = {
+      executor_pool {
+        try {
+          func()
+        } finally {
+          execute_done
+        }
+      }
+    }
+
+    private def execute_done() = ^{
+      if ( queued_executions.isEmpty ) {
+        executing_workers -= 1
+      } else {
+        execute(queued_executions.head)
+        queued_executions.drop(1)
+      }
+      if( retained < 1 ) {
+        check_pool_disposed
+      }
+    } >>: dispatchQueue
+
+    override def dispose = ^{
+      check_pool_disposed
+    } >>: dispatchQueue
+
+    private def check_pool_disposed = {
+      if( executing_workers == 0 ) {
         executor_pool.shutdown
-        executor_pool.awaitTermination(1, TimeUnit.DAYS)
-        executor_pool = null
-        client.stop
-        onCompleted.run
+        super.dispose
       }
-    }.start
+    }
+
   }
 
+
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation of the BrokerDatabase interface
@@ -132,7 +198,7 @@ class CassandraStore extends Store with 
    * Deletes all stored data from the store.
    */
   def purge(callback: =>Unit) = {
-    executor_pool ^{
+    blocking {
       client.purge
       callback
     }
@@ -141,39 +207,39 @@ class CassandraStore extends Store with 
   def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
     val key = next_queue_key.incrementAndGet
     record.key = key
-    executor_pool ^{
+    blocking {
       client.addQueue(record)
       callback(Some(key))
     }
   }
 
   def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
-    executor_pool ^{
+    blocking {
       callback(client.removeQueue(queueKey))
     }
   }
 
   def getQueueStatus(id: Long)(callback: (Option[QueueStatus]) => Unit) = {
-    executor_pool ^{
+    blocking {
       callback( client.getQueueStatus(id) )
     }
   }
 
   def listQueues(callback: (Seq[Long]) => Unit) = {
-    executor_pool ^{
+    blocking {
       callback( client.listQueues )
     }
   }
 
   def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
-    executor_pool ^{
+    blocking {
       callback( client.loadMessage(id) )
     }
   }
 
 
   def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
-    executor_pool ^{
+    blocking {
       callback( client.getQueueEntries(id) )
     }
   }
@@ -384,7 +450,7 @@ class CassandraStore extends Store with 
       // suspend so that we don't process more flush requests while
       // we are concurrently executing a flush
       flush_source.suspend
-      executor_pool ^{
+      blocking {
         client.store(txs)
         txs.foreach { x=>
           x.onPerformed

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:07:49 2010
@@ -535,16 +535,26 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   /////////////////////////////////////////////////////////////////////
 
   private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
-    val start = System.currentTimeMillis()
-    journal.write(data, new JournalCallback() {
-      def success(location: Location) = {
-        var end = System.currentTimeMillis()
-        warn("Journal append latencey: %,.3f seconds", ((end - start) / 1000.0f))
-        cb(location)
-        var end2 = System.currentTimeMillis()
-        warn("Index latencey: %,.3f seconds", ((end2 - end) / 1000.0f))
-      }
-    })
+    benchmarkLatency { done =>
+      journal.write(data, new JournalCallback() {
+        def success(location: Location) = {
+          done("journal append")
+          cb(location)
+          done("journal append + index update")
+        }
+      })
+      done("journal enqueue")
+    }
+  }
+
+  /**
+   */
+  def benchmarkLatency[R](func: (String=>Unit)=>R ):R = {
+    val start = System.nanoTime
+    func { label=>
+      var end = System.nanoTime
+      warn("latencey: %s is %,.3f ms", label, ( (end - start).toFloat / TimeUnit.SECONDS.toMillis(1)))
+    }
   }
 
   def read(location: Location) = journal.read(location)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml Wed Jul  7 04:07:49 2010
@@ -74,7 +74,6 @@
           </jvmArgs>
           <args>
             <arg>-deprecation</arg>
-            <arg>-Xno-varargs-conversion</arg>
           </args>
           <scalaVersion>${scala-version}</scalaVersion>
         </configuration>

Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961132&r1=961131&r2=961132&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul  7 04:07:49 2010
@@ -95,12 +95,12 @@
     
     <jetty-version>6.1.22</jetty-version>
     <jetty-plugin-version>7.0.1.v20091125</jetty-plugin-version>
-    <scalate-version>1.1</scalate-version>
+    <scalate-version>1.2-SNAPSHOT</scalate-version>
     <servlet-api-version>2.5</servlet-api-version>
     <jackson-version>1.5.2</jackson-version>
     
-    <scala-version>2.8.0.Beta1</scala-version>
-    <scalatest-version>1.0.1-for-scala-2.8.0.Beta1-RC7-with-test-interfaces-0.3-SNAPSHOT</scalatest-version>
+    <scala-version>2.8.0.RC3</scala-version>
+    <scalatest-version>1.2-for-scala-2.8.0.RC3-SNAPSHOT</scalatest-version>
     <maven-scala-plugin-version>2.13.1</maven-scala-plugin-version>
     <maven-surefire-plugin-version>2.4.3</maven-surefire-plugin-version>