You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:08:02 UTC

svn commit: r961133 - in /activemq/sandbox/activemq-apollo-actor: activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/ activemq-store/src/test/scala/org/apache/a...

Author: chirino
Date: Wed Jul  7 04:08:01 2010
New Revision: 961133

URL: http://svn.apache.org/viewvc?rev=961133&view=rev
Log:
adding store benchmarking tests

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreBenchmark.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961133&r1=961132&r2=961133&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:08:01 2010
@@ -232,7 +232,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             }
         }
     }
-    store(batch)
+    store(batch, ^{
+      txs.foreach { tx=>
+        tx.onPerformed
+      }
+    })
   }
 
   def purge() = {
@@ -535,16 +539,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   /////////////////////////////////////////////////////////////////////
 
   private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
-    benchmarkLatency { done =>
-      journal.write(data, new JournalCallback() {
-        def success(location: Location) = {
-          done("journal append")
-          cb(location)
-          done("journal append + index update")
-        }
-      })
-      done("journal enqueue")
-    }
+    journal.write(data, new JournalCallback() {
+      def success(location: Location) = {
+        cb(location)
+      }
+    })
   }
 
   /**
@@ -553,7 +552,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val start = System.nanoTime
     func { label=>
       var end = System.nanoTime
-      warn("latencey: %s is %,.3f ms", label, ( (end - start).toFloat / TimeUnit.SECONDS.toMillis(1)))
+      warn("latencey: %s is %,.3f ms", label, ( (end - start).toFloat / TimeUnit.MILLISECONDS.toNanos(1)))
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961133&r1=961132&r2=961133&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:08:01 2010
@@ -424,14 +424,7 @@ class HawtDBStore extends Store with Bas
     if( !txs.isEmpty ) {
       // suspend so that we don't process more flush requests while
       // we are concurrently executing a flush
-      flush_source.suspend
-      executor_pool ^{
-        client.store(txs)
-        txs.foreach { x=>
-          x.onPerformed
-        }
-        flush_source.resume
-      }
+      client.store(txs)
     }
   }
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreBenchmark.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreBenchmark.scala?rev=961133&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreBenchmark.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreBenchmark.scala Wed Jul  7 04:08:01 2010
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.cassandra
+
+import org.apache.activemq.broker.store.hawtdb.HawtDBStore
+import org.apache.activemq.broker.store.{StoreBenchmarkSupport, Store}
+
+/** Runs the benchmarks inherited from StoreBenchmarkSupport against a HawtDBStore.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HawtDBStoreBenchmark extends StoreBenchmarkSupport {  // NOTE(review): file declares package ...store.cassandra but lives under store/hawtdb — confirm
+
+  def createStore(flushDelay:Long):Store = {  // returns a new HawtDBStore with config.flushDelay set to the given value
+    val rc = new HawtDBStore
+    rc.config.flushDelay = flushDelay
+    rc
+  }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961133&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul  7 04:08:01 2010
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store
+
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.TaskTracker
+import org.apache.activemq.apollo.broker.{LoggingTracker, FunSuiteSupport}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
+import collection.mutable.ListBuffer
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger, AtomicBoolean}
+
+/**
+ * <p>Implements generic benchmarking of Store implementations.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class StoreBenchmarkSupport extends FunSuiteSupport with BeforeAndAfterEach {
+
+  var store:Store = null  // store under test; assigned in beforeAll
+
+  def createStore(flushDelay:Long):Store  // factory supplied by the concrete benchmark; the unit of flushDelay is not visible here
+
+  /**
+   * Invokes `func` with a callback, blocks until that callback has been called once,
+   * and returns the value it was called with. The wait has no timeout.
+   */
+  def CB[T](func: (T=>Unit)=>Unit ) = {
+    class X {  // holder so the nested cb can hand the result back out
+      var value:T = _  // stays at the default value until the callback runs
+    }
+    val rc = new X
+    val cd = new CountDownLatch(1)  // released by the callback
+    def cb(x:T) = {
+      rc.value = x
+      cd.countDown
+    }
+    func(cb)
+    cd.await  // block the calling thread until cb has fired
+    rc.value
+  }
+
+
+  override protected def beforeAll() = {  // creates the store under test and starts it through a LoggingTracker
+    store = createStore(5*1000)  // flushDelay of 5*1000 — presumably milliseconds; TODO confirm the unit
+    val tracker = new LoggingTracker("store startup")
+    tracker.start(store)
+    tracker.await  // presumably blocks until the tracked start has completed — confirm against LoggingTracker
+  }
+
+  override protected def afterAll() = {  // stops the store under test through a LoggingTracker
+    val tracker = new LoggingTracker("store stop")
+    tracker.stop(store)
+    tracker.await  // presumably blocks until the tracked stop has completed — confirm against LoggingTracker
+  }
+
+  override protected def beforeEach() = {  // purges the store before each test
+    val tracker = new LoggingTracker("store startup")  // NOTE(review): label reads "store startup" but this tracks a purge — confirm intended
+    val task = tracker.task("purge")
+    store.purge(task.run)  // task.run is handed to purge as its callback
+    tracker.await
+  }
+
+  def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {  // checks that the value delivered to func's callback equals expected
+    expect(expected) {
+      CB(func)  // blocks until the callback has delivered its value
+    }
+  }
+
+  def addQueue(name:String):Long = {  // adds a queue called `name` to the store; returns the Long reported back (used by callers as the queue key)
+    var queueA = new QueueRecord
+    queueA.name = ascii(name)
+    val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )  // wait synchronously for the async result
+    expect(true)(rc.isDefined)  // fail here if the store reported no value
+    rc.get
+  }
+
+  def addMessage(batch:StoreBatch, content:String):Long = {  // adds `content` to batch as a "test-protocol" message; returns batch.store's result
+    var message = new MessageRecord
+    message.protocol = ascii("test-protocol")
+    message.value = ascii(content).buffer
+    message.size = message.value.length  // size is taken from the value buffer's length
+    batch.store(message)  // callers in this file use the returned Long as the message key
+  }
+
+
+  def entry(queueKey:Long, queueSeq:Long, messageKey:Long=0) = {  // builds a QueueEntryRecord from the three values; messageKey defaults to 0
+    var queueEntry = new QueueEntryRecord
+    queueEntry.queueKey = queueKey
+    queueEntry.queueSeq = queueSeq
+    queueEntry.messageKey = messageKey
+    queueEntry
+  }
+
+
+  def payload(prefix:String, messageSize:Int) = {  // returns messageSize chars: prefix followed by lowercase-letter padding, cut off if prefix is longer
+    val buffer = new StringBuffer(messageSize)
+    buffer.append(prefix);
+    for( i <- buffer.length to messageSize ) {  // inclusive range, so one char more than needed is appended
+      buffer.append(('a'+(i%26)).toChar)  // letter cycles a..z by absolute index i
+    }
+    var rc = buffer.toString
+    if( rc.length > messageSize ) {  // trim the overshoot (or an over-long prefix) back to messageSize
+      rc.substring(0, messageSize)
+    } else {
+      rc
+    }
+  }
+
+  def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {  // stores and enqueues each message on queueKey, seqs counting up from firstSeq; returns the message keys
+    var batch = store.createStoreBatch
+    var msgKeys = ListBuffer[Long]()
+    var nextSeq = firstSeq
+
+    messages.foreach { message=>
+      val msgKey = addMessage(batch, message)
+      msgKeys += msgKey
+      batch.enqueue(entry(queueKey, nextSeq, msgKey))
+      nextSeq += 1
+    }
+
+    val tracker = new TaskTracker()
+    tracker.release(batch)  // presumably releases the batch and tracks its completion — confirm against TaskTracker
+    msgKeys.foreach { msgKey =>
+      store.flushMessage(msgKey) {}  // ask for each message to be flushed; the callback block is empty
+    }
+    tracker.await
+    msgKeys
+  }
+
+  test("store enqueue latencey") {  // times repeated single-message enqueues on queue "A" and prints latency and rate
+    val A = addQueue("A")
+    var seq = 0
+
+    val content = payload("message\n", 1024)  // 1024-char message body, built once and reused
+    val metric = benchmark {  // this body is what benchmark runs repeatedly
+      seq += 1
+
+      var batch = store.createStoreBatch
+      val message = addMessage(batch, content)
+      batch.enqueue(entry(A, seq, message))
+
+      val latch = new CountDownLatch(1)
+      batch.setDisposer(^{cd(latch)} )  // the disposer task counts the latch down
+      batch.release
+      store.flushMessage(message) {}
+
+      latch.await  // wait for the disposer before starting the next iteration
+
+    }
+    println("enqueue metrics: "+metric)
+    println("enqueue latency is: "+metric.latency(TimeUnit.MILLISECONDS)+" ms")
+    println("enqueue rate is: "+metric.rate(TimeUnit.SECONDS)+" enqueues/s")
+  }
+
+  def cd(latch:CountDownLatch) = {  // counts the given latch down by one
+    latch.countDown
+  }
+
+
+  case class Metric(count:Long, duration:Long) {  // count = completed iterations, duration = elapsed nanoseconds
+    def latency(unit:TimeUnit) = {  // average time per iteration, expressed in `unit`
+      ((duration).toFloat / unit.toNanos(1)) / count  // Float math: a zero count gives a non-finite value, not an exception
+    }
+    def rate(unit:TimeUnit) = {  // iterations per one `unit` of time
+      (count.toFloat * unit.toNanos(1) ) / duration  // Float math: a zero duration gives a non-finite value, not an exception
+    }
+  }
+
+  def benchmark(func: =>Unit ) = {  // runs func in a loop on its own thread for about 30 seconds; returns Metric(iterations, elapsed nanos)
+
+    val counter = new AtomicLong()
+    val done = new AtomicBoolean()
+    var startT = 0L
+    var endT = 0L
+    val thread = new Thread("benchmarked task") {
+      override def run = {
+        startT = System.nanoTime();
+        while(!done.get) {  // the stop flag is only checked between iterations
+          func
+          counter.incrementAndGet
+        }
+        endT = System.nanoTime();
+      }
+    }
+
+    thread.start()
+    Thread.sleep(1000*30)  // let the worker run for 30 seconds
+    done.set(true)
+    thread.join  // wait for the in-flight iteration to finish so endT has been written
+
+    Metric(counter.get, endT-startT)
+  }
+
+
+}