You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/21 23:31:13 UTC
svn commit: r1375801 - in /activemq/activemq-apollo/trunk: ./
apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/
Author: chirino
Date: Tue Aug 21 21:31:12 2012
New Revision: 1375801
URL: http://svn.apache.org/viewvc?rev=1375801&view=rev
Log:
Monitor the leveldb index performance and force compactions once it starts looking ugly.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala?rev=1375801&r1=1375800&r2=1375801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala Tue Aug 21 21:31:12 2012
@@ -20,6 +20,7 @@ package org.apache.activemq.apollo.broke
import org.fusesource.hawtbuf._
import org.iq80.leveldb._
import java.io.DataOutput
+import org.fusesource.leveldbjni.internal.JniDB
object HelperTrait {
@@ -127,7 +128,7 @@ object HelperTrait {
try {
val rc = Some(func(updates))
- db.write(updates, wo)
+ might_trigger_compaction(db.write(updates, wo))
return rc.get
} finally {
updates.close();
@@ -155,9 +156,38 @@ object HelperTrait {
}
}
+ def compact = {
+ compact_needed = false
+ db match {
+ case db:JniDB =>
+ db.compactRange(null, null)
+// case db:DbImpl =>
+// val start = new Slice(Array[Byte]('a'.toByte))
+// val end = new Slice(Array[Byte]('z'.toByte))
+// db.compactRange(2, start, end)
+ case _ =>
+ }
+ }
+
+ private def might_trigger_compaction[T](func: => T): T = {
+ val start = System.nanoTime()
+ try {
+ func
+ } finally {
+ val duration = System.nanoTime() - start
+ // If it takes longer than 50 ms, flag the index as needing compaction.
+ if( duration > 1000000*50 ) {
+ compact_needed = true
+ }
+ }
+ }
+
+ @volatile
+ var compact_needed = false
+
def cursor_keys_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
val iterator = db.iterator(ro)
- iterator.seek(prefix);
+ might_trigger_compaction(iterator.seek(prefix))
try {
def check(key: Array[Byte]) = {
key.startsWith(prefix) && func(key)
@@ -172,7 +202,7 @@ object HelperTrait {
def cursor_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
val iterator = db.iterator(ro)
- iterator.seek(prefix);
+ might_trigger_compaction( iterator.seek(prefix) )
try {
def check(key: Array[Byte]) = {
key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
@@ -191,7 +221,7 @@ object HelperTrait {
def cursor_range_keys(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
val iterator = db.iterator(ro)
- iterator.seek(start_included);
+ might_trigger_compaction(iterator.seek(start_included))
try {
def check(key: Array[Byte]) = {
if (compare(key, end_excluded) < 0) {
@@ -210,7 +240,7 @@ object HelperTrait {
def cursor_range(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
val iterator = db.iterator(ro)
- iterator.seek(start_included);
+ might_trigger_compaction(iterator.seek(start_included))
try {
def check(key: Array[Byte]) = {
(compare(key, end_excluded) < 0) && func(key, iterator.peekNext.getValue)
@@ -243,7 +273,7 @@ object HelperTrait {
val iterator = db.iterator(ro)
try {
- iterator.seek(last);
+ might_trigger_compaction(iterator.seek(last))
if (iterator.hasPrev) {
iterator.prev()
} else {
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1375801&r1=1375800&r2=1375801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Tue Aug 21 21:31:12 2012
@@ -20,14 +20,12 @@ package org.apache.activemq.apollo.broke
import java.{lang => jl}
import java.{util => ju}
-import org.fusesource.hawtbuf.proto.PBMessageFactory
-
+import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.broker.store._
import java.io._
import java.util.concurrent.TimeUnit
import org.apache.activemq.apollo.util._
import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.util.{TreeMap => ApolloTreeMap}
import collection.immutable.TreeMap
import org.fusesource.leveldbjni.internal.Util
@@ -39,8 +37,7 @@ import org.iq80.leveldb._
import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
import org.apache.activemq.apollo.broker.store.PBSupport
import java.util.concurrent.atomic.AtomicReference
-import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait.encode_key
-import org.fusesource.hawtbuf.{DataByteArrayInputStream, AsciiBuffer, Buffer, AbstractVarIntSupport}
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, Buffer}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -232,7 +229,8 @@ class LevelDBClient(store: LevelDBStore)
import OptionSupport._
directory.mkdirs()
- val factory_names = Option(config.index_factory).getOrElse("org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory")
+ val factory_names = Option(config.index_factory).getOrElse("org.iq80.leveldb.impl.Iq80DBFactory")
+// val factory_names = Option(config.index_factory).getOrElse("org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory")
factory = factory_names.split("""(,|\s)+""").map(_.trim()).flatMap {
name =>
try {
@@ -470,6 +468,13 @@ class LevelDBClient(store: LevelDBStore)
System.out.println("Replaying recovery log: done. %d operations recovered in %s".format(replay_operations, log_replay_duration.toDouble / TimeUnit.SECONDS.toNanos(1)));
}
+ // Access the last queue just to see if we need to compact the index (checks for slow access).
+ for( queue <- list_queues.lastOption ) {
+ listQueueEntryGroups(queue, 100000)
+ }
+ // delete obsolete files..
+ gc
+
} catch {
case e: Throwable =>
// replay failed.. good thing we are in a retry block...
@@ -1135,7 +1140,11 @@ class LevelDBClient(store: LevelDBStore)
// Perhaps we should snapshot_index if the current snapshot is old.
//
import collection.JavaConversions._
- last_index_snapshot_pos
+
+ // Let's compact the leveldb index if it looks like we need to.
+ if( index.compact_needed ) {
+ index.compact
+ }
val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
// We don't want to delete any journals that the index has not snapshot'ed or
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1375801&r1=1375800&r2=1375801&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Tue Aug 21 21:31:12 2012
@@ -104,7 +104,7 @@
<leveldb-api-version>0.2</leveldb-api-version>
<leveldb-version>0.2</leveldb-version>
- <leveldbjni-version>1.2</leveldbjni-version>
+ <leveldbjni-version>99-master-SNAPSHOT</leveldbjni-version>
<jasypt-version>1.6</jasypt-version>