You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:07:37 UTC

svn commit: r961131 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-stomp/ activemq-store/

Author: chirino
Date: Wed Jul  7 04:07:36 2010
New Revision: 961131

URL: http://svn.apache.org/viewvc?rev=961131&view=rev
Log:
starting to profile hawtdb store

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:07:36 2010
@@ -30,7 +30,8 @@ import ReporterLevel._
 import org.apache.activemq.broker.store.{Store}
 import org.fusesource.hawtbuf.proto.WireFormat
 import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
-import org.apache.activemq.apollo.dto.{CassandraStoreDTO, VirtualHostDTO}
+import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
+import java.io.File
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -51,8 +52,8 @@ object VirtualHost extends Log {
     rc.id = "default"
     rc.enabled = true
     rc.hostNames.add("localhost")
-    val store = new CassandraStoreDTO
-    store.hosts.add("127.0.0.1:9160")
+    val store = new HawtDBStoreDTO
+    store.directory = new File("activemq-data") 
     rc.store = store
     rc
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:07:36 2010
@@ -382,7 +382,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val buffer = baos.toBuffer()
     append(buffer) {
       location =>
-        executeStore(batch, update, onComplete, location)
+        executeStore(batch, update, null, location)
+    }
+    if(onComplete!=null) {
+      onComplete.run
     }
   }
 
@@ -533,18 +536,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
   private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
     val start = System.currentTimeMillis()
-    try {
-      journal.write(data, new JournalCallback() {
-        def success(location: Location) = {
-          cb(location)
-        }
-      })
-    } finally {
-      val end = System.currentTimeMillis()
-      if (end - start > 1000) {
+    journal.write(data, new JournalCallback() {
+      def success(location: Location) = {
+        var end = System.currentTimeMillis()
         warn("Journal append latencey: %,.3f seconds", ((end - start) / 1000.0f))
+        cb(location)
+        var end2 = System.currentTimeMillis()
+        warn("Index latencey: %,.3f seconds", ((end2 - end) / 1000.0f))
       }
-    }
+    })
   }
 
   def read(location: Location) = journal.read(location)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:07:36 2010
@@ -93,6 +93,7 @@ class HawtDBStore extends Store with Bas
   protected def _start(onCompleted: Runnable) = {
     executor_pool = Executors.newFixedThreadPool(20)
     client.config = config
+    schedualDisplayStats
     executor_pool {
       client.start(^{
         next_msg_key.set( client.databaseRootRecord.getLastMessageKey.longValue +1 )
@@ -197,6 +198,8 @@ class HawtDBStore extends Store with Bas
   /////////////////////////////////////////////////////////////////////
   class HawtDBBatch extends BaseRetained with StoreBatch {
 
+    var dispose_start:Long = 0
+
     class MessageAction {
 
       var msg= 0L
@@ -262,14 +265,33 @@ class HawtDBStore extends Store with Bas
     }
 
     override def dispose = {
+      dispose_start = System.nanoTime
       transaction_source.merge(this)
     }
 
     def onPerformed() {
+      metric_commit_counter += 1
+      val t = TimeUnit.NANOSECONDS.toMillis(System.nanoTime-dispose_start)
+      if( t < 0 ) {
+        println("wtf")
+      }
+      metric_commit_latency_counter += t
       super.dispose
     }
   }
 
+  var metric_canceled_message_counter:Long = 0
+  var metric_canceled_enqueue_counter:Long = 0
+  var metric_flushed_message_counter:Long = 0
+  var metric_flushed_enqueue_counter:Long = 0
+  var metric_commit_counter:Long = 0
+  var metric_commit_latency_counter:Long = 0
+
+
+  var canceled_add_message:Long = 0
+  var canceled_enqueue:Long = 0
+
+
   def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
 
   val transaction_source = createSource(new ListEventAggregator[HawtDBBatch](), dispatchQueue)
@@ -281,7 +303,29 @@ class HawtDBStore extends Store with Bas
   var delayedTransactions = new HashMap[Int, HawtDBBatch]()
 
   var next_tx_id = new IntCounter
-  
+
+
+  def schedualDisplayStats:Unit = {
+    val st = System.nanoTime
+    val ss = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
+    def displayStats = {
+      if( serviceState.isStarted ) {
+        val et = System.nanoTime
+        val es = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
+
+        val commits = es._5-ss._5
+        var avgCommitLatency = if (commits!=0) (es._6 - ss._6).toFloat / commits else 0f
+
+        def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
+
+        info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f",
+          rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency)
+        schedualDisplayStats
+      }
+    }
+    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
+  }
+
   def drain_transactions = {
     transaction_source.getData.foreach { tx =>
 
@@ -310,6 +354,8 @@ class HawtDBStore extends Store with Bas
           val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
           if( prevAction!=null ) {
 
+            metric_canceled_enqueue_counter += 1
+
             // yay we can cancel out a previous enqueue
             prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
 
@@ -317,6 +363,7 @@ class HawtDBStore extends Store with Bas
             if( prevAction.enqueues == Nil && prevAction.messageRecord !=null ) {
               pendingStores.remove(msg)
               prevAction.messageRecord = null
+              metric_canceled_message_counter += 1
             }
 
             // Cancel the action if it's now empty
@@ -358,9 +405,11 @@ class HawtDBStore extends Store with Bas
 
         tx.actions.foreach { case (msg, action) =>
           if( action.messageRecord !=null ) {
+            metric_flushed_message_counter += 1
             pendingStores.remove(msg)
           }
           action.enqueues.foreach { queueEntry=>
+            metric_flushed_enqueue_counter += 1
             val k = key(queueEntry)
             pendingEnqueues.remove(k)
           }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul  7 04:07:36 2010
@@ -43,10 +43,18 @@
       <artifactId>activemq-tcp</artifactId>
       <version>6.0-SNAPSHOT</version>
     </dependency>
+
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-cassandra</artifactId>
       <version>6.0-SNAPSHOT</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-hawtdb</artifactId>
+      <version>6.0-SNAPSHOT</version>
+      <optional>true</optional>
     </dependency>
 
     <!-- Scala Support -->

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul  7 04:07:36 2010
@@ -71,6 +71,14 @@
 
     <!-- Testing Dependencies -->    
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-util</artifactId>
+      <version>6.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest</artifactId>
       <version>${scalatest-version}</version>