You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:07:37 UTC
svn commit: r961131 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/
activemq-stomp/ activemq-store/
Author: chirino
Date: Wed Jul 7 04:07:36 2010
New Revision: 961131
URL: http://svn.apache.org/viewvc?rev=961131&view=rev
Log:
starting to profile hawtdb sotre
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:07:36 2010
@@ -30,7 +30,8 @@ import ReporterLevel._
import org.apache.activemq.broker.store.{Store}
import org.fusesource.hawtbuf.proto.WireFormat
import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
-import org.apache.activemq.apollo.dto.{CassandraStoreDTO, VirtualHostDTO}
+import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
+import java.io.File
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -51,8 +52,8 @@ object VirtualHost extends Log {
rc.id = "default"
rc.enabled = true
rc.hostNames.add("localhost")
- val store = new CassandraStoreDTO
- store.hosts.add("127.0.0.1:9160")
+ val store = new HawtDBStoreDTO
+ store.directory = new File("activemq-data")
rc.store = store
rc
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:07:36 2010
@@ -382,7 +382,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val buffer = baos.toBuffer()
append(buffer) {
location =>
- executeStore(batch, update, onComplete, location)
+ executeStore(batch, update, null, location)
+ }
+ if(onComplete!=null) {
+ onComplete.run
}
}
@@ -533,18 +536,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
val start = System.currentTimeMillis()
- try {
- journal.write(data, new JournalCallback() {
- def success(location: Location) = {
- cb(location)
- }
- })
- } finally {
- val end = System.currentTimeMillis()
- if (end - start > 1000) {
+ journal.write(data, new JournalCallback() {
+ def success(location: Location) = {
+ var end = System.currentTimeMillis()
warn("Journal append latencey: %,.3f seconds", ((end - start) / 1000.0f))
+ cb(location)
+ var end2 = System.currentTimeMillis()
+ warn("Index latencey: %,.3f seconds", ((end2 - end) / 1000.0f))
}
- }
+ })
}
def read(location: Location) = journal.read(location)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:07:36 2010
@@ -93,6 +93,7 @@ class HawtDBStore extends Store with Bas
protected def _start(onCompleted: Runnable) = {
executor_pool = Executors.newFixedThreadPool(20)
client.config = config
+ schedualDisplayStats
executor_pool {
client.start(^{
next_msg_key.set( client.databaseRootRecord.getLastMessageKey.longValue +1 )
@@ -197,6 +198,8 @@ class HawtDBStore extends Store with Bas
/////////////////////////////////////////////////////////////////////
class HawtDBBatch extends BaseRetained with StoreBatch {
+ var dispose_start:Long = 0
+
class MessageAction {
var msg= 0L
@@ -262,14 +265,33 @@ class HawtDBStore extends Store with Bas
}
override def dispose = {
+ dispose_start = System.nanoTime
transaction_source.merge(this)
}
def onPerformed() {
+ metric_commit_counter += 1
+ val t = TimeUnit.NANOSECONDS.toMillis(System.nanoTime-dispose_start)
+ if( t < 0 ) {
+ println("wtf")
+ }
+ metric_commit_latency_counter += t
super.dispose
}
}
+ var metric_canceled_message_counter:Long = 0
+ var metric_canceled_enqueue_counter:Long = 0
+ var metric_flushed_message_counter:Long = 0
+ var metric_flushed_enqueue_counter:Long = 0
+ var metric_commit_counter:Long = 0
+ var metric_commit_latency_counter:Long = 0
+
+
+ var canceled_add_message:Long = 0
+ var canceled_enqueue:Long = 0
+
+
def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
val transaction_source = createSource(new ListEventAggregator[HawtDBBatch](), dispatchQueue)
@@ -281,7 +303,29 @@ class HawtDBStore extends Store with Bas
var delayedTransactions = new HashMap[Int, HawtDBBatch]()
var next_tx_id = new IntCounter
-
+
+
+ def schedualDisplayStats:Unit = {
+ val st = System.nanoTime
+ val ss = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
+ def displayStats = {
+ if( serviceState.isStarted ) {
+ val et = System.nanoTime
+ val es = (metric_canceled_message_counter, metric_canceled_enqueue_counter, metric_flushed_message_counter, metric_flushed_enqueue_counter, metric_commit_counter, metric_commit_latency_counter)
+
+ val commits = es._5-ss._5
+ var avgCommitLatency = if (commits!=0) (es._6 - ss._6).toFloat / commits else 0f
+
+ def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
+
+ info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f",
+ rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency)
+ schedualDisplayStats
+ }
+ }
+ dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
+ }
+
def drain_transactions = {
transaction_source.getData.foreach { tx =>
@@ -310,6 +354,8 @@ class HawtDBStore extends Store with Bas
val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
if( prevAction!=null ) {
+ metric_canceled_enqueue_counter += 1
+
// yay we can cancel out a previous enqueue
prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
@@ -317,6 +363,7 @@ class HawtDBStore extends Store with Bas
if( prevAction.enqueues == Nil && prevAction.messageRecord !=null ) {
pendingStores.remove(msg)
prevAction.messageRecord = null
+ metric_canceled_message_counter += 1
}
// Cancel the action if it's now empty
@@ -358,9 +405,11 @@ class HawtDBStore extends Store with Bas
tx.actions.foreach { case (msg, action) =>
if( action.messageRecord !=null ) {
+ metric_flushed_message_counter += 1
pendingStores.remove(msg)
}
action.enqueues.foreach { queueEntry=>
+ metric_flushed_enqueue_counter += 1
val k = key(queueEntry)
pendingEnqueues.remove(k)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul 7 04:07:36 2010
@@ -43,10 +43,18 @@
<artifactId>activemq-tcp</artifactId>
<version>6.0-SNAPSHOT</version>
</dependency>
+
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-cassandra</artifactId>
<version>6.0-SNAPSHOT</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-hawtdb</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ <optional>true</optional>
</dependency>
<!-- Scala Support -->
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961131&r1=961130&r2=961131&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul 7 04:07:36 2010
@@ -71,6 +71,14 @@
<!-- Testing Dependencies -->
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-util</artifactId>
+ <version>6.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest</artifactId>
<version>${scalatest-version}</version>