You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/21 23:31:13 UTC

svn commit: r1375801 - in /activemq/activemq-apollo/trunk: ./ apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/

Author: chirino
Date: Tue Aug 21 21:31:12 2012
New Revision: 1375801

URL: http://svn.apache.org/viewvc?rev=1375801&view=rev
Log:
Monitor the leveldb index performance and force compactions once it starts looking ugly.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala?rev=1375801&r1=1375800&r2=1375801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala Tue Aug 21 21:31:12 2012
@@ -20,6 +20,7 @@ package org.apache.activemq.apollo.broke
 import org.fusesource.hawtbuf._
 import org.iq80.leveldb._
 import java.io.DataOutput
+import org.fusesource.leveldbjni.internal.JniDB
 
 object HelperTrait {
 
@@ -127,7 +128,7 @@ object HelperTrait {
       try {
 
         val rc = Some(func(updates))
-        db.write(updates, wo)
+        might_trigger_compaction(db.write(updates, wo))
         return rc.get
       } finally {
         updates.close();
@@ -155,9 +156,38 @@ object HelperTrait {
       }
     }
 
+    def compact = {
+      compact_needed = false
+      db match {
+        case db:JniDB =>
+          db.compactRange(null, null)
+//        case db:DbImpl =>
+//          val start = new Slice(Array[Byte]('a'.toByte))
+//          val end = new Slice(Array[Byte]('z'.toByte))
+//          db.compactRange(2, start, end)
+        case _ =>
+      }
+    }
+
+    private def might_trigger_compaction[T](func: => T): T = {
+      val start = System.nanoTime()
+      try {
+        func
+      } finally {
+        val duration = System.nanoTime() - start
+        // If it takes longer than 50 ms..
+        if( duration > 1000000*50 ) {
+          compact_needed = true
+        }
+      }
+    }
+
+    @volatile
+    var compact_needed = false
+
     def cursor_keys_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(prefix);
+      might_trigger_compaction(iterator.seek(prefix))
       try {
         def check(key: Array[Byte]) = {
           key.startsWith(prefix) && func(key)
@@ -172,7 +202,7 @@ object HelperTrait {
 
     def cursor_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(prefix);
+      might_trigger_compaction( iterator.seek(prefix) )
       try {
         def check(key: Array[Byte]) = {
           key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
@@ -191,7 +221,7 @@ object HelperTrait {
 
     def cursor_range_keys(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(start_included);
+      might_trigger_compaction(iterator.seek(start_included))
       try {
         def check(key: Array[Byte]) = {
           if (compare(key, end_excluded) < 0) {
@@ -210,7 +240,7 @@ object HelperTrait {
 
     def cursor_range(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
       val iterator = db.iterator(ro)
-      iterator.seek(start_included);
+      might_trigger_compaction(iterator.seek(start_included))
       try {
         def check(key: Array[Byte]) = {
           (compare(key, end_excluded) < 0) && func(key, iterator.peekNext.getValue)
@@ -243,7 +273,7 @@ object HelperTrait {
         val iterator = db.iterator(ro)
         try {
 
-          iterator.seek(last);
+          might_trigger_compaction(iterator.seek(last))
           if (iterator.hasPrev) {
             iterator.prev()
           } else {

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1375801&r1=1375800&r2=1375801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Tue Aug 21 21:31:12 2012
@@ -20,14 +20,12 @@ package org.apache.activemq.apollo.broke
 import java.{lang => jl}
 import java.{util => ju}
 
-import org.fusesource.hawtbuf.proto.PBMessageFactory
-
+import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker.store._
 import java.io._
 import java.util.concurrent.TimeUnit
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util.{TreeMap => ApolloTreeMap}
 import collection.immutable.TreeMap
 import org.fusesource.leveldbjni.internal.Util
@@ -39,8 +37,7 @@ import org.iq80.leveldb._
 import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
 import org.apache.activemq.apollo.broker.store.PBSupport
 import java.util.concurrent.atomic.AtomicReference
-import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait.encode_key
-import org.fusesource.hawtbuf.{DataByteArrayInputStream, AsciiBuffer, Buffer, AbstractVarIntSupport}
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, Buffer}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -232,7 +229,8 @@ class LevelDBClient(store: LevelDBStore)
     import OptionSupport._
     directory.mkdirs()
 
-    val factory_names = Option(config.index_factory).getOrElse("org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory")
+    val factory_names = Option(config.index_factory).getOrElse("org.iq80.leveldb.impl.Iq80DBFactory")
+//    val factory_names = Option(config.index_factory).getOrElse("org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory")
     factory = factory_names.split("""(,|\s)+""").map(_.trim()).flatMap {
       name =>
         try {
@@ -470,6 +468,13 @@ class LevelDBClient(store: LevelDBStore)
           System.out.println("Replaying recovery log: done. %d operations recovered in %s".format(replay_operations, log_replay_duration.toDouble / TimeUnit.SECONDS.toNanos(1)));
         }
 
+        // Access the last queue just to see if we need to compact the index (checks for slow access).
+        for( queue <- list_queues.lastOption ) {
+          listQueueEntryGroups(queue, 100000)
+        }
+        // delete obsolete files..
+        gc
+
       } catch {
         case e: Throwable =>
           // replay failed.. good thing we are in a retry block...
@@ -1135,7 +1140,11 @@ class LevelDBClient(store: LevelDBStore)
     // Perhaps we should snapshot_index if the current snapshot is old.
     //
     import collection.JavaConversions._
-    last_index_snapshot_pos
+
+    // Lets compact the leveldb index if it looks like we need to.
+    if( index.compact_needed ) {
+      index.compact
+    }
     val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
 
     // We don't want to delete any journals that the index has not snapshot'ed or

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1375801&r1=1375800&r2=1375801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Tue Aug 21 21:31:12 2012
@@ -104,7 +104,7 @@
 
     <leveldb-api-version>0.2</leveldb-api-version>
     <leveldb-version>0.2</leveldb-version>
-    <leveldbjni-version>1.2</leveldbjni-version>
+    <leveldbjni-version>99-master-SNAPSHOT</leveldbjni-version>
 
     <jasypt-version>1.6</jasypt-version>