Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/25 16:32:31 UTC

svn commit: r1389882 [6/7] - in /activemq/trunk: ./ activemq-core/ activemq-core/src/main/java/org/apache/activemq/store/leveldb/ activemq-core/src/main/resources/ activemq-core/src/main/resources/META-INF/ activemq-core/src/main/resources/META-INF/ser...

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,1218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import collection.immutable.TreeMap
+import collection.mutable.{HashMap, ListBuffer}
+import org.iq80.leveldb._
+
+import org.fusesource.hawtdispatch._
+import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord}
+import util._
+import java.util.concurrent._
+import org.fusesource.hawtbuf._
+import java.io.{ObjectInputStream, ObjectOutputStream, File}
+import scala.Option._
+import org.apache.activemq.command.Message
+import org.apache.activemq.util.ByteSequence
+import org.apache.activemq.leveldb.RecordLog.LogInfo
+import java.text.SimpleDateFormat
+import java.util.{Date, Collections}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBClient extends Log {
+
+  final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:"
+  final val STORE_SCHEMA_VERSION = 1
+
+  final val THREAD_POOL_STACK_SIZE = System.getProperty("leveldb.thread.stack.size", "" + 1024 * 512).toLong
+  final val THREAD_POOL: ThreadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue[Runnable], new ThreadFactory {
+    def newThread(r: Runnable): Thread = {
+      var rc: Thread = new Thread(null, r, "LevelDB Store Task", THREAD_POOL_STACK_SIZE)
+      rc.setDaemon(true)
+      return rc
+    }
+  }) {
+    override def shutdown: Unit = {}
+    override def shutdownNow = Collections.emptyList[Runnable]
+  }
+
+  final val DIRTY_INDEX_KEY = bytes(":dirty")
+  final val LOG_REF_INDEX_KEY = bytes(":log-refs")
+  final val COLLECTION_META_KEY = bytes(":collection-meta")
+  final val TRUE = bytes("true")
+  final val FALSE = bytes("false")
+  final val ACK_POSITION = new AsciiBuffer("p")
+
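+  // Index key layout: collection records are stored under 'c' + 8-byte collection
+  // key, and entry records under 'e' + 8-byte collection key + entry key (see
+  // encodeLongKey and encodeEntryKey below).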
+  final val COLLECTION_PREFIX = 'c'.toByte
+  final val COLLECTION_PREFIX_ARRAY = Array(COLLECTION_PREFIX)
+  final val ENTRY_PREFIX = 'e'.toByte
+  final val ENTRY_PREFIX_ARRAY = Array(ENTRY_PREFIX)
+
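+  // Tags for the record types appended to the journal; during recovery only the
+  // first four kinds modify the index, LOG_DATA and LOG_TRACE are skipped.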
+  final val LOG_ADD_COLLECTION      = 1.toByte
+  final val LOG_REMOVE_COLLECTION   = 2.toByte
+  final val LOG_ADD_ENTRY           = 3.toByte
+  final val LOG_REMOVE_ENTRY        = 4.toByte
+  final val LOG_DATA                = 5.toByte
+  final val LOG_TRACE               = 6.toByte
+
+  final val LOG_SUFFIX  = ".log"
+  final val INDEX_SUFFIX  = ".index"
+  
+  implicit def toByteArray(buffer:Buffer) = buffer.toByteArray
+  implicit def toBuffer(buffer:Array[Byte]) = new Buffer(buffer)
+  
+  def encodeCollectionRecord(v: CollectionRecord.Buffer) = v.toUnframedByteArray
+  def decodeCollectionRecord(data: Buffer):CollectionRecord.Buffer = CollectionRecord.FACTORY.parseUnframed(data)
+  def encodeCollectionKeyRecord(v: CollectionKey.Buffer) = v.toUnframedByteArray
+  def decodeCollectionKeyRecord(data: Buffer):CollectionKey.Buffer = CollectionKey.FACTORY.parseUnframed(data)
+
+  def encodeEntryRecord(v: EntryRecord.Buffer) = v.toUnframedBuffer
+  def decodeEntryRecord(data: Buffer):EntryRecord.Buffer = EntryRecord.FACTORY.parseUnframed(data)
+
+  def encodeEntryKeyRecord(v: EntryKey.Buffer) = v.toUnframedByteArray
+  def decodeEntryKeyRecord(data: Buffer):EntryKey.Buffer = EntryKey.FACTORY.parseUnframed(data)
+
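+  // A locator packs a journal (position, length) pair as a var-long followed by a
+  // var-int; it is stored in the index so message payloads can be re-read from the
+  // log files later.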
+  def encodeLocator(pos:Long, len:Int):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(pos)+
+      AbstractVarIntSupport.computeVarIntSize(len)
+    )
+    out.writeVarLong(pos)
+    out.writeVarInt(len)
+    out.getData
+  }
+  def decodeLocator(bytes:Buffer):(Long,  Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readVarLong(), in.readVarInt())
+  }
+  def decodeLocator(bytes:Array[Byte]):(Long,  Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readVarLong(), in.readVarInt())
+  }
+
+  def encodeLong(a1:Long) = {
+    val out = new DataByteArrayOutputStream(8)
+    out.writeLong(a1)
+    out.toBuffer
+  }
+
+  def encodeVLong(a1:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.getData
+  }
+
+  def decodeVLong(bytes:Array[Byte]):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
+  }
+
+  def encodeLongKey(a1:Byte, a2:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(9)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.getData
+  }
+  def decodeLongKey(bytes:Array[Byte]):(Byte, Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong())
+  }
+
+  def decodeLong(bytes:Buffer):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readLong()
+  }
+  def decodeLong(bytes:Array[Byte]):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readLong()
+  }
+
+  def encodeEntryKey(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(17)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.writeLong(a3)
+    out.getData
+  }
+
+  def encodeEntryKey(a1:Byte, a2:Long, a3:Buffer):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(9+a3.length)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.write(a3)
+    out.getData
+  }
+  
+  def decodeEntryKey(bytes:Array[Byte]):(Byte, Long, Buffer) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong(), in.readBuffer(in.available()))
+  }
+
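+  // Thin Scala wrapper around the LevelDB DB handle: Option-returning gets, write
+  // batches with latency tracking, and cursor helpers that keep iterating while the
+  // callback returns true.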
+  final class RichDB(val db: DB) {
+
+    val isPureJavaVersion = db.getClass.getName == "org.iq80.leveldb.impl.DbImpl"
+
+    def getProperty(name:String) = db.getProperty(name)
+
+    def getApproximateSizes(ranges:Range*) = db.getApproximateSizes(ranges:_*)
+
+    def get(key:Array[Byte], ro:ReadOptions=new ReadOptions):Option[Array[Byte]] = {
+      Option(db.get(key, ro))
+    }
+
+    def close:Unit = db.close()
+
+    def delete(key:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+      db.delete(key, wo)
+    }
+
+    def put(key:Array[Byte], value:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+      db.put(key, value, wo)
+    }
+    
+    def write[T](wo:WriteOptions=new WriteOptions, max_write_latency:TimeMetric = TimeMetric())(func: WriteBatch=>T):T = {
+      val updates = db.createWriteBatch()
+      try {
+        val rc=Some(func(updates))
+        max_write_latency {
+          db.write(updates, wo)
+        }
+        return rc.get
+      } finally {
+        updates.close();
+      }
+    }
+
+    def store[T](write:WriteBatch, wo:WriteOptions=new WriteOptions) = {
+      db.write(write, wo)
+    }
+
+    def snapshot[T](func: Snapshot=>T):T = {
+      val snapshot = db.getSnapshot
+      try {
+        func(snapshot)
+      } finally {
+        snapshot.close()
+      }
+    }
+
+    def cursorKeys(ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seekToFirst();
+      try {
+        while( iterator.hasNext && func(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    def cursorKeysPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(prefix);
+      try {
+        def check(key:Buffer) = {
+          key.startsWith(prefix) && func(key)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    def cursorPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(prefix);
+      try {
+        def check(key:Buffer) = {
+          key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    def compare(a1:Array[Byte], a2:Array[Byte]):Int = {
+      new Buffer(a1).compareTo(new Buffer(a2))
+    }
+
+    def cursorRangeKeys(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(startIncluded);
+      try {
+        def check(key:Array[Byte]) = {
+          if ( compare(key,endExcluded) < 0) {
+            func(key)
+          } else {
+            false
+          }
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
+    def cursorRange(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      iterator.seek(startIncluded);
+      try {
+        def check(key:Array[Byte]) = {
+          (compare(key,endExcluded) < 0) && func(key, iterator.peekNext.getValue)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close();
+      }
+    }
+
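+    // Finds the largest key that starts with the given prefix by seeking just past
+    // the prefix range (last prefix byte incremented) and stepping back one entry.
+    // The pure Java driver cannot iterate backwards, so it falls back to a forward
+    // range scan instead.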
+    def lastKey(prefix:Array[Byte], ro:ReadOptions=new ReadOptions): Option[Array[Byte]] = {
+      val last = new Buffer(prefix).deepCopy().data
+      if ( last.length > 0 ) {
+        val pos = last.length-1
+        last(pos) = (last(pos)+1).toByte
+      }
+
+      if(isPureJavaVersion) {
+        // The pure java version of LevelDB does not support backward iteration.
+        var rc:Option[Array[Byte]] = None
+        cursorRangeKeys(prefix, last) { key=>
+          rc = Some(key)
+          true
+        }
+        rc
+      } else {
+        val iterator = db.iterator(ro)
+        try {
+
+          iterator.seek(last);
+          if ( iterator.hasPrev ) {
+            iterator.prev()
+          } else {
+            iterator.seekToLast()
+          }
+
+          if ( iterator.hasNext ) {
+            val key:Buffer = iterator.peekNext.getKey
+            if(key.startsWith(prefix)) {
+              Some(key)
+            } else {
+              None
+            }
+          } else {
+            None
+          }
+        } finally {
+          iterator.close();
+        }
+      }
+    }
+  }
+
+
+  def bytes(value:String) = value.getBytes("UTF-8")
+
+  import FileSupport._
+  def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
+
+  def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
+    TreeMap((directory.listFiles.flatMap { f=>
+      if( f.getName.endsWith(suffix) ) {
+        try {
+          val base = f.getName.stripSuffix(suffix)
+          val position = java.lang.Long.parseLong(base, 16);
+          Some(position -> f)
+        } catch {
+          case e:NumberFormatException => None
+        }
+      } else {
+        None
+      }
+    }): _* )
+  }
+
+  class CollectionMeta extends Serializable {
+    var size = 0L
+    var last_key:Array[Byte] = _
+  }
+}
+
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBClient(store: LevelDBStore) {
+
+  import LevelDBClient._
+  import FileSupport._
+
+  val dispatchQueue = createQueue("leveldb")
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Helpers
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def directory = store.directory
+  def logDirectory = Option(store.logDirectory).getOrElse(store.directory)
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Public interface used by the DBManager
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def sync = store.sync;
+  def verifyChecksums = store.verifyChecksums
+
+  var log:RecordLog = _
+
+  var index:RichDB = _
+  var indexOptions:Options = _
+
+  var lastIndexSnapshotPos:Long = _
+  val snapshotRwLock = new ReentrantReadWriteLock(true)
+
+  var factory:DBFactory = _
+  val logRefs = HashMap[Long, LongCounter]()
+  
+  val collectionMeta = HashMap[Long, CollectionMeta]()
+
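+  // The live index lives in the "dirty.index" directory; snapshots are hard-linked
+  // copies named after the log position they cover (16 hex digits + ".index").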
+  def dirtyIndexFile = directory / ("dirty"+INDEX_SUFFIX)
+  def tempIndexFile = directory / ("temp"+INDEX_SUFFIX)
+  def snapshotIndexFile(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX)
+
+  def createLog: RecordLog = {
+    new RecordLog(logDirectory, LOG_SUFFIX)
+  }
+
+  var writeExecutor:ExecutorService = _
+
+  def storeTrace(ascii:String, force:Boolean=false) = {
+    val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
+    log.appender { appender =>
+      appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii)))
+      if( force ) {
+        appender.force
+      }
+    }
+  }
+
+  def retry[T](func : =>T):T = RetrySupport.retry(LevelDBClient, store.isStarted, func _)
+
+  def start() = {
+
+    // Let's check store compatibility...
+    directory.mkdirs()
+    val version_file = directory / "store-version.txt"
+    if (version_file.exists()) {
+      val ver = try {
+        var tmp: String = version_file.readText().trim()
+        if (tmp.startsWith(STORE_SCHEMA_PREFIX)) {
+          tmp.stripPrefix(STORE_SCHEMA_PREFIX).toInt
+        } else {
+          -1
+        }
+      } catch {
+        case e => throw new Exception("Unexpected version file format: " + version_file)
+      }
+      ver match {
+        case STORE_SCHEMA_VERSION => // All is good.
+        case _ => throw new Exception("Cannot open the store.  It's schema version is not supported.")
+      }
+    }
+    version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
+
+    writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+      def newThread(r: Runnable) = {
+        val rc = new Thread(r, "LevelDB store io write")
+        rc.setDaemon(true)
+        rc
+      }
+    })
+
+    val factoryNames = store.indexFactory
+    factory = factoryNames.split("""(,|\s)+""").map(_.trim()).flatMap { name=>
+      try {
+        Some(this.getClass.getClassLoader.loadClass(name).newInstance().asInstanceOf[DBFactory])
+      } catch {
+        case e:Throwable =>
+          debug(e, "Could not load factory: "+name+" due to: "+e)
+          None
+      }
+    }.headOption.getOrElse(throw new Exception("Could not load any of the index factory classes: "+factoryNames))
+
+    if( factory.getClass.getName == "org.iq80.leveldb.impl.Iq80DBFactory") {
+      warn("Using the pure java LevelDB implementation which is still experimental.  Production users should use the JNI based LevelDB implementation instead.")
+    }
+
+    indexOptions = new Options();
+    indexOptions.createIfMissing(true);
+
+    indexOptions.maxOpenFiles(store.indexMaxOpenFiles)
+    indexOptions.blockRestartInterval(store.indexBlockRestartInterval)
+    indexOptions.paranoidChecks(store.paranoidChecks)
+    indexOptions.writeBufferSize(store.indexWriteBufferSize)
+    indexOptions.blockSize(store.indexBlockSize)
+    indexOptions.compressionType( store.indexCompression.toLowerCase match {
+      case "snappy" => CompressionType.SNAPPY
+      case "none" => CompressionType.NONE
+      case _ => CompressionType.SNAPPY
+    })
+
+    indexOptions.cacheSize(store.indexCacheSize)
+    indexOptions.logger(new Logger() {
+      val LOG = Log(factory.getClass.getName)
+      def log(msg: String) = LOG.debug("index: "+msg.stripSuffix("\n"))
+    })
+
+    log = createLog
+    log.logSize = store.logSize
+    log.on_log_rotate = ()=> {
+      // We snapshot the index every time we rotate the logs.
+      writeExecutor {
+        snapshotIndex(false)
+      }
+    }
+
+    retry {
+      log.open
+    }
+
+    // Find out what the last snapshot was.
+    val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
+    var lastSnapshotIndex = snapshots.lastOption
+    lastIndexSnapshotPos = lastSnapshotIndex.map(_._1).getOrElse(0)
+
+    // Only keep the last snapshot..
+    snapshots.filterNot(_._1 == lastIndexSnapshotPos).foreach( _._2.recursiveDelete )
+    tempIndexFile.recursiveDelete
+
+    retry {
+
+      // Delete the dirty indexes
+      dirtyIndexFile.recursiveDelete
+      dirtyIndexFile.mkdirs()
+
+      lastSnapshotIndex.foreach { case (id, file) =>
+        // Resume log replay from a snapshot of the index..
+        try {
+          file.listFiles.foreach { file =>
+            file.linkTo(dirtyIndexFile / file.getName)
+          }
+        } catch {
+          case e:Exception =>
+            warn(e, "Could not recover snapshot of the index: "+e)
+            lastSnapshotIndex  = None
+        }
+      }
+
+      index = new RichDB(factory.open(dirtyIndexFile, indexOptions));
+      try {
+        loadCounters
+        index.put(DIRTY_INDEX_KEY, TRUE)
+        // Update the index with what was stored in the logs since the last snapshot..
+        var pos = lastIndexSnapshotPos;
+        var last_reported_at = System.currentTimeMillis();
+        var showing_progress = false
+        var last_reported_pos = 0L
+        try {
+          while (pos < log.appender_limit) {
+            val now = System.currentTimeMillis();
+            if( now > last_reported_at+1000 ) {
+              val at = pos-lastIndexSnapshotPos
+              val total = log.appender_limit-lastIndexSnapshotPos
+              val rate = (pos-last_reported_pos)*1000.0 / (now - last_reported_at)
+              val eta = (total-at)/rate
+              val remaining = if(eta > 60*60) {
+                "%.2f hrs".format(eta/(60*60))
+              } else if(eta > 60) {
+                "%.2f mins".format(eta/60)
+              } else {
+                "%.0f secs".format(eta)
+              }
+
+              System.out.print("Replaying recovery log: %f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining.     \r".format(
+                at*100.0/total, at, total, rate/1024, remaining))
+              showing_progress = true;
+              last_reported_at = now
+              last_reported_pos = pos
+            }
+
+
+            log.read(pos).map {
+              case (kind, data, nextPos) =>
+                kind match {
+                  case LOG_ADD_COLLECTION =>
+                    val record= decodeCollectionRecord(data)
+                    index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
+                    collectionMeta.put(record.getKey, new CollectionMeta)
+
+                  case LOG_REMOVE_COLLECTION =>
+                    val record = decodeCollectionKeyRecord(data)
+                    // Delete the entries in the collection.
+                    index.cursorPrefixed(encodeLongKey(ENTRY_PREFIX, record.getKey), new ReadOptions) { (key, value)=>
+                      val record = decodeEntryRecord(value)
+                      val pos = if ( record.hasValueLocation ) {
+                        Some(record.getValueLocation)
+                      } else {
+                        None
+                      }
+                      pos.foreach(logRefDecrement(_))
+                      index.delete(key)
+                      true
+                    }
+                    index.delete(data)
+                    collectionMeta.remove(record.getKey)
+
+                  case LOG_ADD_ENTRY =>
+                    val record = decodeEntryRecord(data)
+
+                    val index_record = new EntryRecord.Bean()
+                    index_record.setValueLocation(record.getValueLocation)
+                    index_record.setValueLength(record.getValueLength)
+                    val index_value = encodeEntryRecord(index_record.freeze()).toByteArray
+
+                    index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
+
+                    if ( record.hasValueLocation ) {
+                      logRefIncrement(record.getValueLocation)
+                    }
+                    collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
+
+                  case LOG_REMOVE_ENTRY =>
+                    val record = decodeEntryRecord(data)
+
+                    // Figure out which log file this message reference is pointing at..
+                    if ( record.hasValueLocation ) {
+                      logRefDecrement(record.getValueLocation)
+                    }
+
+                    index.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
+                    collectionDecrementSize( record.getCollectionKey)
+
+                  case _ => // Skip other records, they don't modify the index.
+
+                }
+                pos = nextPos
+            }
+          }
+        }
+        catch {
+          case e:Throwable => e.printStackTrace()
+        }
+        if(showing_progress) {
+          System.out.print("                                                                       \r");
+        }
+
+      } catch {
+        case e:Throwable =>
+          // replay failed.. good thing we are in a retry block...
+          index.close
+          throw e;
+      }
+    }
+  }
+
+  private def logRefDecrement(pos: Long) {
+    log.log_info(pos).foreach { logInfo =>
+      logRefs.get(logInfo.position).foreach { counter =>
+        if (counter.decrementAndGet() == 0) {
+          logRefs.remove(logInfo.position)
+        }
+      }
+    }
+  }
+
+  private def logRefIncrement(pos: Long) {
+    log.log_info(pos).foreach { logInfo =>
+      logRefs.getOrElseUpdate(logInfo.position, new LongCounter()).incrementAndGet()
+    }
+  }
+
+  private def collectionDecrementSize(key: Long) {
+    collectionMeta.get(key).foreach(_.size -= 1)
+  }
+  private def collectionIncrementSize(key: Long, last_key:Array[Byte]) {
+    collectionMeta.get(key).foreach{ x=> 
+      x.size += 1
+      x.last_key = last_key
+    }
+  }
+
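+  // The log reference counts and per-collection metadata are serialized into the
+  // index under LOG_REF_INDEX_KEY / COLLECTION_META_KEY when the index is suspended,
+  // then loaded back on start and updated by replaying the log past the snapshot.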
+  private def storeCounters = {
+    def storeMap(key:Array[Byte], map:HashMap[Long, _ <: AnyRef]) {
+      val baos = new ByteArrayOutputStream()
+      val os = new ObjectOutputStream(baos);
+      os.writeInt(map.size);
+      map.foreach {
+        case (k, v) =>
+          os.writeLong(k)
+          os.writeObject(v)
+      }
+      os.close()
+      index.put(key, baos.toByteArray)
+    }
+    storeMap(LOG_REF_INDEX_KEY, logRefs)
+    storeMap(COLLECTION_META_KEY, collectionMeta)
+  }
+
+  private def loadCounters = {
+    def loadMap[T <: AnyRef](key:Array[Byte], map:HashMap[Long, T]) {
+      map.clear()
+      index.get(key, new ReadOptions).foreach { value=>
+        val bais = new ByteArrayInputStream(value)
+        val is = new ObjectInputStream(bais);
+        var remaining = is.readInt()
+        while(remaining > 0 ) {
+          map.put(is.readLong(), is.readObject().asInstanceOf[T])
+          remaining-=1
+        }
+      }
+    }
+    loadMap(LOG_REF_INDEX_KEY, logRefs)
+    loadMap(COLLECTION_META_KEY, collectionMeta)
+  }
+  
+  def stop() = {
+    if( writeExecutor!=null ) {
+      writeExecutor.shutdown
+      writeExecutor.awaitTermination(60, TimeUnit.SECONDS)
+      writeExecutor = null
+
+      // this blocks until all io completes..
+      // Suspend also deletes the index.
+      suspend()
+
+      if (log != null) {
+        log.close
+      }
+      copyDirtyIndexToSnapshot
+      log = null
+    }
+  }
+
+  def usingIndex[T](func: =>T):T = {
+    val lock = snapshotRwLock.readLock();
+    lock.lock()
+    try {
+      func
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def retryUsingIndex[T](func: =>T):T = retry(usingIndex( func ))
+
+  /**
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
+   */
+  def suspend() = {
+    // Make sure we are the only ones accessing the index, since
+    // we will be closing it to create a consistent snapshot.
+    snapshotRwLock.writeLock().lock()
+
+    // Close the index so that its files are not changed async on us.
+    storeCounters
+    index.put(DIRTY_INDEX_KEY, FALSE, new WriteOptions().sync(true))
+    index.close
+  }
+
+  /**
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
+   */
+  def resume() = {
+    // re-open it..
+    retry {
+      index = new RichDB(factory.open(dirtyIndexFile, indexOptions));
+      index.put(DIRTY_INDEX_KEY, TRUE)
+    }
+    snapshotRwLock.writeLock().unlock()
+  }
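+  // Sketch only (not an existing API; see the TODOs above): a file-system level
+  // backup could bracket the copy with the suspend()/resume() pair, e.g.
+  //   client.suspend(); try { /* copy the store directory */ } finally { client.resume() }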
+
+  def copyDirtyIndexToSnapshot {
+    if( log.appender_limit == lastIndexSnapshotPos  ) {
+      // no need to snapshot again...
+      return
+    }
+
+    // Where we start copying files into.  Delete this on
+    // restart.
+    val tmpDir = tempIndexFile
+    tmpDir.mkdirs()
+
+    try {
+
+      // Hard link all the index files.
+      dirtyIndexFile.listFiles.foreach { file =>
+        file.linkTo(tmpDir / file.getName)
+      }
+
+      // Rename to signal that the snapshot is complete.
+      val newSnapshotIndexPos = log.appender_limit
+      tmpDir.renameTo(snapshotIndexFile(newSnapshotIndexPos))
+      snapshotIndexFile(lastIndexSnapshotPos).recursiveDelete
+      lastIndexSnapshotPos = newSnapshotIndexPos
+
+    } catch {
+      case e: Exception =>
+        // if we could not snapshot for any reason, delete it as we don't
+        // want a partial checkpoint..
+        warn(e, "Could not snapshot the index: " + e)
+        tmpDir.recursiveDelete
+    }
+  }
+
+  def snapshotIndex(sync:Boolean=false):Unit = {
+    suspend()
+    try {
+      if( sync ) {
+        log.current_appender.force
+      }
+      if( log.appender_limit == lastIndexSnapshotPos  ) {
+        // no need to snapshot again...
+        return
+      }
+      copyDirtyIndexToSnapshot
+    } finally {
+      resume()
+    }
+  }
+
+  def purge() = {
+    suspend()
+    try{
+      log.close
+      locked_purge
+    } finally {
+      retry {
+        log.open
+      }
+      resume()
+    }
+  }
+
+  def locked_purge {
+    logDirectory.listFiles.foreach {x =>
+      if (x.getName.endsWith(".log")) {
+        x.delete()
+      }
+    }
+    directory.listFiles.foreach {x =>
+      if (x.getName.endsWith(".index")) {
+        x.recursiveDelete
+      }
+    }
+  }
+
+  def addCollection(record: CollectionRecord.Buffer) = {
+    val key = encodeLongKey(COLLECTION_PREFIX, record.getKey)
+    val value = record.toUnframedBuffer
+    retryUsingIndex {
+      log.appender { appender =>
+        appender.append(LOG_ADD_COLLECTION, value)
+        index.put(key, value.toByteArray)
+      }
+    }
+    collectionMeta.put(record.getKey, new CollectionMeta)
+  }
+
+  def getLogAppendPosition = log.appender_limit
+
+  def listCollections: Seq[(Long, CollectionRecord.Buffer)] = {
+    val rc = ListBuffer[(Long, CollectionRecord.Buffer)]()
+    retryUsingIndex {
+      val ro = new ReadOptions
+      ro.verifyChecksums(verifyChecksums)
+      ro.fillCache(false)
+      index.cursorPrefixed(COLLECTION_PREFIX_ARRAY, ro) { (key, value) =>
+        rc.append(( decodeLongKey(key)._2, CollectionRecord.FACTORY.parseUnframed(value) ))
+        true // to continue cursoring.
+      }
+    }
+    rc
+  }
+
+  def removeCollection(collectionKey: Long) = {
+    val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
+    val value = encodeVLong(collectionKey)
+    val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
+    collectionMeta.remove(collectionKey)
+    retryUsingIndex {
+      log.appender { appender =>
+        appender.append(LOG_REMOVE_COLLECTION, new Buffer(value))
+      }
+
+      val ro = new ReadOptions
+      ro.fillCache(false)
+      ro.verifyChecksums(verifyChecksums)
+      index.cursorPrefixed(entryKeyPrefix, ro) { (key, value)=>
+        val record = decodeEntryRecord(value)
+        val pos = if ( record.hasValueLocation ) {
+          Some(record.getValueLocation)
+        } else {
+          None
+        }
+        pos.foreach(logRefDecrement(_))
+        index.delete(key)
+        true
+      }
+      index.delete(key)
+    }
+  }
+
+  def collectionEmpty(collectionKey: Long) = {
+    val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
+    val value = encodeVLong(collectionKey)
+    val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
+
+    val meta = collectionMeta.getOrElseUpdate(collectionKey, new CollectionMeta)
+    meta.size = 0
+    meta.last_key = null
+    
+    retryUsingIndex {
+      index.get(key).foreach { collectionData =>
+        log.appender { appender =>
+          appender.append(LOG_REMOVE_COLLECTION, new Buffer(value))
+          appender.append(LOG_ADD_COLLECTION, new Buffer(collectionData))
+        }
+
+        val ro = new ReadOptions
+        ro.fillCache(false)
+        ro.verifyChecksums(verifyChecksums)
+        index.cursorPrefixed(entryKeyPrefix, ro) { (key, value)=>
+          val record = decodeEntryRecord(value)
+          val pos = if ( record.hasValueLocation ) {
+            Some(record.getValueLocation)
+          } else {
+            None
+          }
+          pos.foreach(logRefDecrement(_))
+          index.delete(key)
+          true
+        }
+      }
+    }
+  }
+
+  def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
+    collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
+      val seq = decodeLong(key)
+      var locator = (value.getValueLocation, value.getValueLength)
+      val msg = getMessage(locator)
+      msg.getMessageId().setEntryLocator((collectionKey, seq))
+      msg.getMessageId().setDataLocator(locator)
+      func(msg)
+    }
+  }
+
+  def getAckPosition(subKey: Long): Long = {
+    retryUsingIndex {
+      index.get(encodeEntryKey(ENTRY_PREFIX, subKey, ACK_POSITION)).map{ value=>
+        val record = decodeEntryRecord(value)
+        record.getValueLocation()
+      }.getOrElse(0L)
+    }
+  }
+
+  def getMessage(locator:AnyRef):Message = {
+    assert(locator!=null)
+    val buffer = locator match {
+      case x:MessageRecord =>
+        // Encoded form is still in memory..
+        Some(x.data)
+      case (pos:Long, len:Int) =>
+        // Load the encoded form from disk.
+        log.read(pos, len).map(new Buffer(_))
+    }
+
+    // Let's decode
+    buffer.map{ x =>
+      var data = if( store.snappyCompressLogs ) {
+        Snappy.uncompress(x)
+      } else {
+        x
+      }
+      store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
+    }.getOrElse(null)
+  }
+
+
+  def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
+    val ro = new ReadOptions
+    ro.fillCache(true)
+    ro.verifyChecksums(verifyChecksums)
+    val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
+    val end = encodeLongKey(ENTRY_PREFIX, collectionKey+1)
+    retryUsingIndex {
+      index.cursorRange(start, end, ro) { case (key, value) =>
+        func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))
+      }
+    }
+  }
+
+  def collectionSize(collectionKey: Long) = {
+    collectionMeta.get(collectionKey).map(_.size).getOrElse(0L)
+  }
+
+  def collectionIsEmpty(collectionKey: Long) = {
+    val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
+    var empty = true
+    retryUsingIndex {
+      val ro = new ReadOptions
+      ro.fillCache(false)
+      ro.verifyChecksums(verifyChecksums)
+      index.cursorKeysPrefixed(entryKeyPrefix, ro) { key =>
+        empty = false
+        false
+      }
+    }
+    empty
+  }
+
+  val max_write_message_latency = TimeMetric()
+  val max_write_enqueue_latency = TimeMetric()
+
+  val max_index_write_latency = TimeMetric()
+
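+  // Stores a batch of units of work: message payloads and entry records are appended
+  // to the journal, the matching index updates are applied in a single LevelDB write
+  // batch, and the journal is only forced to disk when a UOW asked for a sync.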
+  def store(uows: Array[DelayableUOW]) {
+    retryUsingIndex {
+      log.appender { appender =>
+
+        var syncNeeded = false
+        index.write(new WriteOptions, max_index_write_latency) { batch =>
+
+          var write_message_total = 0L
+          var write_enqueue_total = 0L
+
+          uows.foreach { uow =>
+
+
+            uow.actions.foreach { case (msg, action) =>
+              val messageRecord = action.messageRecord
+              var log_info:LogInfo = null
+              var pos = -1L
+              var dataLocator:(Long, Int) = null
+
+              if (messageRecord != null && messageRecord.locator==null) {
+                val start = System.nanoTime()
+                val p = appender.append(LOG_DATA, messageRecord.data)
+                pos = p._1
+                log_info = p._2
+                dataLocator = (pos, messageRecord.data.length)
+                messageRecord.locator = dataLocator
+                write_message_total += System.nanoTime() - start
+              }
+
+
+              action.dequeues.foreach { entry =>
+                val keyLocation = entry.id.getEntryLocator.asInstanceOf[(Long, Long)]
+                val key = encodeEntryKey(ENTRY_PREFIX, keyLocation._1, keyLocation._2)
+
+                if( dataLocator==null ) {
+                  dataLocator = entry.id.getDataLocator match {
+                    case x:(Long, Int) => x
+                    case x:MessageRecord => x.locator
+                    case _ => throw new RuntimeException("Unexpected locator type")
+                  }
+                }
+
+                val log_record = new EntryRecord.Bean()
+                log_record.setCollectionKey(entry.queueKey)
+                log_record.setEntryKey(new Buffer(key, 9, 8))
+                log_record.setValueLocation(dataLocator._1)
+                appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+                batch.delete(key)
+                logRefDecrement(dataLocator._1)
+                collectionDecrementSize(entry.queueKey)
+              }
+
+              action.enqueues.foreach { entry =>
+                
+                if(dataLocator ==null ) {
+                  dataLocator = entry.id.getDataLocator match {
+                    case x:(Long, Int) => x
+                    case x:MessageRecord => x.locator
+                    case _ =>
+                      throw new RuntimeException("Unexpected locator type")
+                  }
+                }
+
+                val start = System.nanoTime()
+
+                val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
+
+                assert(entry.id.getDataLocator()!=null)
+
+                val log_record = new EntryRecord.Bean()
+                log_record.setCollectionKey(entry.queueKey)
+                log_record.setEntryKey(new Buffer(key, 9, 8))
+                log_record.setValueLocation(dataLocator._1)
+                log_record.setValueLength(dataLocator._2)
+                val index_record = new EntryRecord.Bean()
+                index_record.setValueLocation(dataLocator._1)
+                index_record.setValueLength(dataLocator._2)
+
+                // Encode each record once, append the entry to the journal and
+                // apply the matching index update in the same write batch.
+                val log_data = encodeEntryRecord(log_record.freeze())
+                val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
+
+                appender.append(LOG_ADD_ENTRY, log_data)
+                batch.put(key, index_data)
+
+                Option(log_info).orElse(log.log_info(dataLocator._1)).foreach { logInfo =>
+                  logRefs.getOrElseUpdate(logInfo.position, new LongCounter()).incrementAndGet()
+                }
+
+                collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
+                write_enqueue_total += System.nanoTime() - start
+              }
+
+            }
+            uow.subAcks.foreach { entry =>
+              val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
+              val log_record = new EntryRecord.Bean()
+              log_record.setCollectionKey(entry.subKey)
+              log_record.setEntryKey(ACK_POSITION)
+              log_record.setValueLocation(entry.ackPosition)
+              appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+              val index_record = new EntryRecord.Bean()
+              index_record.setValueLocation(entry.ackPosition)
+              batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
+            }
+
+            if( !syncNeeded && uow.syncNeeded ) {
+              syncNeeded = true
+            }
+          }
+
+          max_write_message_latency.add(write_message_total)
+          max_write_enqueue_latency.add(write_enqueue_total)
+        }
+        if( syncNeeded && sync ) {
+          appender.force
+        }
+      } // end of log.appender { block }
+
+      // Now that the data is logged, point each message id at its data locator in the logs.
+      uows.foreach { uow =>
+        uow.actions.foreach { case (msg, action) =>
+          val messageRecord = action.messageRecord
+          if (messageRecord != null) {
+            messageRecord.id.setDataLocator(messageRecord.locator)
+          }
+        }
+      }
+    }
+  }
+
+  def getCollectionEntries(collectionKey: Long, firstSeq:Long, lastSeq:Long): Seq[(Buffer, EntryRecord.Buffer)] = {
+    var rc = ListBuffer[(Buffer, EntryRecord.Buffer)]()
+    val ro = new ReadOptions
+    ro.verifyChecksums(verifyChecksums)
+    ro.fillCache(true)
+    retryUsingIndex {
+      index.snapshot { snapshot =>
+        ro.snapshot(snapshot)
+        val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, firstSeq)
+        val end = encodeEntryKey(ENTRY_PREFIX, collectionKey, lastSeq+1)
+        index.cursorRange( start, end, ro ) { (key, value) =>
+          val (_, _, seq) = decodeEntryKey(key)
+          rc.append((seq, EntryRecord.FACTORY.parseUnframed(value)))
+          true
+        }
+      }
+    }
+    rc
+  }
+
+  def getLastQueueEntrySeq(collectionKey: Long): Long = {
+    getLastCollectionEntryKey(collectionKey).map(_.bigEndianEditor().readLong()).getOrElse(0L)
+  }
+
+  def getLastCollectionEntryKey(collectionKey: Long): Option[Buffer] = {
+    collectionMeta.get(collectionKey).flatMap(x=> Option(x.last_key)).map(new Buffer(_))
+  }
+
+  def gc(topicPositions:Seq[(Long, Long)]):Unit = {
+
+    // Delete message refs for topics whose consumers have advanced..
+    if( !topicPositions.isEmpty ) {
+      retryUsingIndex {
+        index.write(new WriteOptions, max_index_write_latency) { batch =>
+          for( (topic, first) <- topicPositions ) {
+            val ro = new ReadOptions
+            ro.fillCache(true)
+            ro.verifyChecksums(verifyChecksums)
+            val start = encodeEntryKey(ENTRY_PREFIX, topic, 0)
+            val end =  encodeEntryKey(ENTRY_PREFIX, topic, first)
+            index.cursorRange(start, end, ro) { case (key, value) =>
+              val entry = EntryRecord.FACTORY.parseUnframed(value)
+              batch.delete(key)
+              logRefDecrement(entry.getValueLocation)
+              true
+            }
+          }
+        }
+      }
+    }
+
+    import collection.JavaConversions._
+    val emptyJournals = log.log_infos.keySet.toSet -- logRefs.keySet
+
+    // We don't want to delete any journals that the index has not yet snapshotted,
+    // or that the log appender is still writing to.
+    val deleteLimit = log.log_info(lastIndexSnapshotPos).map(_.position).
+          getOrElse(lastIndexSnapshotPos).min(log.appender_start)
+
+    emptyJournals.foreach { id =>
+      if ( id < deleteLimit ) {
+        log.delete(id)
+      }
+    }
+  }
+
+}

Propchange: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.broker.BrokerService
+import org.apache.activemq.broker.BrokerServiceAware
+import org.apache.activemq.broker.ConnectionContext
+import org.apache.activemq.command._
+import org.apache.activemq.openwire.OpenWireFormat
+import org.apache.activemq.usage.SystemUsage
+import java.io.File
+import java.io.IOException
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.Future
+import java.util.concurrent.atomic.AtomicLong
+import reflect.BeanProperty
+import org.apache.activemq.store._
+import java.util._
+import scala.collection.mutable.ListBuffer
+import javax.management.ObjectName
+import org.apache.activemq.broker.jmx.AnnotatedMBean
+import org.apache.activemq.util._
+import org.apache.kahadb.util.LockFile
+import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log}
+
+object LevelDBStore extends Log {
+  
+  val DONE = new CountDownFuture();
+  DONE.countDown
+  
+  def toIOException(e: Throwable): IOException = {
+    if (e.isInstanceOf[ExecutionException]) {
+      var cause: Throwable = (e.asInstanceOf[ExecutionException]).getCause
+      if (cause.isInstanceOf[IOException]) {
+        return cause.asInstanceOf[IOException]
+      }
+    }
+    if (e.isInstanceOf[IOException]) {
+      return e.asInstanceOf[IOException]
+    }
+    return IOExceptionSupport.create(e)
+  }
+
+  def waitOn(future: Future[AnyRef]): Unit = {
+    try {
+      future.get
+    }
+    catch {
+      case e: Throwable => {
+        throw toIOException(e)
+      }
+    }
+  }
+}
+
+case class DurableSubscription(subKey:Long, topicKey:Long, info: SubscriptionInfo) {
+  var lastAckPosition = 0L
+  var cursorPosition = 0L
+}
+
+class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
+  import store._
+
+  def getAsyncBufferSize = asyncBufferSize
+  def getIndexDirectory = directory.getCanonicalPath
+  def getLogDirectory = Option(logDirectory).getOrElse(directory).getCanonicalPath
+  def getIndexBlockRestartInterval = indexBlockRestartInterval
+  def getIndexBlockSize = indexBlockSize
+  def getIndexCacheSize = indexCacheSize
+  def getIndexCompression = indexCompression
+  def getIndexFactory = db.client.factory.getClass.getName
+  def getIndexMaxOpenFiles = indexMaxOpenFiles
+  def getIndexWriteBufferSize = indexWriteBufferSize
+  def getLogSize = logSize
+  def getParanoidChecks = paranoidChecks
+  def getSync = sync
+  def getVerifyChecksums = verifyChecksums
+
+  def getUowClosedCounter = db.uowClosedCounter
+  def getUowCanceledCounter = db.uowCanceledCounter
+  def getUowStoringCounter = db.uowStoringCounter
+  def getUowStoredCounter = db.uowStoredCounter
+
+  def getUowMaxCompleteLatency = db.uow_complete_latency.get
+  def getMaxIndexWriteLatency = db.client.max_index_write_latency.get
+  def getMaxLogWriteLatency = db.client.log.max_log_write_latency.get
+  def getMaxLogFlushLatency = db.client.log.max_log_flush_latency.get
+  def getMaxLogRotateLatency = db.client.log.max_log_rotate_latency.get
+
+  def resetUowMaxCompleteLatency = db.uow_complete_latency.reset
+  def resetMaxIndexWriteLatency = db.client.max_index_write_latency.reset
+  def resetMaxLogWriteLatency = db.client.log.max_log_write_latency.reset
+  def resetMaxLogFlushLatency = db.client.log.max_log_flush_latency.reset
+  def resetMaxLogRotateLatency = db.client.log.max_log_rotate_latency.reset
+
+  def getIndexStats = db.client.index.getProperty("leveldb.stats")
+}
+
+import LevelDBStore._
+
+class LevelDBStore extends ServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
+
+  final val wireFormat = new OpenWireFormat
+  final val db = new DBManager(this)
+
+  @BeanProperty
+  var directory: File = null
+  @BeanProperty
+  var logDirectory: File = null
+  
+  @BeanProperty
+  var logSize: Long = 1024 * 1024 * 100
+  @BeanProperty
+  var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory"
+  @BeanProperty
+  var sync: Boolean = true
+  @BeanProperty
+  var verifyChecksums: Boolean = false
+  @BeanProperty
+  var indexMaxOpenFiles: Int = 1000
+  @BeanProperty
+  var indexBlockRestartInterval: Int = 16
+  @BeanProperty
+  var paranoidChecks: Boolean = false
+  @BeanProperty
+  var indexWriteBufferSize: Int = 1024*1024*6
+  @BeanProperty
+  var indexBlockSize: Int = 4 * 1024
+  @BeanProperty
+  var indexCompression: String = "snappy"
+  @BeanProperty
+  var logCompression: String = "none"
+  @BeanProperty
+  var indexCacheSize: Long = 1024 * 1024 * 256L
+  @BeanProperty
+  var flushDelay = 1000*5
+  @BeanProperty
+  var asyncBufferSize = 1024*1024*4
+  @BeanProperty
+  var monitorStats = false
+  @BeanProperty
+  var failIfLocked = false
+
+  var purgeOnStatup: Boolean = false
+  var brokerService: BrokerService = null
+
+  val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]()
+  val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
+  val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
+
+  override def toString: String = {
+    return "LevelDB:[" + directory.getAbsolutePath + "]"
+  }
+
+  def objectName = {
+    var brokerON = brokerService.getBrokerObjectName
+    val broker_name = brokerON.getKeyPropertyList().get("BrokerName")
+    new ObjectName(brokerON.getDomain() + ":" +
+            "BrokerName="+JMXSupport.encodeObjectNamePart(broker_name)+ "," +
+            "Type=LevelDBStore");
+  }
+
+  def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _)
+
+  var lock_file: LockFile = _
+
+  var snappyCompressLogs = false
+
+  def doStart: Unit = {
+    import FileSupport._
+
+    snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
+    debug("starting")
+    if ( lock_file==null ) {
+      lock_file = new LockFile(directory / "lock", true)
+    }
+
+    // Register a JMX bean that exposes the status of the store.
+    if(brokerService!=null){
+      try {
+        AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreView(this), objectName)
+      } catch {
+        case e: Throwable => {
+          warn(e, "LevelDB Store could not be registered in JMX: " + e.getMessage)
+        }
+      }
+    }
+
+    if (failIfLocked) {
+      lock_file.lock()
+    } else {
+      retry {
+        lock_file.lock()
+      }
+    }
+
+    if (purgeOnStatup) {
+      purgeOnStatup = false
+      db.client.locked_purge
+      info("Purged: "+this)
+    }
+
+    db.start
+    db.loadCollections
+    debug("started")
+  }
+
+  def doStop(stopper: ServiceStopper): Unit = {
+    db.stop
+    lock_file.unlock()
+    if(brokerService!=null){
+      brokerService.getManagementContext().unregisterMBean(objectName);
+    }
+    info("Stopped "+this)
+  }
+
+  def setBrokerService(brokerService: BrokerService): Unit = {
+    this.brokerService = brokerService
+  }
+
+  def setBrokerName(brokerName: String): Unit = {
+  }
+
+  def setUsageManager(usageManager: SystemUsage): Unit = {
+  }
+
+  def deleteAllMessages: Unit = {
+    purgeOnStatup = true
+  }
+
+  def getLastMessageBrokerSequenceId: Long = {
+    return 0
+  }
+
+  def createTransactionStore = this
+
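+  // Local (non-XA) transactions are buffered in memory as a list of commit actions
+  // and applied in one unit of work at commit time; prepare() rejects XA
+  // transactions as unsupported.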
+  val transactions = collection.mutable.HashMap[TransactionId, Transaction]()
+  
+  trait TransactionAction {
+    def apply(uow:DelayableUOW):Unit
+  }
+  
+  case class Transaction(id:TransactionId) {
+    val commitActions = ListBuffer[TransactionAction]() 
+    def add(store:LevelDBMessageStore, message: Message, delay:Boolean) = {
+      commitActions += new TransactionAction() {
+        def apply(uow:DelayableUOW) = {
+          store.doAdd(uow, message, delay)
+        }
+      }
+    }
+    def remove(store:LevelDBMessageStore, msgid:MessageId) = {
+      commitActions += new TransactionAction() {
+        def apply(uow:DelayableUOW) = {
+          store.doRemove(uow, msgid)
+        }
+      }
+    }
+    def updateAckPosition(store:LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
+      commitActions += new TransactionAction() {
+        def apply(uow:DelayableUOW) = {
+          store.doUpdateAckPosition(uow, sub, position)
+        }
+      }
+    }
+  }
+  
+  def transaction(txid: TransactionId) = transactions.getOrElseUpdate(txid, Transaction(txid))
+  
+  def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
+    preCommit.run()
+    transactions.remove(txid) match {
+      case None=>
+        println("The transaction does not exist")
+        postCommit.run()
+      case Some(tx)=>
+        withUow { uow =>
+          for( action <- tx.commitActions ) {
+            action(uow)
+          }
+          uow.addCompleteListener( postCommit.run() )
+        }
+    }
+  }
+
+  def rollback(txid: TransactionId) = {
+    transactions.remove(txid) match {
+      case None=>
+        println("The transaction does not exist")
+      case Some(tx)=>
+    }
+  }
+
+  def prepare(tx: TransactionId) = {
+    sys.error("XA transactions not yet supported.")
+  }
+  def recover(listener: TransactionRecoveryListener) = {
+  }
+
+  def createQueueMessageStore(destination: ActiveMQQueue) = {
+    this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
+  }
+
+  def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBMessageStore = {
+    var rc = new LevelDBMessageStore(destination, key)
+    this.synchronized {
+      queues.put(destination, rc)
+    }
+    rc
+  }
+
+  def removeQueueMessageStore(destination: ActiveMQQueue): Unit = this synchronized {
+    queues.remove(destination).foreach { store=>
+      db.destroyQueueStore(store.key)
+    }
+  }
+
+  def createTopicMessageStore(destination: ActiveMQTopic): TopicMessageStore = {
+    this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
+  }
+
+  def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBTopicMessageStore = {
+    var rc = new LevelDBTopicMessageStore(destination, key)
+    this synchronized {
+      topics.put(destination, rc)
+      topicsById.put(key, rc)
+    }
+    rc
+  }
+
+  def removeTopicMessageStore(destination: ActiveMQTopic): Unit = {
+    topics.remove(destination).foreach { store=>
+      store.subscriptions.values.foreach { sub =>
+        db.removeSubscription(sub)
+      }
+      store.subscriptions.clear()
+      db.destroyQueueStore(store.key)
+    }
+  }
+
+  def getLogAppendPosition = db.getLogAppendPosition
+
+  def getDestinations: Set[ActiveMQDestination] = {
+    import collection.JavaConversions._
+    var rc: HashSet[ActiveMQDestination] = new HashSet[ActiveMQDestination]
+    rc.addAll(topics.keys)
+    rc.addAll(queues.keys)
+    return rc
+  }
+
+  def getLastProducerSequenceId(id: ProducerId): Long = {
+    return -1
+  }
+
+  def size: Long = {
+    return 0
+  }
+
+  def checkpoint(sync: Boolean): Unit = db.checkpoint(sync)
+
+  def withUow[T](func:(DelayableUOW)=>T):T = {
+    val uow = db.createUow
+    try {
+      func(uow)
+    } finally {
+      uow.release()
+    }
+  }
+
+  private def subscriptionKey(clientId: String, subscriptionName: String): String = {
+    return clientId + ":" + subscriptionName
+  }
+
+  case class LevelDBMessageStore(dest: ActiveMQDestination, val key: Long) extends AbstractMessageStore(dest) {
+
+    protected val lastSeq: AtomicLong = new AtomicLong(0)
+    protected var cursorPosition: Long = 0
+
+    lastSeq.set(db.getLastQueueEntrySeq(key))
+
+    def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture = {
+      uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
+    }
+
+
+    override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
+    override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+      if(  message.getTransactionId!=null ) {
+        transaction(message.getTransactionId).add(this, message, delay)
+        DONE
+      } else {
+        withUow { uow=>
+          doAdd(uow, message, delay)
+        }
+      }
+    }
+
+    override def addMessage(context: ConnectionContext, message: Message) = addMessage(context, message, false)
+    override def addMessage(context: ConnectionContext, message: Message, delay: Boolean): Unit = {
+      waitOn(asyncAddQueueMessage(context, message, delay))
+    }
+
+    def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture = {
+      uow.dequeue(key, id)
+    }
+
+    override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit = {
+      if(  ack.getTransactionId!=null ) {
+        transaction(ack.getTransactionId).remove(this, ack.getLastMessageId)
+        DONE
+      } else {
+        waitOn(withUow{uow=>
+          doRemove(uow, ack.getLastMessageId)
+        })
+      }
+    }
+
+    def removeMessage(context: ConnectionContext, ack: MessageAck): Unit = {
+      removeAsyncMessage(context, ack)
+    }
+
+    def getMessage(id: MessageId): Message = {
+      var message: Message = db.getMessage(id)
+      if (message == null) {
+        throw new IOException("Message id not found: " + id)
+      }
+      return message
+    }
+
+    def removeAllMessages(context: ConnectionContext): Unit = {
+      db.collectionEmpty(key)
+      cursorPosition = 0
+    }
+
+    def getMessageCount: Int = {
+      return db.collectionSize(key).toInt
+    }
+
+    override def isEmpty: Boolean = {
+      return db.collectionIsEmpty(key)
+    }
+
+    def recover(listener: MessageRecoveryListener): Unit = {
+      cursorPosition = db.cursorMessages(key, listener, 0)
+    }
+
+    def resetBatching: Unit = {
+      cursorPosition = 0
+    }
+
+    def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
+      cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), cursorPosition)
+    }
+
+    override def setBatch(id: MessageId): Unit = {
+      cursorPosition = db.queuePosition(id)
+    }
+
+  }
+
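+  // Wraps a MessageRecoveryListener and stops recovering once max messages have been handed to it.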
+  case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
+    private var recovered: Int = 0
+    def hasSpace = recovered < max && listener.hasSpace
+    def recoverMessage(message: Message) = {
+      recovered += 1;
+      listener.recoverMessage(message)
+    }
+    def recoverMessageReference(ref: MessageId) = {
+      recovered += 1;
+      listener.recoverMessageReference(ref)
+    }
+    def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
+  }
+  
+
+  //
+  // This gets called when the store is first loading up; it restores
+  // the existing durable subs.
+  def createSubscription(sub:DurableSubscription) = {
+    this.synchronized(topicsById.get(sub.topicKey)) match {
+      case Some(topic) =>
+        topic.synchronized {
+          topic.subscriptions.put((sub.info.getClientId, sub.info.getSubcriptionName), sub)
+        }
+      case None =>
+        // Topic does not exist.. so kill the durable sub..
+        db.removeSubscription(sub)
+    }
+  }
+  
+  
+  def getTopicGCPositions = {
+    import collection.JavaConversions._
+    val topics = this.synchronized {
+      new ArrayList(topicsById.values())
+    }
+    topics.flatMap(_.gcPosition).toSeq
+  }
+
+  class LevelDBTopicMessageStore(dest: ActiveMQDestination, key: Long) extends LevelDBMessageStore(dest, key) with TopicMessageStore {
+    val subscriptions = collection.mutable.HashMap[(String, String), DurableSubscription]()
+    var firstSeq = 0L
+
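+    // Returns Some((key, firstSeq)) when the smallest acked position across the durable subs has
+    // advanced, so that topic entries before firstSeq can be garbage collected.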
+    def gcPosition:Option[(Long, Long)] = {
+      var pos = lastSeq.get()
+      subscriptions.synchronized {
+        subscriptions.values.foreach { sub =>
+          if( sub.lastAckPosition < pos ) {
+            pos = sub.lastAckPosition
+          }
+        }
+        if( firstSeq != pos+1) {
+          firstSeq = pos+1
+          Some(key, firstSeq)
+        } else {
+          None
+        }
+      }
+    }
+    
+    def addSubsciption(info: SubscriptionInfo, retroactive: Boolean) = {
+      var sub = db.addSubscription(key, info)
+      subscriptions.synchronized {
+        subscriptions.put((info.getClientId, info.getSubcriptionName), sub)
+      }
+      sub.lastAckPosition = if (retroactive) 0 else lastSeq.get()
+      waitOn(withUow{ uow=>
+        uow.updateAckPosition(sub)
+        uow.countDownFuture
+      })
+    }
+    
+    def getAllSubscriptions: Array[SubscriptionInfo] = subscriptions.synchronized {
+      subscriptions.values.map(_.info).toArray
+    }
+
+    def lookupSubscription(clientId: String, subscriptionName: String): SubscriptionInfo = subscriptions.synchronized {
+      subscriptions.get((clientId, subscriptionName)).map(_.info).getOrElse(null)
+    }
+
+    def deleteSubscription(clientId: String, subscriptionName: String): Unit = {
+      subscriptions.synchronized {
+        subscriptions.remove((clientId, subscriptionName))
+      }.foreach(db.removeSubscription(_))
+    }
+
+    private def lookup(clientId: String, subscriptionName: String): Option[DurableSubscription] = subscriptions.synchronized {
+      subscriptions.get((clientId, subscriptionName))
+    }
+
+    def doUpdateAckPosition(uow: DelayableUOW, sub: DurableSubscription, position: Long) = {
+      sub.lastAckPosition = position
+      uow.updateAckPosition(sub)
+    }
+
+    def acknowledge(context: ConnectionContext, clientId: String, subscriptionName: String, messageId: MessageId, ack: MessageAck): Unit = {
+      lookup(clientId, subscriptionName).foreach { sub =>
+        var position = db.queuePosition(messageId)
+        if(  ack.getTransactionId!=null ) {
+          transaction(ack.getTransactionId).updateAckPosition(this, sub, position)
+          DONE
+        } else {
+          waitOn(withUow{ uow=>
+            doUpdateAckPosition(uow, sub, position)
+            uow.countDownFuture
+          })
+        }
+
+      }
+    }
+    
+    def resetBatching(clientId: String, subscriptionName: String): Unit = {
+      lookup(clientId, subscriptionName).foreach { sub =>
+        sub.cursorPosition = 0
+      }
+    }
+    def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = {
+      lookup(clientId, subscriptionName).foreach { sub =>
+        sub.cursorPosition = db.cursorMessages(key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
+      }
+    }
+    
+    def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
+      lookup(clientId, subscriptionName).foreach { sub =>
+        sub.cursorPosition = db.cursorMessages(key,  LimitingRecoveryListener(maxReturned, listener), sub.cursorPosition.max(sub.lastAckPosition+1))
+      }
+    }
+    
+    def getMessageCount(clientId: String, subscriptionName: String): Int = {
+      lookup(clientId, subscriptionName) match {
+        case Some(sub) => (lastSeq.get - sub.lastAckPosition).toInt
+        case None => 0
+      }
+    }
+
+  }
+
+  ///////////////////////////////////////////////////////////////////////////
+  // The following methods actually have nothing to do with JMS transactions; they are
+  // closer to an operation batch, and that is handled in the DBManager.
+  ///////////////////////////////////////////////////////////////////////////
+  def beginTransaction(context: ConnectionContext): Unit = {}
+  def commitTransaction(context: ConnectionContext): Unit = {}
+  def rollbackTransaction(context: ConnectionContext): Unit = {}
+
+  def createClient = new LevelDBClient(this);
+}

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java Tue Sep 25 14:32:28 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb;
+
+import org.apache.activemq.broker.jmx.MBeanInfo;
+
+import java.io.File;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface LevelDBStoreViewMBean {
+
+    @MBeanInfo("The directory holding the store index data.")
+    String getIndexDirectory();
+
+    @MBeanInfo("The directory holding the store log data.")
+    String getLogDirectory();
+
+    @MBeanInfo("The maximum size a log file is allowed to grow to.")
+    long getLogSize();
+
+    @MBeanInfo("The implementation of the LevelDB index being used.")
+    String getIndexFactory();
+
+    @MBeanInfo("Are writes synced to disk.")
+    boolean getSync();
+
+    @MBeanInfo("Is data verified against checksums as it's loaded back from disk.")
+    boolean getVerifyChecksums();
+
+    @MBeanInfo("The maximum number of open files the index will open at one time.")
+    int getIndexMaxOpenFiles();
+
+    @MBeanInfo("Number of keys between restart points for delta encoding of keys in the index")
+    int getIndexBlockRestartInterval();
+
+    @MBeanInfo("Do aggressive checking of store data")
+    boolean getParanoidChecks();
+
+    @MBeanInfo("Amount of data to build up in memory for the index before converting to a sorted on-disk file.")
+    int getIndexWriteBufferSize();
+
+    @MBeanInfo("Approximate size of user data packed per block for the index")
+    int getIndexBlockSize();
+
+    @MBeanInfo("The type of compression to use for the index")
+    String getIndexCompression();
+
+    @MBeanInfo("The size of the index cache.")
+    long getIndexCacheSize();
+
+    @MBeanInfo("The maximum amount of async writes to buffer up")
+    int getAsyncBufferSize();
+
+    @MBeanInfo("The number of units of work which have been closed.")
+    long getUowClosedCounter();
+    @MBeanInfo("The number of units of work which have been canceled.")
+    long getUowCanceledCounter();
+    @MBeanInfo("The number of units of work which started getting stored.")
+    long getUowStoringCounter();
+    @MBeanInfo("The number of units of work which completed getting stored")
+    long getUowStoredCounter();
+
+    @MBeanInfo("Gets and resets the maximum time (in ms) a unit of work took to complete.")
+    double resetUowMaxCompleteLatency();
+    @MBeanInfo("Gets and resets the maximum time (in ms) an index write batch took to execute.")
+    double resetMaxIndexWriteLatency();
+    @MBeanInfo("Gets and resets the maximum time (in ms) a log write took to execute (includes the index write latency).")
+    double resetMaxLogWriteLatency();
+    @MBeanInfo("Gets and resets the maximum time (in ms) a log flush took to execute.")
+    double resetMaxLogFlushLatency();
+    @MBeanInfo("Gets and resets the maximum time (in ms) a log rotation took to perform.")
+    double resetMaxLogRotateLatency();
+
+    @MBeanInfo("Gets the maximum time (in ms) a unit of work took to complete.")
+    double getUowMaxCompleteLatency();
+    @MBeanInfo("Gets the maximum time (in ms) an index write batch took to execute.")
+    double getMaxIndexWriteLatency();
+    @MBeanInfo("Gets the maximum time (in ms) a log write took to execute (includes the index write latency).")
+    double getMaxLogWriteLatency();
+    @MBeanInfo("Gets the maximum time (in ms) a log flush took to execute.")
+    double getMaxLogFlushLatency();
+    @MBeanInfo("Gets the maximum time (in ms) a log rotation took to perform.")
+    double getMaxLogRotateLatency();
+
+    @MBeanInfo("Gets the index statistics.")
+    String getIndexStats();
+}
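
The attributes above are ordinary JMX attributes, so once the broker registers the store view they
can be read with the standard JMX API. A minimal sketch, not part of this commit; the object name
below is a placeholder and depends on how the broker actually registers the MBean:

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    // Placeholder object name -- the real one depends on the broker's JMX registration.
    val name   = new ObjectName("org.apache.activemq:BrokerName=localhost,Type=LevelDBStore")
    val server = ManagementFactory.getPlatformMBeanServer

    // Getter names map to JMX attribute names, e.g. getLogSize() -> "LogSize".
    val logSize    = server.getAttribute(name, "LogSize").asInstanceOf[java.lang.Long]
    val indexStats = server.getAttribute(name, "IndexStats").asInstanceOf[String]
    println("log size=" + logSize + " index stats=" + indexStats)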

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,518 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import java.util.zip.CRC32
+import java.util.Map.Entry
+import java.util.concurrent.atomic.AtomicLong
+import java.io._
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import org.fusesource.hawtdispatch.BaseRetained
+import org.apache.activemq.leveldb.util.FileSupport._
+import org.apache.activemq.util.LRUCache
+import util.TimeMetric._
+import util.{TimeMetric, Log}
+import java.util.TreeMap
+
+object RecordLog extends Log {
+
+  // The log files contain a sequence of variable length log records:
+  // record := header + data
+  //
+  // header :=
+  //   '*'      : int8       // Start of Record Magic
+  //   kind     : int8       // Help identify content type of the data.
+  //   checksum : uint32     // crc32 of the data[]
+  //   length   : uint32     // the length of the data
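+  //
+  // For example, a 5 byte payload is stored as a 10 byte header ('*', kind, 4 byte big-endian
+  // checksum, 4 byte big-endian length = 5) followed by the 5 data bytes, i.e.
+  // LOG_HEADER_SIZE (1 + 1 + 4 + 4 = 10) + 5 = 15 bytes in the log file.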
+
+  val LOG_HEADER_PREFIX = '*'.toByte
+  val UOW_END_RECORD = -1.toByte
+
+  val LOG_HEADER_SIZE = 10
+
+  val BUFFER_SIZE = 1024*512
+  val BYPASS_BUFFER_SIZE = 1024*16
+
+  case class LogInfo(file:File, position:Long, length:Long) {
+    def limit = position+length
+  }
+
+  def encode_long(a1:Long) = {
+    val out = new DataByteArrayOutputStream(8)
+    out.writeLong(a1)
+    out.toBuffer
+  }
+
+  def decode_long(value:Buffer):Long = {
+    val in = new DataByteArrayInputStream(value)
+    in.readLong()
+  }
+
+}
+
+case class RecordLog(directory: File, logSuffix:String) {
+  import RecordLog._
+
+  directory.mkdirs()
+
+  var logSize = 1024 * 1024 * 100L
+  var current_appender:LogAppender = _
+  var verify_checksums = false
+  var sync = false
+
+  val log_infos = new TreeMap[Long, LogInfo]()
+
+  object log_mutex
+
+  def delete(id:Long) = {
+    log_mutex.synchronized {
+      // We can't delete the current appender.
+      if( current_appender.position != id ) {
+        Option(log_infos.get(id)).foreach { info =>
+          onDelete(info.file)
+          log_infos.remove(id)
+        }
+      }
+    }
+  }
+
+  protected def onDelete(file:File) = {
+    file.delete()
+  }
+
+  def checksum(data: Buffer): Int = {
+    val checksum = new CRC32
+    checksum.update(data.data, data.offset, data.length)
+    (checksum.getValue & 0xFFFFFFFF).toInt
+  }
+
+  class LogAppender(file:File, position:Long) extends LogReader(file, position) {
+
+    val info = new LogInfo(file, position, 0)
+
+    override def open = new RandomAccessFile(file, "rw")
+
+    override def dispose() = {
+      force
+      super.dispose()
+    }
+
+    var append_offset = 0L
+    val flushed_offset = new AtomicLong(0)
+
+    def append_position = {
+      position+append_offset
+    }
+
+    // set the file size ahead of time so that we don't have to sync the file
+    // meta-data on every log sync.
+    channel.position(logSize-1)
+    channel.write(new Buffer(1).toByteBuffer)
+    channel.force(true)
+    if( sync ) {
+      channel.position(0)
+    }
+
+    val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE)
+
+    def force = {
+      flush
+      if(sync) {
+        max_log_flush_latency {
+          // only need to update the file metadata if the file size changes..
+          channel.force(append_offset > logSize)
+        }
+      }
+    }
+
+    /**
+     * Appends a record and returns its position together with the LogInfo of the file it was written to.
+     */
+    def append(id:Byte, data: Buffer) = this.synchronized {
+      val record_position = append_position
+      val data_length = data.length
+      val total_length = LOG_HEADER_SIZE + data_length
+
+      if( write_buffer.position() + total_length > BUFFER_SIZE ) {
+        flush
+      }
+
+      val cs: Int = checksum(data)
+//      trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs)
+
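+      // Note: the large-record bypass below is currently disabled by the "false &&" guard,
+      // so every record is staged through the write buffer.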
+      if( false && total_length > BYPASS_BUFFER_SIZE ) {
+
+        // Write the header and flush..
+        write_buffer.writeByte(LOG_HEADER_PREFIX)
+        write_buffer.writeByte(id)
+        write_buffer.writeInt(cs)
+        write_buffer.writeInt(data_length)
+
+        append_offset += LOG_HEADER_SIZE
+        flush
+
+        // Directly write the data to the channel since it's large.
+        val buffer = data.toByteBuffer
+        val pos = append_offset+LOG_HEADER_SIZE
+        val remaining = buffer.remaining
+        channel.write(buffer, pos)
+        flushed_offset.addAndGet(remaining)
+        if( buffer.hasRemaining ) {
+          throw new IOException("Short write")
+        }
+        append_offset += data_length
+
+      } else {
+        write_buffer.writeByte(LOG_HEADER_PREFIX)
+        write_buffer.writeByte(id)
+        write_buffer.writeInt(cs)
+        write_buffer.writeInt(data_length)
+        write_buffer.write(data.data, data.offset, data_length)
+        append_offset += total_length
+      }
+      (record_position, info)
+    }
+
+    def flush = max_log_flush_latency { this.synchronized {
+      if( write_buffer.position() > 0 ) {
+        val buffer = write_buffer.toBuffer.toByteBuffer
+        val remaining = buffer.remaining
+        val pos = append_offset-remaining
+        channel.write(buffer, pos)
+        flushed_offset.addAndGet(remaining)
+        if( buffer.hasRemaining ) {
+          throw new IOException("Short write")
+        }
+        write_buffer.reset()
+      } }
+    }
+
+    override def check_read_flush(end_offset:Long) = {
+      if( flushed_offset.get() < end_offset )  {
+        flush
+      }
+    }
+
+  }
+
+  case class LogReader(file:File, position:Long) extends BaseRetained {
+
+    def open = new RandomAccessFile(file, "r")
+
+    val fd = open
+    val channel = fd.getChannel
+
+    override def dispose() {
+      fd.close()
+    }
+
+    def check_read_flush(end_offset:Long) = {}
+
+    def read(record_position:Long, length:Int) = {
+      val offset = record_position-position
+      assert(offset >=0 )
+
+      check_read_flush(offset+LOG_HEADER_SIZE+length)
+
+      if(verify_checksums) {
+
+        val record = new Buffer(LOG_HEADER_SIZE+length)
+
+        def record_is_not_changing = {
+          using(open) { fd =>
+            val channel = fd.getChannel
+            val new_record = new Buffer(LOG_HEADER_SIZE+length)
+            channel.read(new_record.toByteBuffer, offset)
+            // True when the re-read bytes match, i.e. the record is stable on disk.
+            record == new_record
+          }
+        }
+
+        if( channel.read(record.toByteBuffer, offset) != record.length ) {
+          assert( record_is_not_changing )
+          throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
+        }
+
+        val is = new DataByteArrayInputStream(record)
+        val prefix = is.readByte()
+        if( prefix != LOG_HEADER_PREFIX ) {
+          assert(record_is_not_changing)
+          throw new IOException("invalid record at position: "+record_position+" in file: "+file+", offset: "+offset)
+        }
+
+        val id = is.readByte()
+        val expectedChecksum = is.readInt()
+        val expectedLength = is.readInt()
+        val data = is.readBuffer(length)
+
+        // If you're reading the whole record, we can verify the data checksum.
+        if( expectedLength == length ) {
+          if( expectedChecksum != checksum(data) ) {
+            assert(record_is_not_changing)
+            throw new IOException("checksum does not match at position: "+record_position+" in file: "+file+", offset: "+offset)
+          }
+        }
+
+        data
+      } else {
+        val data = new Buffer(length)
+        if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) {
+          throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
+        }
+        data
+      }
+    }
+
+    def read(record_position:Long) = {
+      val offset = record_position-position
+      val header = new Buffer(LOG_HEADER_SIZE)
+      channel.read(header.toByteBuffer, offset)
+      val is = header.bigEndianEditor();
+      val prefix = is.readByte()
+      if( prefix != LOG_HEADER_PREFIX ) {
+        // Does not look like a record.
+        throw new IOException("invalid record position")
+      }
+      val id = is.readByte()
+      val expectedChecksum = is.readInt()
+      val length = is.readInt()
+      val data = new Buffer(length)
+
+      if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) {
+        throw new IOException("short record")
+      }
+
+      if(verify_checksums) {
+        if( expectedChecksum != checksum(data) ) {
+          throw new IOException("checksum does not match")
+        }
+      }
+      (id, data, record_position+LOG_HEADER_SIZE+length)
+    }
+
+    def check(record_position:Long):Option[(Long, Option[Long])] = {
+      var offset = record_position-position
+      val header = new Buffer(LOG_HEADER_SIZE)
+      channel.read(header.toByteBuffer, offset)
+      val is = header.bigEndianEditor();
+      val prefix = is.readByte()
+      if( prefix != LOG_HEADER_PREFIX ) {
+        return None // Does not look like a record.
+      }
+      val kind = is.readByte()
+      val expectedChecksum = is.readInt()
+      val length = is.readInt()
+
+      val chunk = new Buffer(1024*4)
+      val chunkbb = chunk.toByteBuffer
+      offset += LOG_HEADER_SIZE
+
+      // Read the data in chunks to avoid an OOME in case we are
+      // checking an invalid record with a bad record length.
+      val checksumer = new CRC32
+      var remaining = length
+      while( remaining > 0 ) {
+        val chunkSize = remaining.min(1024*4);
+        chunkbb.position(0)
+        chunkbb.limit(chunkSize)
+        channel.read(chunkbb, offset)
+        if( chunkbb.hasRemaining ) {
+          return None
+        }
+        checksumer.update(chunk.data, 0, chunkSize)
+        offset += chunkSize
+        remaining -= chunkSize
+      }
+
+      val checksum = ( checksumer.getValue & 0xFFFFFFFF).toInt
+      if( expectedChecksum !=  checksum ) {
+        return None
+      }
+      val uow_start_pos = if(kind == UOW_END_RECORD && length==8) Some(decode_long(chunk)) else None
+      return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos)
+    }
+
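+    // Scans the file record by record and returns the position just past the last completely
+    // written unit of work; anything beyond that point is treated as a partial write.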
+    def verifyAndGetEndPosition:Long = {
+      var pos = position;
+      var current_uow_start = pos
+      val limit = position+channel.size()
+      while(pos < limit) {
+        check(pos) match {
+          case Some((next, uow_start_pos)) =>
+            uow_start_pos.foreach { uow_start_pos =>
+              if( uow_start_pos == current_uow_start ) {
+                current_uow_start = next
+              } else {
+                return current_uow_start
+              }
+            }
+            pos = next
+          case None =>
+            return current_uow_start
+        }
+      }
+      return current_uow_start
+    }
+  }
+
+  def create_log_appender(position: Long) = {
+    new LogAppender(next_log(position), position)
+  }
+
+  def create_appender(position: Long): Unit = {
+    log_mutex.synchronized {
+      if(current_appender!=null) {
+        // Record the final length of the log file we are rotating away from.
+        log_infos.put(current_appender.position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset))
+      }
+      current_appender = create_log_appender(position)
+      log_infos.put(position, new LogInfo(current_appender.file, position, 0))
+    }
+  }
+
+  val max_log_write_latency = TimeMetric()
+  val max_log_flush_latency = TimeMetric()
+  val max_log_rotate_latency = TimeMetric()
+
+  def open = {
+    log_mutex.synchronized {
+      log_infos.clear()
+      LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
+        log_infos.put(position, LogInfo(file, position, file.length()))
+      }
+
+      val appendPos = if( log_infos.isEmpty ) {
+        0L
+      } else {
+        val file = log_infos.lastEntry().getValue
+        val r = LogReader(file.file, file.position)
+        try {
+          val actualLength = r.verifyAndGetEndPosition
+          val updated = file.copy(length = actualLength - file.position)
+          log_infos.put(updated.position, updated)
+          if( updated.file.length != file.length ) {
+            // we need to truncate.
+            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
+          }
+          actualLength
+        } finally {
+          r.release()
+        }
+      }
+
+      create_appender(appendPos)
+    }
+  }
+
+  def close = {
+    log_mutex.synchronized {
+      current_appender.release
+    }
+  }
+
+  def appender_limit = current_appender.append_position
+  def appender_start = current_appender.position
+
+  def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
+
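+  // Runs func against the current appender.  When anything was appended, a UOW_END_RECORD marker
+  // pointing back at the batch start is written so recovery can distinguish complete units of work
+  // from partial ones, and the log is rotated once it grows past logSize.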
+  def appender[T](func: (LogAppender)=>T):T= {
+    val initial_position = current_appender.append_position
+    try {
+      max_log_write_latency {
+        val rc = func(current_appender)
+        if( current_appender.append_position != initial_position ) {
+          // Record a UOW_END_RECORD so that on recovery we only replay full units of work.
+          current_appender.append(UOW_END_RECORD, encode_long(initial_position))
+        }
+        rc
+      }
+    } finally {
+      current_appender.flush
+      max_log_rotate_latency {
+        log_mutex.synchronized {
+          if ( current_appender.append_offset >= logSize ) {
+            current_appender.release()
+            on_log_rotate()
+            create_appender(current_appender.append_position)
+          }
+        }
+      }
+    }
+  }
+
+  var on_log_rotate: ()=>Unit = ()=>{}
+
+  private val reader_cache = new LRUCache[File, LogReader](100) {
+    protected override def onCacheEviction(entry: Entry[File, LogReader]) = {
+      entry.getValue.release()
+    }
+  }
+
+  def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue) }
+
+  private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
+
+    val lookup = log_mutex.synchronized {
+      val info = log_info(record_position)
+      info.map { info=>
+        if(info.position == current_appender.position) {
+          current_appender.retain()
+          (info, current_appender)
+        } else {
+          (info, null)
+        }
+      }
+    }
+
+    lookup.map { case (info, appender) =>
+      val reader = if( appender!=null ) {
+        // read from the current appender.
+        appender
+      } else {
+        // Checkout a reader from the cache...
+        reader_cache.synchronized {
+          var reader = reader_cache.get(info.file)
+          if(reader==null) {
+            reader = LogReader(info.file, info.position)
+            reader_cache.put(info.file, reader)
+          }
+          reader.retain()
+          reader
+        }
+      }
+
+      try {
+        func(reader)
+      } finally {
+        reader.release
+      }
+    }
+  }
+
+  def read(pos:Long) = {
+    get_reader(pos)(_.read(pos))
+  }
+  def read(pos:Long, length:Int) = {
+    get_reader(pos)(_.read(pos, length))
+  }
+
+}
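
To show how the pieces above fit together, here is a minimal, hypothetical usage sketch of
RecordLog. It is not part of this commit and simply assumes a writable scratch directory:

    import java.io.File
    import org.fusesource.hawtbuf.Buffer

    val log = RecordLog(new File("/tmp/example-records"), ".log")
    log.logSize = 1024 * 1024 * 64   // rotate after ~64MB instead of the 100MB default
    log.sync = true                  // fsync the file whenever the appender is forced
    log.open

    // appender {} writes a batch and ends it with a UOW_END_RECORD marker.
    val (pos, _) = log.appender { a =>
      a.append(1.toByte, new Buffer("hello".getBytes("UTF-8")))
    }

    // read(pos, length) returns Some(data) when pos falls inside a known log file.
    val payload: Option[Buffer] = log.read(pos, 5)

    log.close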