You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/25 16:32:31 UTC
svn commit: r1389882 [6/7] - in /activemq/trunk: ./ activemq-core/
activemq-core/src/main/java/org/apache/activemq/store/leveldb/
activemq-core/src/main/resources/ activemq-core/src/main/resources/META-INF/
activemq-core/src/main/resources/META-INF/ser...
Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,1218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import collection.immutable.TreeMap
+import collection.mutable.{HashMap, ListBuffer}
+import org.iq80.leveldb._
+
+import org.fusesource.hawtdispatch._
+import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord}
+import util._
+import java.util.concurrent._
+import org.fusesource.hawtbuf._
+import java.io.{ObjectInputStream, ObjectOutputStream, File}
+import scala.Option._
+import org.apache.activemq.command.Message
+import org.apache.activemq.util.ByteSequence
+import org.apache.activemq.leveldb.RecordLog.LogInfo
+import java.text.SimpleDateFormat
+import java.util.{Date, Collections}
+
+/**
+ * Companion helpers for [[LevelDBClient]].
+ *
+ * It holds store constants, key/value encoders, the [[RichDB]] wrapper around
+ * the LevelDB `DB` API, and helpers for the hex-sequence-named files used for
+ * record logs and index snapshots.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBClient extends Log {
+
+  final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:"
+  final val STORE_SCHEMA_VERSION = 1
+
+  // Stack size (bytes) of THREAD_POOL threads; override with -Dleveldb.thread.stack.size.
+  final val THREAD_POOL_STACK_SIZE = System.getProperty("leveldb.thread.stack.size", "" + 1024 * 512).toLong
+
+  // Unbounded pool of daemon threads that expire after 10s idle. shutdown and
+  // shutdownNow are overridden as no-ops (presumably because the pool is shared).
+  final val THREAD_POOL: ThreadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue[Runnable], new ThreadFactory {
+    def newThread(r: Runnable): Thread = {
+      val rc = new Thread(null, r, "LevelDB Store Task", THREAD_POOL_STACK_SIZE)
+      rc.setDaemon(true)
+      rc
+    }
+  }) {
+    override def shutdown: Unit = {}
+    override def shutdownNow = Collections.emptyList[Runnable]
+  }
+
+  // Reserved index keys and their boolean values.
+  final val DIRTY_INDEX_KEY = bytes(":dirty")
+  final val LOG_REF_INDEX_KEY = bytes(":log-refs")
+  final val COLLECTION_META_KEY = bytes(":collection-meta")
+  final val TRUE = bytes("true")
+  final val FALSE = bytes("false")
+  // Entry key under which a subscription's ack position is stored.
+  final val ACK_POSITION = new AsciiBuffer("p")
+
+  // First byte of every collection / entry key in the index.
+  final val COLLECTION_PREFIX = 'c'.toByte
+  final val COLLECTION_PREFIX_ARRAY = Array(COLLECTION_PREFIX)
+  final val ENTRY_PREFIX = 'e'.toByte
+  final val ENTRY_PREFIX_ARRAY = Array(ENTRY_PREFIX)
+
+  // Record kinds written to the record log.
+  final val LOG_ADD_COLLECTION = 1.toByte
+  final val LOG_REMOVE_COLLECTION = 2.toByte
+  final val LOG_ADD_ENTRY = 3.toByte
+  final val LOG_REMOVE_ENTRY = 4.toByte
+  final val LOG_DATA = 5.toByte
+  final val LOG_TRACE = 6.toByte
+
+  final val LOG_SUFFIX = ".log"
+  final val INDEX_SUFFIX = ".index"
+
+  implicit def toByteArray(buffer:Buffer) = buffer.toByteArray
+  implicit def toBuffer(buffer:Array[Byte]) = new Buffer(buffer)
+
+  def encodeCollectionRecord(v: CollectionRecord.Buffer) = v.toUnframedByteArray
+  def decodeCollectionRecord(data: Buffer):CollectionRecord.Buffer = CollectionRecord.FACTORY.parseUnframed(data)
+  def encodeCollectionKeyRecord(v: CollectionKey.Buffer) = v.toUnframedByteArray
+  def decodeCollectionKeyRecord(data: Buffer):CollectionKey.Buffer = CollectionKey.FACTORY.parseUnframed(data)
+
+  def encodeEntryRecord(v: EntryRecord.Buffer) = v.toUnframedBuffer
+  def decodeEntryRecord(data: Buffer):EntryRecord.Buffer = EntryRecord.FACTORY.parseUnframed(data)
+
+  def encodeEntryKeyRecord(v: EntryKey.Buffer) = v.toUnframedByteArray
+  def decodeEntryKeyRecord(data: Buffer):EntryKey.Buffer = EntryKey.FACTORY.parseUnframed(data)
+
+  /** Encodes a (log position, length) data locator as two var-ints. */
+  def encodeLocator(pos:Long, len:Int):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(pos)+
+      AbstractVarIntSupport.computeVarIntSize(len)
+    )
+    out.writeVarLong(pos)
+    out.writeVarInt(len)
+    out.getData
+  }
+  def decodeLocator(bytes:Buffer):(Long, Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readVarLong(), in.readVarInt())
+  }
+  def decodeLocator(bytes:Array[Byte]):(Long, Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readVarLong(), in.readVarInt())
+  }
+
+  /** Fixed 8 byte encoding of a long. */
+  def encodeLong(a1:Long) = {
+    val out = new DataByteArrayOutputStream(8)
+    out.writeLong(a1)
+    out.toBuffer
+  }
+
+  /** Var-long encoding of a long. */
+  def encodeVLong(a1:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.getData
+  }
+
+  def decodeVLong(bytes:Array[Byte]):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
+  }
+
+  /** 9 byte key: one prefix byte followed by a fixed 8 byte long. */
+  def encodeLongKey(a1:Byte, a2:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(9)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.getData
+  }
+  def decodeLongKey(bytes:Array[Byte]):(Byte, Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong())
+  }
+
+  def decodeLong(bytes:Buffer):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readLong()
+  }
+  def decodeLong(bytes:Array[Byte]):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readLong()
+  }
+
+  /** 17 byte entry key: prefix byte, collection key, entry sequence. */
+  def encodeEntryKey(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(17)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.writeLong(a3)
+    out.getData
+  }
+
+  /** Entry key made of a prefix byte, collection key and arbitrary key bytes. */
+  def encodeEntryKey(a1:Byte, a2:Long, a3:Buffer):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(9+a3.length)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.write(a3)
+    out.getData
+  }
+
+  def decodeEntryKey(bytes:Array[Byte]):(Byte, Long, Buffer) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong(), in.readBuffer(in.available()))
+  }
+
+  /** Scala friendly wrapper around a LevelDB `DB` handle. */
+  final class RichDB(val db: DB) {
+
+    val isPureJavaVersion = db.getClass.getName == "org.iq80.leveldb.impl.DbImpl"
+
+    def getProperty(name:String) = db.getProperty(name)
+
+    def getApproximateSizes(ranges:Range*) = db.getApproximateSizes(ranges:_*)
+
+    def get(key:Array[Byte], ro:ReadOptions=new ReadOptions):Option[Array[Byte]] = {
+      Option(db.get(key, ro))
+    }
+
+    def close:Unit = db.close()
+
+    def delete(key:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+      db.delete(key, wo)
+    }
+
+    def put(key:Array[Byte], value:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+      db.put(key, value, wo)
+    }
+
+    /**
+     * Fills a fresh write batch via `func`, writes it (timed by
+     * `max_write_latency`), always closes the batch, and returns `func`'s result.
+     */
+    def write[T](wo:WriteOptions=new WriteOptions, max_write_latency:TimeMetric = TimeMetric())(func: WriteBatch=>T):T = {
+      val updates = db.createWriteBatch()
+      try {
+        val rc = func(updates)
+        max_write_latency {
+          db.write(updates, wo)
+        }
+        rc
+      } finally {
+        updates.close()
+      }
+    }
+
+    def store[T](write:WriteBatch, wo:WriteOptions=new WriteOptions) = {
+      db.write(write, wo)
+    }
+
+    /** Runs `func` against a snapshot, always releasing the snapshot afterwards. */
+    def snapshot[T](func: Snapshot=>T):T = {
+      val snapshot = db.getSnapshot
+      try {
+        func(snapshot)
+      } finally {
+        snapshot.close()
+      }
+    }
+
+    /** Visits keys from the first one while `func` returns true. */
+    def cursorKeys(ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      try {
+        iterator.seekToFirst()
+        while( iterator.hasNext && func(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close()
+      }
+    }
+
+    /** Visits keys starting with `prefix` while `func` returns true. */
+    def cursorKeysPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      try {
+        iterator.seek(prefix)
+        def check(key:Buffer) = {
+          key.startsWith(prefix) && func(key)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close()
+      }
+    }
+
+    /** Visits (key, value) pairs whose key starts with `prefix` while `func` returns true. */
+    def cursorPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      try {
+        iterator.seek(prefix)
+        def check(key:Buffer) = {
+          key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close()
+      }
+    }
+
+    def compare(a1:Array[Byte], a2:Array[Byte]):Int = {
+      new Buffer(a1).compareTo(new Buffer(a2))
+    }
+
+    /** Visits keys in [startIncluded, endExcluded) while `func` returns true. */
+    def cursorRangeKeys(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      try {
+        iterator.seek(startIncluded)
+        def check(key:Array[Byte]) = {
+          compare(key,endExcluded) < 0 && func(key)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close()
+      }
+    }
+
+    /** Visits (key, value) pairs with keys in [startIncluded, endExcluded) while `func` returns true. */
+    def cursorRange(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+      val iterator = db.iterator(ro)
+      try {
+        iterator.seek(startIncluded)
+        def check(key:Array[Byte]) = {
+          (compare(key,endExcluded) < 0) && func(key, iterator.peekNext.getValue)
+        }
+        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+          iterator.next()
+        }
+      } finally {
+        iterator.close()
+      }
+    }
+
+    /**
+     * Returns the smallest key greater than every key that starts with `prefix`.
+     *
+     * Trailing 0xFF bytes are dropped and the last remaining byte is
+     * incremented. The previous in-place increment wrapped 0xFF to 0x00 and
+     * produced a bound that sorted below the prefix. Returns None when no such
+     * key exists (empty or all-0xFF prefix).
+     */
+    private def prefixUpperBound(prefix:Array[Byte]):Option[Array[Byte]] = {
+      var i = prefix.length - 1
+      while (i >= 0 && prefix(i) == 0xFF.toByte) {
+        i -= 1
+      }
+      if (i < 0) {
+        None
+      } else {
+        val bound = prefix.take(i + 1)
+        bound(i) = (bound(i) + 1).toByte
+        Some(bound)
+      }
+    }
+
+    /** Returns the greatest key that starts with `prefix`, if any. */
+    def lastKey(prefix:Array[Byte], ro:ReadOptions=new ReadOptions): Option[Array[Byte]] = {
+      if(isPureJavaVersion) {
+        // The pure java version of LevelDB does not support backward iteration,
+        // so walk every key with the prefix and keep the last one seen.
+        var rc:Option[Array[Byte]] = None
+        cursorKeysPrefixed(prefix, ro) { key=>
+          rc = Some(key)
+          true
+        }
+        rc
+      } else {
+        val iterator = db.iterator(ro)
+        try {
+          prefixUpperBound(prefix) match {
+            case Some(bound) =>
+              // Land on the first key >= bound, then step back one entry.
+              iterator.seek(bound)
+              if ( iterator.hasPrev ) {
+                iterator.prev()
+              } else {
+                iterator.seekToLast()
+              }
+            case None =>
+              // No key sorts above the prefix range, so the last key is the candidate.
+              iterator.seekToLast()
+          }
+
+          if ( iterator.hasNext ) {
+            val key:Buffer = iterator.peekNext.getKey
+            if(key.startsWith(prefix)) {
+              Some(key)
+            } else {
+              None
+            }
+          } else {
+            None
+          }
+        } finally {
+          iterator.close()
+        }
+      }
+    }
+  }
+
+
+  def bytes(value:String) = value.getBytes("UTF-8")
+
+  import FileSupport._
+  /** `directory/<16 hex digit id><suffix>`. */
+  def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
+
+  /**
+   * Maps the hex id of every `<hex id><suffix>` file in `directory` to the file.
+   * Names that do not parse as hex are ignored. A missing or unreadable
+   * directory yields an empty map.
+   */
+  def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
+    // File.listFiles returns null when the directory is missing or cannot be read.
+    val files = Option(directory.listFiles).getOrElse(Array.empty[File])
+    TreeMap((files.flatMap { f=>
+      if( f.getName.endsWith(suffix) ) {
+        try {
+          val base = f.getName.stripSuffix(suffix)
+          val position = java.lang.Long.parseLong(base, 16)
+          Some(position -> f)
+        } catch {
+          case e:NumberFormatException => None
+        }
+      } else {
+        None
+      }
+    }): _* )
+  }
+
+  /** Per-collection counters persisted in the index under COLLECTION_META_KEY. */
+  class CollectionMeta extends Serializable {
+    var size = 0L
+    var last_key:Array[Byte] = _
+  }
+}
+
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBClient(store: LevelDBStore) {
+
+ import LevelDBClient._
+ import FileSupport._
+
+ // Dispatch queue named "leveldb", created with hawtdispatch's createQueue.
+ val dispatchQueue = createQueue("leveldb")
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Helpers
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ // Directory holding store-version.txt and the dirty/temp/snapshot index directories.
+ def directory = store.directory
+ // Directory holding the record log files: store.logDirectory when set, otherwise the store directory.
+ def logDirectory = Option(store.logDirectory).getOrElse(store.directory)
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Public interface used by the DBManager
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ // When true, store() forces the log appender for units of work that request a sync.
+ def sync = store.sync;
+ // Applied to the ReadOptions used when cursoring the index.
+ def verifyChecksums = store.verifyChecksums
+
+ // Record log; assigned in start() and set to null by stop().
+ var log:RecordLog = _
+
+ // Index handle; opened in start(), closed by suspend() and re-opened by resume().
+ var index:RichDB = _
+ var indexOptions:Options = _
+
+ // Log position the current index snapshot corresponds to; replay in start() begins here.
+ var lastIndexSnapshotPos:Long = _
+ // Fair lock: index users take the read side (usingIndex), suspend()/resume() hold the write side.
+ val snapshotRwLock = new ReentrantReadWriteLock(true)
+
+ // Index factory selected in start() from store.indexFactory.
+ var factory:DBFactory = _
+ // Log file position -> number of index entries whose data lives in that log file.
+ val logRefs = HashMap[Long, LongCounter]()
+
+ // Collection key -> entry count and last added entry key.
+ val collectionMeta = HashMap[Long, CollectionMeta]()
+
+ // Working index directory ("dirty.index").
+ def dirtyIndexFile = directory / ("dirty"+INDEX_SUFFIX)
+ // Staging directory for a snapshot that is being built ("temp.index").
+ def tempIndexFile = directory / ("temp"+INDEX_SUFFIX)
+ // Completed snapshot directory named after the log position it was taken at.
+ def snapshotIndexFile(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX)
+
+ def createLog: RecordLog = {
+ new RecordLog(logDirectory, LOG_SUFFIX)
+ }
+
+ // Single-threaded executor created in start() (log-rotation snapshots run on it); shut down in stop().
+ var writeExecutor:ExecutorService = _
+
+ /**
+  * Appends a timestamped LOG_TRACE record to the log.
+  *
+  * @param ascii the trace text
+  * @param force when true, force the appender after writing the record
+  */
+ def storeTrace(ascii:String, force:Boolean=false) = {
+   // The previous pattern "HH:mm::ss" emitted a doubled colon between minutes and seconds.
+   val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z").format(new Date)
+   log.appender { appender =>
+     appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii)))
+     if( force ) {
+       appender.force
+     }
+   }
+ }
+
+ /**
+  * Runs `func` through RetrySupport.retry, passing the companion object as the
+  * logger and store.isStarted (presumably used to stop retrying once the store stops).
+  */
+ def retry[T](func: => T): T = {
+   RetrySupport.retry(LevelDBClient, store.isStarted, func _)
+ }
+
+ /**
+  * Opens the store.
+  *
+  * It verifies the schema version file, selects an index factory, opens the
+  * record log, restores the index from the newest snapshot (if any) and
+  * replays the log from the snapshot position into the index.
+  *
+  * @throws Exception when the version file is unreadable or has an
+  *         unsupported version, or when no index factory can be loaded
+  */
+ def start() = {
+
+   // Lets check store compatibility...
+   directory.mkdirs()
+   val version_file = directory / "store-version.txt"
+   if (version_file.exists()) {
+     val ver = try {
+       val tmp: String = version_file.readText().trim()
+       if (tmp.startsWith(STORE_SCHEMA_PREFIX)) {
+         tmp.stripPrefix(STORE_SCHEMA_PREFIX).toInt
+       } else {
+         -1
+       }
+     } catch {
+       // Keep the underlying failure (I/O or number format) as the cause.
+       case e: Exception => throw new Exception("Unexpected version file format: " + version_file, e)
+     }
+     ver match {
+       case STORE_SCHEMA_VERSION => // All is good.
+       case _ => throw new Exception("Cannot open the store. It's schema version is not supported.")
+     }
+   }
+   version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
+
+   writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+     def newThread(r: Runnable) = {
+       val rc = new Thread(r, "LevelDB store io write")
+       rc.setDaemon(true)
+       rc
+     }
+   })
+
+   // Use the first factory class in store.indexFactory that can be instantiated.
+   val factoryNames = store.indexFactory
+   factory = factoryNames.split("""(,|\s)+""").map(_.trim()).flatMap { name=>
+     try {
+       Some(this.getClass.getClassLoader.loadClass(name).newInstance().asInstanceOf[DBFactory])
+     } catch {
+       case e:Throwable =>
+         debug(e, "Could not load factory: "+name+" due to: "+e)
+         None
+     }
+   }.headOption.getOrElse(throw new Exception("Could not load any of the index factory classes: "+factoryNames))
+
+   if( factory.getClass.getName == "org.iq80.leveldb.impl.Iq80DBFactory") {
+     warn("Using the pure java LevelDB implementation which is still experimental. Production users should use the JNI based LevelDB implementation instead.")
+   }
+
+   indexOptions = new Options()
+   indexOptions.createIfMissing(true)
+
+   indexOptions.maxOpenFiles(store.indexMaxOpenFiles)
+   indexOptions.blockRestartInterval(store.indexBlockRestartInterval)
+   indexOptions.paranoidChecks(store.paranoidChecks)
+   indexOptions.writeBufferSize(store.indexWriteBufferSize)
+   indexOptions.blockSize(store.indexBlockSize)
+   indexOptions.compressionType( store.indexCompression.toLowerCase match {
+     case "snappy" => CompressionType.SNAPPY
+     case "none" => CompressionType.NONE
+     case _ => CompressionType.SNAPPY
+   })
+
+   indexOptions.cacheSize(store.indexCacheSize)
+   indexOptions.logger(new Logger() {
+     val LOG = Log(factory.getClass.getName)
+     def log(msg: String) = LOG.debug("index: "+msg.stripSuffix("\n"))
+   })
+
+   log = createLog
+   log.logSize = store.logSize
+   log.on_log_rotate = ()=> {
+     // We snapshot the index every time we rotate the logs.
+     writeExecutor {
+       snapshotIndex(false)
+     }
+   }
+
+   retry {
+     log.open
+   }
+
+   // Find out what was the last snapshot.
+   val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
+   var lastSnapshotIndex = snapshots.lastOption
+   lastIndexSnapshotPos = lastSnapshotIndex.map(_._1).getOrElse(0)
+
+   // Only keep the last snapshot..
+   snapshots.filterNot(_._1 == lastIndexSnapshotPos).foreach( _._2.recursiveDelete )
+   tempIndexFile.recursiveDelete
+
+   retry {
+
+     // Delete the dirty indexes
+     dirtyIndexFile.recursiveDelete
+     dirtyIndexFile.mkdirs()
+
+     lastSnapshotIndex.foreach { case (id, file) =>
+       // Resume log replay from a snapshot of the index..
+       try {
+         file.listFiles.foreach { file =>
+           file.linkTo(dirtyIndexFile / file.getName)
+         }
+       } catch {
+         case e:Exception =>
+           warn(e, "Could not recover snapshot of the index: "+e)
+           lastSnapshotIndex = None
+           // Without the snapshot the index starts empty. Drop any partially
+           // linked files and replay the whole log, otherwise everything logged
+           // before the snapshot position would be missing from the index.
+           lastIndexSnapshotPos = 0
+           dirtyIndexFile.recursiveDelete
+           dirtyIndexFile.mkdirs()
+       }
+     }
+
+     index = new RichDB(factory.open(dirtyIndexFile, indexOptions))
+     try {
+       loadCounters
+       index.put(DIRTY_INDEX_KEY, TRUE)
+       // Update the index /w what was stored on the logs..
+       var pos = lastIndexSnapshotPos
+       var last_reported_at = System.currentTimeMillis()
+       var showing_progress = false
+       var last_reported_pos = 0L
+       try {
+         while (pos < log.appender_limit) {
+           val now = System.currentTimeMillis()
+           if( now > last_reported_at+1000 ) {
+             val at = pos-lastIndexSnapshotPos
+             val total = log.appender_limit-lastIndexSnapshotPos
+             val rate = (pos-last_reported_pos)*1000.0 / (now - last_reported_at)
+             val eta = (total-at)/rate
+             val remaining = if(eta > 60*60) {
+               "%.2f hrs".format(eta/(60*60))
+             } else if(eta > 60) {
+               "%.2f mins".format(eta/60)
+             } else {
+               "%.0f secs".format(eta)
+             }
+
+             System.out.print("Replaying recovery log: %f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining. \r".format(
+               at*100.0/total, at, total, rate/1024, remaining))
+             showing_progress = true
+             last_reported_at = now
+             last_reported_pos = pos
+           }
+
+           val replayed = log.read(pos).map {
+             case (kind, data, nextPos) =>
+               kind match {
+                 case LOG_ADD_COLLECTION =>
+                   val record= decodeCollectionRecord(data)
+                   index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
+                   collectionMeta.put(record.getKey, new CollectionMeta)
+
+                 case LOG_REMOVE_COLLECTION =>
+                   // removeCollection/collectionEmpty log the collection key as a
+                   // var-long (encodeVLong), not as a CollectionKey record.
+                   val collectionKey = decodeVLong(data)
+                   // Delete the entries in the collection.
+                   index.cursorPrefixed(encodeLongKey(ENTRY_PREFIX, collectionKey), new ReadOptions) { (key, value)=>
+                     val record = decodeEntryRecord(value)
+                     if ( record.hasValueLocation ) {
+                       logRefDecrement(record.getValueLocation)
+                     }
+                     index.delete(key)
+                     true
+                   }
+                   // Delete the collection's own index record (not the log payload bytes).
+                   index.delete(encodeLongKey(COLLECTION_PREFIX, collectionKey))
+                   collectionMeta.remove(collectionKey)
+
+                 case LOG_ADD_ENTRY =>
+                   val record = decodeEntryRecord(data)
+
+                   val index_record = new EntryRecord.Bean()
+                   index_record.setValueLocation(record.getValueLocation)
+                   index_record.setValueLength(record.getValueLength)
+                   val index_value = encodeEntryRecord(index_record.freeze()).toByteArray
+
+                   index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
+
+                   if ( record.hasValueLocation ) {
+                     logRefIncrement(record.getValueLocation)
+                   }
+                   collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
+
+                 case LOG_REMOVE_ENTRY =>
+                   val record = decodeEntryRecord(data)
+
+                   // Figure out which log file this message reference is pointing at..
+                   if ( record.hasValueLocation ) {
+                     logRefDecrement(record.getValueLocation)
+                   }
+
+                   index.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
+                   collectionDecrementSize( record.getCollectionKey)
+
+                 case _ => // Skip other records, they don't modify the index.
+
+               }
+               pos = nextPos
+           }
+
+           if (replayed.isEmpty) {
+             // Nothing readable at this position. Stop instead of spinning
+             // forever on the same offset.
+             warn("Could not read a log record at position "+pos+", stopping log replay.")
+             pos = log.appender_limit
+           }
+         }
+       }
+       catch {
+         // Best-effort replay: report the failure and keep what was replayed so far.
+         case e:Throwable => warn(e, "Log replay failed at position "+pos+": "+e)
+       }
+       if(showing_progress) {
+         System.out.print(" \r")
+       }
+
+     } catch {
+       case e:Throwable =>
+         // replay failed.. good thing we are in a retry block...
+         index.close
+         throw e
+     }
+   }
+ }
+
+ /**
+  * Drops one reference from the counter of the log file that contains `pos`.
+  * The counter is removed once it reaches zero.
+  */
+ private def logRefDecrement(pos: Long): Unit = {
+   for {
+     info <- log.log_info(pos)
+     counter <- logRefs.get(info.position)
+   } {
+     if (counter.decrementAndGet() == 0) {
+       logRefs.remove(info.position)
+     }
+   }
+ }
+
+ /** Adds one reference to the counter of the log file that contains `pos`, creating it if needed. */
+ private def logRefIncrement(pos: Long): Unit = {
+   for (info <- log.log_info(pos)) {
+     val counter = logRefs.getOrElseUpdate(info.position, new LongCounter())
+     counter.incrementAndGet()
+   }
+ }
+
+ /** Decrements the tracked entry count of collection `key`; unknown collections are ignored. */
+ private def collectionDecrementSize(key: Long): Unit = {
+   for (meta <- collectionMeta.get(key)) {
+     meta.size -= 1
+   }
+ }
+ /**
+  * Increments the tracked entry count of collection `key` and records
+  * `last_key` as its most recently added entry key. Unknown collections are
+  * ignored.
+  */
+ private def collectionIncrementSize(key: Long, last_key:Array[Byte]): Unit = {
+   collectionMeta.get(key) match {
+     case Some(meta) =>
+       meta.size += 1
+       meta.last_key = last_key
+     case None =>
+   }
+ }
+
+ /** Persists logRefs and collectionMeta into the index under their reserved keys. */
+ private def storeCounters = {
+   // Layout: entry count (int), then a long key and a serialized value per entry.
+   def storeMap(key:Array[Byte], map:HashMap[Long, _ <: AnyRef]): Unit = {
+     val serialized = new ByteArrayOutputStream()
+     val out = new ObjectOutputStream(serialized)
+     out.writeInt(map.size)
+     map.foreach { entry =>
+       out.writeLong(entry._1)
+       out.writeObject(entry._2)
+     }
+     out.close()
+     index.put(key, serialized.toByteArray)
+   }
+   storeMap(LOG_REF_INDEX_KEY, logRefs)
+   storeMap(COLLECTION_META_KEY, collectionMeta)
+ }
+
+ /**
+  * Reloads logRefs and collectionMeta from the index. Each map is cleared
+  * first and stays empty when its key is absent.
+  */
+ private def loadCounters = {
+   // Reads the layout written by storeCounters: count, then (long key, object) pairs.
+   def loadMap[T <: AnyRef](key:Array[Byte], map:HashMap[Long, T]): Unit = {
+     map.clear()
+     for (stored <- index.get(key, new ReadOptions)) {
+       val in = new ObjectInputStream(new ByteArrayInputStream(stored))
+       val count = in.readInt()
+       for (_ <- 0 until count) {
+         map.put(in.readLong(), in.readObject().asInstanceOf[T])
+       }
+     }
+   }
+   loadMap(LOG_REF_INDEX_KEY, logRefs)
+   loadMap(COLLECTION_META_KEY, collectionMeta)
+ }
+
+ /**
+  * Stops the client when start() created the write executor.
+  *
+  * It drains the executor (waiting up to 60 seconds), suspends and closes the
+  * index, closes the log, and snapshots the index.
+  */
+ def stop() = {
+   Option(writeExecutor).foreach { executor =>
+     executor.shutdown
+     executor.awaitTermination(60, TimeUnit.SECONDS)
+     writeExecutor = null
+
+     // suspend() persists the counters, marks the index clean and closes it;
+     // it takes the snapshot write lock and is not followed by resume() here.
+     suspend()
+
+     if (log != null) {
+       log.close
+     }
+     copyDirtyIndexToSnapshot
+     log = null
+   }
+ }
+
+ /**
+  * Evaluates `func` while holding the read side of snapshotRwLock, so it never
+  * runs while suspend() holds the write side.
+  */
+ def usingIndex[T](func: =>T):T = {
+   val readLock = snapshotRwLock.readLock()
+   readLock.lock()
+   try func finally readLock.unlock()
+ }
+
+ /** Runs `func` under the index read lock, retrying as retry() does; each attempt re-takes the lock. */
+ def retryUsingIndex[T](func: =>T):T = {
+   retry {
+     usingIndex {
+       func
+     }
+   }
+ }
+
+ /**
+  * Takes exclusive access to the index, persists the counters, marks the index
+  * clean (synchronous write) and closes it.
+  *
+  * On success the snapshot write lock stays held until resume() is called. On
+  * failure the lock is released and the error rethrown. Callers only call
+  * resume() after a successful suspend(), so a failure here would otherwise
+  * block every index user forever.
+  *
+  * TODO: expose this via management APIs, handy if you want to
+  * do a file system level snapshot and want the data to be consistent.
+  */
+ def suspend() = {
+   // Make sure we are the only ones accessing the index. since
+   // we will be closing it to create a consistent snapshot.
+   val writeLock = snapshotRwLock.writeLock()
+   writeLock.lock()
+   try {
+     // Close the index so that its files are not changed async on us.
+     storeCounters
+     index.put(DIRTY_INDEX_KEY, FALSE, new WriteOptions().sync(true))
+     index.close
+   } catch {
+     case e: Throwable =>
+       writeLock.unlock()
+       throw e
+   }
+ }
+
+ /**
+  * Re-opens the index closed by suspend(), marks it dirty again, and releases
+  * the snapshot write lock.
+  *
+  * The lock is released even if the index cannot be re-opened. Otherwise a
+  * failed retry would leave every index user blocked forever.
+  *
+  * TODO: expose this via management APIs, handy if you want to
+  * do a file system level snapshot and want the data to be consistent.
+  */
+ def resume() = {
+   try {
+     // re-open it..
+     retry {
+       index = new RichDB(factory.open(dirtyIndexFile, indexOptions))
+       index.put(DIRTY_INDEX_KEY, TRUE)
+     }
+   } finally {
+     snapshotRwLock.writeLock().unlock()
+   }
+ }
+
+ /**
+  * Hard-links the (closed) dirty index into a new snapshot directory named
+  * after the current log append limit, then deletes the previous snapshot.
+  *
+  * It does nothing when nothing was appended since the last snapshot. Any
+  * failure, including a failed rename, is logged and the partial copy removed.
+  * The previous snapshot is only deleted after the new one is in place.
+  */
+ def copyDirtyIndexToSnapshot: Unit = {
+   if( log.appender_limit != lastIndexSnapshotPos ) {
+     // Where we start copying files into. Delete this on restart.
+     val tmpDir = tempIndexFile
+     // Clear leftovers of an earlier failed attempt; linking onto existing
+     // names would fail.
+     tmpDir.recursiveDelete
+     tmpDir.mkdirs()
+
+     try {
+
+       // Hard link all the index files.
+       dirtyIndexFile.listFiles.foreach { file =>
+         file.linkTo(tmpDir / file.getName)
+       }
+
+       // Rename to signal that the snapshot is complete. File.renameTo reports
+       // failure through its return value, so check it before discarding the
+       // previous snapshot.
+       val newSnapshotIndexPos = log.appender_limit
+       val newSnapshotDir = snapshotIndexFile(newSnapshotIndexPos)
+       if( !tmpDir.renameTo(newSnapshotDir) ) {
+         throw new java.io.IOException("Could not rename " + tmpDir + " to " + newSnapshotDir)
+       }
+       snapshotIndexFile(lastIndexSnapshotPos).recursiveDelete
+       lastIndexSnapshotPos = newSnapshotIndexPos
+
+     } catch {
+       case e: Exception =>
+         // if we could not snapshot for any reason, delete it as we don't
+         // want a partial check point..
+         warn(e, "Could not snapshot the index: " + e)
+         tmpDir.recursiveDelete
+     }
+   }
+ }
+
+ /**
+  * Suspends the index, optionally forces the current log appender, and takes a
+  * snapshot when the log advanced since the last one. The index is always
+  * resumed afterwards.
+  */
+ def snapshotIndex(sync:Boolean=false):Unit = {
+   suspend()
+   try {
+     if( sync ) {
+       log.current_appender.force
+     }
+     // Skip the copy when nothing was appended since the last snapshot.
+     if( log.appender_limit != lastIndexSnapshotPos ) {
+       copyDirtyIndexToSnapshot
+     }
+   } finally {
+     resume()
+   }
+ }
+
+ /**
+  * Deletes all log files and index directories while the index is suspended,
+  * then re-opens the log and resumes the index.
+  *
+  * resume() runs even if re-opening the log fails, so the snapshot write lock
+  * taken by suspend() is never leaked.
+  */
+ def purge() = {
+   suspend()
+   try {
+     log.close
+     locked_purge
+   } finally {
+     try {
+       retry {
+         log.open
+       }
+     } finally {
+       resume()
+     }
+   }
+ }
+
+ /**
+  * Deletes every `*.log` file in the log directory and every `*.index` entry
+  * (recursively) in the store directory.
+  *
+  * Callers must hold the index suspended. A missing or unreadable directory is
+  * treated as empty instead of throwing a NullPointerException.
+  */
+ def locked_purge: Unit = {
+   // File.listFiles returns null when the directory is missing or cannot be read.
+   def filesIn(dir:File) = Option(dir.listFiles).getOrElse(Array.empty[File])
+
+   filesIn(logDirectory).foreach { x =>
+     if (x.getName.endsWith(LOG_SUFFIX)) {
+       x.delete()
+     }
+   }
+   filesIn(directory).foreach { x =>
+     if (x.getName.endsWith(INDEX_SUFFIX)) {
+       x.recursiveDelete
+     }
+   }
+ }
+
+ /**
+  * Logs a LOG_ADD_COLLECTION record, stores the collection record in the
+  * index, and starts fresh in-memory metadata for it.
+  */
+ def addCollection(record: CollectionRecord.Buffer) = {
+   val collectionId = record.getKey
+   val indexKey = encodeLongKey(COLLECTION_PREFIX, collectionId)
+   val encoded = record.toUnframedBuffer
+   retryUsingIndex {
+     log.appender { appender =>
+       appender.append(LOG_ADD_COLLECTION, encoded)
+       index.put(indexKey, encoded.toByteArray)
+     }
+   }
+   collectionMeta.put(collectionId, new CollectionMeta)
+ }
+
+ /** The record log's current append limit. */
+ def getLogAppendPosition = {
+   log.appender_limit
+ }
+
+ /** Lists every collection stored in the index as (collection key, record) pairs. */
+ def listCollections: Seq[(Long, CollectionRecord.Buffer)] = {
+   val collections = ListBuffer[(Long, CollectionRecord.Buffer)]()
+   retryUsingIndex {
+     val options = new ReadOptions
+     options.verifyChecksums(verifyChecksums)
+     options.fillCache(false)
+     index.cursorPrefixed(COLLECTION_PREFIX_ARRAY, options) { (rawKey, rawValue) =>
+       val collectionId = decodeLongKey(rawKey)._2
+       collections.append((collectionId, CollectionRecord.FACTORY.parseUnframed(rawValue)))
+       true // keep cursoring
+     }
+   }
+   collections
+ }
+
+ /**
+  * Removes a collection.
+  *
+  * It drops the in-memory metadata, logs LOG_REMOVE_COLLECTION with the
+  * var-long encoded key, and deletes every entry (releasing its log reference)
+  * and then the collection record from the index.
+  */
+ def removeCollection(collectionKey: Long) = {
+   val collectionIndexKey = encodeLongKey(COLLECTION_PREFIX, collectionKey)
+   val loggedKey = encodeVLong(collectionKey)
+   val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
+   collectionMeta.remove(collectionKey)
+   retryUsingIndex {
+     log.appender { appender =>
+       appender.append(LOG_REMOVE_COLLECTION, new Buffer(loggedKey))
+     }
+
+     val options = new ReadOptions
+     options.fillCache(false)
+     options.verifyChecksums(verifyChecksums)
+     index.cursorPrefixed(entryKeyPrefix, options) { (entryKey, entryValue) =>
+       val entry = decodeEntryRecord(entryValue)
+       if ( entry.hasValueLocation ) {
+         logRefDecrement(entry.getValueLocation)
+       }
+       index.delete(entryKey)
+       true
+     }
+     index.delete(collectionIndexKey)
+   }
+ }
+
+ /**
+  * Empties a collection but keeps the collection itself.
+  *
+  * It resets the in-memory metadata. If the collection record is still in the
+  * index, it logs a remove followed by a re-add of that record and deletes
+  * every entry, releasing its log reference.
+  */
+ def collectionEmpty(collectionKey: Long) = {
+   val collectionIndexKey = encodeLongKey(COLLECTION_PREFIX, collectionKey)
+   val loggedKey = encodeVLong(collectionKey)
+   val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
+
+   val meta = collectionMeta.getOrElseUpdate(collectionKey, new CollectionMeta)
+   meta.size = 0
+   meta.last_key = null
+
+   retryUsingIndex {
+     for (collectionData <- index.get(collectionIndexKey)) {
+       log.appender { appender =>
+         appender.append(LOG_REMOVE_COLLECTION, new Buffer(loggedKey))
+         appender.append(LOG_ADD_COLLECTION, new Buffer(collectionData))
+       }
+
+       val options = new ReadOptions
+       options.fillCache(false)
+       options.verifyChecksums(verifyChecksums)
+       index.cursorPrefixed(entryKeyPrefix, options) { (entryKey, entryValue) =>
+         val entry = decodeEntryRecord(entryValue)
+         if ( entry.hasValueLocation ) {
+           logRefDecrement(entry.getValueLocation)
+         }
+         index.delete(entryKey)
+         true
+       }
+     }
+   }
+ }
+
+ /**
+  * Walks queue entries from sequence `seq` onward, loading each message and
+  * tagging its id with the entry and data locators before handing it to `func`.
+  * `func` returns false to stop.
+  */
+ def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
+   collectionCursor(collectionKey, encodeLong(seq)) { (entryKey, entry) =>
+     val entrySeq = decodeLong(entryKey)
+     val locator = (entry.getValueLocation, entry.getValueLength)
+     val msg = getMessage(locator)
+     msg.getMessageId().setEntryLocator((collectionKey, entrySeq))
+     msg.getMessageId().setDataLocator(locator)
+     func(msg)
+   }
+ }
+
+ /** Returns the stored ack position of subscription `subKey`, or 0 when none was stored. */
+ def getAckPosition(subKey: Long): Long = {
+   retryUsingIndex {
+     val stored = index.get(encodeEntryKey(ENTRY_PREFIX, subKey, ACK_POSITION))
+     stored match {
+       case Some(value) => decodeEntryRecord(value).getValueLocation()
+       case None => 0L
+     }
+   }
+ }
+
+ /**
+  * Decodes a message from its locator.
+  *
+  * The locator is either an in-memory MessageRecord or a (log position,
+  * length) pair. The encoded bytes are snappy-uncompressed when the store
+  * compresses logs. Returns null when the log read yields nothing.
+  */
+ def getMessage(locator:AnyRef):Message = {
+   assert(locator!=null)
+   val encoded = locator match {
+     case record:MessageRecord =>
+       // Encoded form is still in memory..
+       Some(record.data)
+     case (pos:Long, len:Int) =>
+       // Load the encoded form from disk.
+       log.read(pos, len).map(new Buffer(_))
+   }
+
+   val decoded = encoded.map { raw =>
+     val data = if( store.snappyCompressLogs ) Snappy.uncompress(raw) else raw
+     store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
+   }
+   decoded.getOrElse(null)
+ }
+
+
+ /**
+  * Iterates a collection's entries from `cursorPosition` up to (excluding) the
+  * first key of the next collection.
+  *
+  * Each entry key, with its 9 byte prefix/collection header stripped, is
+  * passed to `func` with the parsed record. `func` returns false to stop.
+  */
+ def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
+   val from = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
+   val upperBound = encodeLongKey(ENTRY_PREFIX, collectionKey+1)
+   val options = new ReadOptions
+   options.fillCache(true)
+   options.verifyChecksums(verifyChecksums)
+   retryUsingIndex {
+     index.cursorRange(from, upperBound, options) { (key, value) =>
+       func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))
+     }
+   }
+ }
+
+ /** Tracked entry count of the collection, or 0 when it has no metadata. */
+ def collectionSize(collectionKey: Long) = {
+   collectionMeta.get(collectionKey) match {
+     case Some(meta) => meta.size
+     case None => 0L
+   }
+ }
+
+ /** True when the index holds no entry key for the collection; stops at the first entry found. */
+ def collectionIsEmpty(collectionKey: Long) = {
+   val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
+   var foundEntry = false
+   retryUsingIndex {
+     val options = new ReadOptions
+     options.fillCache(false)
+     options.verifyChecksums(verifyChecksums)
+     index.cursorKeysPrefixed(entryKeyPrefix, options) { _ =>
+       foundEntry = true
+       false
+     }
+   }
+   !foundEntry
+ }
+
+ // store(): accumulated System.nanoTime() spent appending message data to the log.
+ val max_write_message_latency = TimeMetric()
+ // store(): accumulated System.nanoTime() spent logging and indexing enqueued entries.
+ val max_write_enqueue_latency = TimeMetric()
+
+ // Passed to index.write in store(); wraps the LevelDB batch write.
+ val max_index_write_latency = TimeMetric()
+
+ /**
+  * Persists a batch of units of work.
+  *
+  * Message data and entry add/remove and subscription-ack records are appended
+  * to the log. The matching index updates go into one LevelDB write batch. The
+  * appender is forced when any unit of work needs a sync and `sync` is
+  * enabled. Afterwards each message id receives its data locator.
+  */
+ def store(uows: Array[DelayableUOW]): Unit = {
+   retryUsingIndex {
+     log.appender { appender =>
+
+       var syncNeeded = false
+       index.write(new WriteOptions, max_index_write_latency) { batch =>
+
+         var write_message_total = 0L
+         var write_enqueue_total = 0L
+
+         uows.foreach { uow =>
+
+           uow.actions.foreach { case (msg, action) =>
+             val messageRecord = action.messageRecord
+             var log_info:LogInfo = null
+             var pos = -1L
+             var dataLocator:(Long, Int) = null
+
+             // Append the message body once, if it has not been logged yet.
+             if (messageRecord != null && messageRecord.locator==null) {
+               val start = System.nanoTime()
+               val p = appender.append(LOG_DATA, messageRecord.data)
+               pos = p._1
+               log_info = p._2
+               dataLocator = (pos, messageRecord.data.length)
+               messageRecord.locator = dataLocator
+               write_message_total += System.nanoTime() - start
+             }
+
+             action.dequeues.foreach { entry =>
+               val keyLocation = entry.id.getEntryLocator.asInstanceOf[(Long, Long)]
+               val key = encodeEntryKey(ENTRY_PREFIX, keyLocation._1, keyLocation._2)
+
+               if( dataLocator==null ) {
+                 dataLocator = entry.id.getDataLocator match {
+                   case x:(Long, Int) => x
+                   case x:MessageRecord => x.locator
+                   case _ => throw new RuntimeException("Unexpected locator type")
+                 }
+               }
+
+               val log_record = new EntryRecord.Bean()
+               log_record.setCollectionKey(entry.queueKey)
+               log_record.setEntryKey(new Buffer(key, 9, 8))
+               log_record.setValueLocation(dataLocator._1)
+               appender.append(LOG_REMOVE_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+               batch.delete(key)
+               logRefDecrement(dataLocator._1)
+               collectionDecrementSize(entry.queueKey)
+             }
+
+             action.enqueues.foreach { entry =>
+
+               if(dataLocator ==null ) {
+                 dataLocator = entry.id.getDataLocator match {
+                   case x:(Long, Int) => x
+                   case x:MessageRecord => x.locator
+                   case _ =>
+                     throw new RuntimeException("Unexpected locator type")
+                 }
+               }
+
+               val start = System.nanoTime()
+
+               val key = encodeEntryKey(ENTRY_PREFIX, entry.queueKey, entry.queueSeq)
+
+               assert(entry.id.getDataLocator()!=null)
+
+               // Log the entry exactly once. Log replay re-applies the index put,
+               // the log reference and the collection size from each
+               // LOG_ADD_ENTRY record, so a duplicate record would double them.
+               val log_record = new EntryRecord.Bean()
+               log_record.setCollectionKey(entry.queueKey)
+               log_record.setEntryKey(new Buffer(key, 9, 8))
+               log_record.setValueLocation(dataLocator._1)
+               log_record.setValueLength(dataLocator._2)
+               appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+               val index_record = new EntryRecord.Bean()
+               index_record.setValueLocation(dataLocator._1)
+               index_record.setValueLength(dataLocator._2)
+               batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
+
+               Option(log_info).orElse(log.log_info(dataLocator._1)).foreach { logInfo =>
+                 logRefs.getOrElseUpdate(logInfo.position, new LongCounter()).incrementAndGet()
+               }
+
+               collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
+               write_enqueue_total += System.nanoTime() - start
+             }
+
+           }
+           uow.subAcks.foreach { entry =>
+             val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
+             val log_record = new EntryRecord.Bean()
+             log_record.setCollectionKey(entry.subKey)
+             log_record.setEntryKey(ACK_POSITION)
+             log_record.setValueLocation(entry.ackPosition)
+             appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+
+             // The index stores only the ack position (read by getAckPosition),
+             // the same shape log replay writes, not the full log record.
+             val index_record = new EntryRecord.Bean()
+             index_record.setValueLocation(entry.ackPosition)
+             batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
+           }
+
+           if( !syncNeeded && uow.syncNeeded ) {
+             syncNeeded = true
+           }
+         }
+
+         max_write_message_latency.add(write_message_total)
+         max_write_enqueue_latency.add(write_enqueue_total)
+       }
+       if( syncNeeded && sync ) {
+         appender.force
+       }
+     } // end of log.appender { block }
+
+     // now that data is logged.. locate message from the data in the logs
+     uows.foreach { uow =>
+       uow.actions.foreach { case (msg, action) =>
+         val messageRecord = action.messageRecord
+         if (messageRecord != null) {
+           messageRecord.id.setDataLocator(messageRecord.locator)
+         }
+       }
+     }
+   }
+ }
+
+ def getCollectionEntries(collectionKey: Long, firstSeq:Long, lastSeq:Long): Seq[(Buffer, EntryRecord.Buffer)] = {
+ var rc = ListBuffer[(Buffer, EntryRecord.Buffer)]()
+ val ro = new ReadOptions
+ ro.verifyChecksums(verifyChecksums)
+ ro.fillCache(true)
+ retryUsingIndex {
+ index.snapshot { snapshot =>
+ ro.snapshot(snapshot)
+ val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, firstSeq)
+ val end = encodeEntryKey(ENTRY_PREFIX, collectionKey, lastSeq+1)
+ index.cursorRange( start, end, ro ) { (key, value) =>
+ val (_, _, seq) = decodeEntryKey(key)
+ rc.append((seq, EntryRecord.FACTORY.parseUnframed(value)))
+ true
+ }
+ }
+ }
+ rc
+ }
+
+ def getLastQueueEntrySeq(collectionKey: Long): Long = {
+ getLastCollectionEntryKey(collectionKey).map(_.bigEndianEditor().readLong()).getOrElse(0L)
+ }
+
+ def getLastCollectionEntryKey(collectionKey: Long): Option[Buffer] = {
+ collectionMeta.get(collectionKey).flatMap(x=> Option(x.last_key)).map(new Buffer(_))
+ }
+
+ def gc(topicPositions:Seq[(Long, Long)]):Unit = {
+
+ // Delete message refs for topics who's consumers have advanced..
+ if( !topicPositions.isEmpty ) {
+ retryUsingIndex {
+ index.write(new WriteOptions, max_index_write_latency) { batch =>
+ for( (topic, first) <- topicPositions ) {
+ val ro = new ReadOptions
+ ro.fillCache(true)
+ ro.verifyChecksums(verifyChecksums)
+ val start = encodeEntryKey(ENTRY_PREFIX, topic, 0)
+ val end = encodeEntryKey(ENTRY_PREFIX, topic, first)
+ index.cursorRange(start, end, ro) { case (key, value) =>
+ val entry = EntryRecord.FACTORY.parseUnframed(value)
+ batch.delete(key)
+ logRefDecrement(entry.getValueLocation)
+ true
+ }
+ }
+ }
+ }
+ }
+
+ import collection.JavaConversions._
+ lastIndexSnapshotPos
+ val emptyJournals = log.log_infos.keySet.toSet -- logRefs.keySet
+
+ // We don't want to delete any journals that the index has not snapshot'ed or
+ // the the
+ val deleteLimit = log.log_info(lastIndexSnapshotPos).map(_.position).
+ getOrElse(lastIndexSnapshotPos).min(log.appender_start)
+
+ emptyJournals.foreach { id =>
+ if ( id < deleteLimit ) {
+ log.delete(id)
+ }
+ }
+ }
+
+}
Propchange: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.broker.BrokerService
+import org.apache.activemq.broker.BrokerServiceAware
+import org.apache.activemq.broker.ConnectionContext
+import org.apache.activemq.command._
+import org.apache.activemq.openwire.OpenWireFormat
+import org.apache.activemq.usage.SystemUsage
+import java.io.File
+import java.io.IOException
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.Future
+import java.util.concurrent.atomic.AtomicLong
+import reflect.BeanProperty
+import org.apache.activemq.store._
+import java.util._
+import scala.collection.mutable.ListBuffer
+import javax.management.ObjectName
+import org.apache.activemq.broker.jmx.AnnotatedMBean
+import org.apache.activemq.util._
+import org.apache.kahadb.util.LockFile
+import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log}
+
object LevelDBStore extends Log {

  /** A future that is already completed; handed back when work is deferred to a transaction commit. */
  val DONE = new CountDownFuture()
  DONE.countDown

  /**
   * Converts a failure into an IOException.
   *
   * An IOException is returned unchanged, as is an IOException found as the
   * cause of an ExecutionException. Anything else is wrapped with
   * IOExceptionSupport.create.
   */
  def toIOException(e: Throwable): IOException = e match {
    case io: IOException => io
    case ee: ExecutionException =>
      ee.getCause match {
        case cause: IOException => cause
        case _                  => IOExceptionSupport.create(e)
      }
    case _ => IOExceptionSupport.create(e)
  }

  /**
   * Blocks until `future` completes, rethrowing any non-fatal failure as an
   * IOException (see toIOException).
   *
   * When the waiting thread is interrupted, its interrupt status is restored
   * and an InterruptedIOException is thrown. Fatal errors propagate unwrapped.
   */
  def waitOn(future: Future[AnyRef]): Unit = {
    import java.io.InterruptedIOException
    import scala.util.control.NonFatal
    try {
      future.get
    } catch {
      case e: InterruptedException =>
        // Keep the interrupt visible to code further up the stack.
        Thread.currentThread().interrupt()
        val ioe = new InterruptedIOException("Interrupted while waiting for a store operation to complete")
        ioe.initCause(e)
        throw ioe
      case NonFatal(e) =>
        throw toIOException(e)
    }
  }
}
+
/**
 * A durable topic subscription known to the store.
 *
 * @param subKey   key of the subscription (presumably its own collection key; verify in DBManager)
 * @param topicKey key of the topic collection the subscription belongs to
 * @param info     the broker's description of the subscription
 */
case class DurableSubscription(subKey: Long, topicKey: Long, info: SubscriptionInfo) {
  /** Sequence of the last topic entry this subscription acknowledged. */
  var lastAckPosition: Long = 0L
  /** Sequence from which the next recovery pass resumes. */
  var cursorPosition: Long = 0L
}
+
/**
 * JMX view of a [[LevelDBStore]], registered by the store when it starts.
 *
 * Reports the store configuration, its unit of work counters and the maximum
 * latencies tracked by the store's DBManager, client and log.
 */
class LevelDBStoreView(val store: LevelDBStore) extends LevelDBStoreViewMBean {
  import store._

  // -- configuration ---------------------------------------------------------

  def getIndexDirectory: String = directory.getCanonicalPath

  // Log files live in the index directory unless a log directory was configured.
  def getLogDirectory: String = {
    val dir = if (logDirectory == null) directory else logDirectory
    dir.getCanonicalPath
  }

  def getLogSize: Long = logSize
  def getIndexFactory: String = db.client.factory.getClass.getName
  def getSync: Boolean = sync
  def getVerifyChecksums: Boolean = verifyChecksums
  def getIndexMaxOpenFiles: Int = indexMaxOpenFiles
  def getIndexBlockRestartInterval: Int = indexBlockRestartInterval
  def getParanoidChecks: Boolean = paranoidChecks
  def getIndexWriteBufferSize: Int = indexWriteBufferSize
  def getIndexBlockSize: Int = indexBlockSize
  def getIndexCompression: String = indexCompression
  def getIndexCacheSize: Long = indexCacheSize
  def getAsyncBufferSize: Int = asyncBufferSize

  // -- unit of work counters -------------------------------------------------

  def getUowClosedCounter: Long = db.uowClosedCounter
  def getUowCanceledCounter: Long = db.uowCanceledCounter
  def getUowStoringCounter: Long = db.uowStoringCounter
  def getUowStoredCounter: Long = db.uowStoredCounter

  // -- latency metrics -------------------------------------------------------

  def getUowMaxCompleteLatency: Double = db.uow_complete_latency.get
  def getMaxIndexWriteLatency: Double = db.client.max_index_write_latency.get
  def getMaxLogWriteLatency: Double = db.client.log.max_log_write_latency.get
  def getMaxLogFlushLatency: Double = db.client.log.max_log_flush_latency.get
  def getMaxLogRotateLatency: Double = db.client.log.max_log_rotate_latency.get

  def resetUowMaxCompleteLatency: Double = db.uow_complete_latency.reset
  def resetMaxIndexWriteLatency: Double = db.client.max_index_write_latency.reset
  def resetMaxLogWriteLatency: Double = db.client.log.max_log_write_latency.reset
  def resetMaxLogFlushLatency: Double = db.client.log.max_log_flush_latency.reset
  def resetMaxLogRotateLatency: Double = db.client.log.max_log_rotate_latency.reset

  // -- index internals -------------------------------------------------------

  def getIndexStats: String = db.client.index.getProperty("leveldb.stats")
}
+
+import LevelDBStore._
+
/**
 * ActiveMQ persistence adapter that keeps messages in a LevelDB index plus
 * separate log files, both managed through [[DBManager]].
 *
 * The store is also its own [[TransactionStore]]: operations issued inside a
 * transaction are buffered in memory and replayed within a single unit of
 * work when the transaction commits. XA (prepared) transactions are not
 * supported.
 */
class LevelDBStore extends ServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {

  import scala.util.control.NonFatal

  final val wireFormat = new OpenWireFormat
  final val db = new DBManager(this)

  // ---------------------------------------------------------------------------
  // Configuration (exposed as Java bean properties)
  // ---------------------------------------------------------------------------

  /** Directory holding the store index data (and the log files unless `logDirectory` is set). */
  @BeanProperty
  var directory: File = null
  /** Optional separate directory for the store log data. */
  @BeanProperty
  var logDirectory: File = null

  /** The size the log files are allowed to grow to. */
  @BeanProperty
  var logSize: Long = 1024 * 1024 * 100
  /** Comma separated list of LevelDB index factory classes (presumably tried in order). */
  @BeanProperty
  var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory"
  /** Are writes synced to disk. */
  @BeanProperty
  var sync: Boolean = true
  /** Is data verified against checksums as it's loaded back from disk. */
  @BeanProperty
  var verifyChecksums: Boolean = false
  @BeanProperty
  var indexMaxOpenFiles: Int = 1000
  @BeanProperty
  var indexBlockRestartInterval: Int = 16
  @BeanProperty
  var paranoidChecks: Boolean = false
  @BeanProperty
  var indexWriteBufferSize: Int = 1024*1024*6
  @BeanProperty
  var indexBlockSize: Int = 4 * 1024
  @BeanProperty
  var indexCompression: String = "snappy"
  /** "snappy" (any case) compresses the logs when Snappy is available; any other value disables it. */
  @BeanProperty
  var logCompression: String = "none"
  @BeanProperty
  var indexCacheSize: Long = 1024 * 1024 * 256L
  @BeanProperty
  var flushDelay: Int = 1000*5
  @BeanProperty
  var asyncBufferSize: Int = 1024*1024*4
  @BeanProperty
  var monitorStats: Boolean = false
  /** Fail start-up immediately when the lock is held instead of retrying the lock acquisition. */
  @BeanProperty
  var failIfLocked: Boolean = false

  /** Set by deleteAllMessages; the next start purges the store and clears the flag. */
  var purgeOnStatup: Boolean = false
  var brokerService: BrokerService = null

  // Open message stores. Within this class they are accessed while holding `this`.
  val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]()
  val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
  val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()

  override def toString: String = {
    // Must not fail on an unconfigured store (e.g. when it is being logged).
    "LevelDB:[" + Option(directory).map(_.getAbsolutePath).orNull + "]"
  }

  /** JMX name of the store's view, scoped to the owning broker's name. */
  def objectName = {
    val brokerON = brokerService.getBrokerObjectName
    val broker_name = brokerON.getKeyPropertyList().get("BrokerName")
    new ObjectName(brokerON.getDomain() + ":" +
      "BrokerName=" + JMXSupport.encodeObjectNamePart(broker_name) + "," +
      "Type=LevelDBStore")
  }

  /** Runs `func` through RetrySupport.retry, which is given `isStarted`. */
  def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _)

  var lock_file: LockFile = _

  var snappyCompressLogs = false

  def doStart: Unit = {
    import FileSupport._

    snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
    debug("starting")
    if (lock_file == null) {
      lock_file = new LockFile(directory / "lock", true)
    }

    // Expose a JMX bean describing the store; failing to register it is only logged.
    if (brokerService != null) {
      try {
        AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreView(this), objectName)
      } catch {
        case NonFatal(e) =>
          warn(e, "LevelDB Store could not be registered in JMX: " + e.getMessage)
      }
    }

    if (failIfLocked) {
      lock_file.lock()
    } else {
      retry {
        lock_file.lock()
      }
    }

    if (purgeOnStatup) {
      purgeOnStatup = false
      db.client.locked_purge
      info("Purged: " + this)
    }

    db.start
    db.loadCollections
    debug("started")
  }

  def doStop(stopper: ServiceStopper): Unit = {
    // Release every resource even when an earlier step fails. lock_file is only
    // created by doStart, so it may still be null here.
    try {
      db.stop
    } finally {
      try {
        if (lock_file != null) {
          lock_file.unlock()
        }
      } finally {
        if (brokerService != null) {
          brokerService.getManagementContext().unregisterMBean(objectName)
        }
      }
    }
    info("Stopped " + this)
  }

  def setBrokerService(brokerService: BrokerService): Unit = {
    this.brokerService = brokerService
  }

  /** Ignored by this store. */
  def setBrokerName(brokerName: String): Unit = {
  }

  /** Ignored by this store. */
  def setUsageManager(usageManager: SystemUsage): Unit = {
  }

  /** Schedules a purge of all stored data for the next start of the store. */
  def deleteAllMessages: Unit = {
    purgeOnStatup = true
  }

  def getLastMessageBrokerSequenceId: Long = 0

  def createTransactionStore = this

  // ---------------------------------------------------------------------------
  // Transactions
  // ---------------------------------------------------------------------------

  /** Open transactions keyed by id; always accessed while holding the map's own monitor. */
  val transactions = collection.mutable.HashMap[TransactionId, Transaction]()

  /** A store operation recorded in a transaction and replayed into a unit of work on commit. */
  trait TransactionAction {
    def apply(uow:DelayableUOW):Unit
  }

  /** The operations recorded so far for one transaction, in the order they were issued. */
  case class Transaction(id:TransactionId) {
    val commitActions = ListBuffer[TransactionAction]()
    def add(store:LevelDBMessageStore, message: Message, delay:Boolean) = {
      commitActions += new TransactionAction() {
        def apply(uow:DelayableUOW) = {
          store.doAdd(uow, message, delay)
        }
      }
    }
    def remove(store:LevelDBMessageStore, msgid:MessageId) = {
      commitActions += new TransactionAction() {
        def apply(uow:DelayableUOW) = {
          store.doRemove(uow, msgid)
        }
      }
    }
    def updateAckPosition(store:LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
      commitActions += new TransactionAction() {
        def apply(uow:DelayableUOW) = {
          store.doUpdateAckPosition(uow, sub, position)
        }
      }
    }
  }

  /** Returns the transaction recorded for `txid`, creating it on first use. */
  def transaction(txid: TransactionId) = transactions.synchronized {
    transactions.getOrElseUpdate(txid, Transaction(txid))
  }

  /**
   * Replays the transaction's recorded actions in one unit of work.
   *
   * `preCommit` runs first. `postCommit` runs once the unit of work completes,
   * or immediately when nothing was recorded for the transaction.
   */
  def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable): Unit = {
    preCommit.run()
    transactions.synchronized(transactions.remove(txid)) match {
      case None =>
        // Nothing was recorded here (e.g. the transaction did no work in this store).
        debug("The transaction does not exist")
        postCommit.run()
      case Some(tx) =>
        withUow { uow =>
          for (action <- tx.commitActions) {
            action(uow)
          }
          uow.addCompleteListener(postCommit.run())
        }
    }
  }

  /** Discards the transaction's recorded actions; they are only applied on commit. */
  def rollback(txid: TransactionId): Unit = {
    if (transactions.synchronized(transactions.remove(txid)).isEmpty) {
      debug("The transaction does not exist")
    }
  }

  def prepare(tx: TransactionId): Unit = {
    sys.error("XA transactions not yet supported.")
  }

  /** XA transactions are not supported, so there is never anything to recover. */
  def recover(listener: TransactionRecoveryListener): Unit = {
  }

  // ---------------------------------------------------------------------------
  // Message store lifecycle
  // ---------------------------------------------------------------------------

  def createQueueMessageStore(destination: ActiveMQQueue) = {
    this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
  }

  def createQueueMessageStore(destination: ActiveMQQueue, key: Long): LevelDBMessageStore = {
    val rc = new LevelDBMessageStore(destination, key)
    this.synchronized {
      queues.put(destination, rc)
    }
    rc
  }

  def removeQueueMessageStore(destination: ActiveMQQueue): Unit = this.synchronized {
    queues.remove(destination).foreach { store =>
      db.destroyQueueStore(store.key)
    }
  }

  def createTopicMessageStore(destination: ActiveMQTopic): TopicMessageStore = {
    this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
  }

  def createTopicMessageStore(destination: ActiveMQTopic, key: Long): LevelDBTopicMessageStore = {
    val rc = new LevelDBTopicMessageStore(destination, key)
    this.synchronized {
      topics.put(destination, rc)
      topicsById.put(key, rc)
    }
    rc
  }

  /**
   * Removes a topic store, its durable subscriptions and its stored entries.
   * The store is also dropped from `topicsById` so GC and subscription
   * restoration no longer see it.
   */
  def removeTopicMessageStore(destination: ActiveMQTopic): Unit = {
    val removed = this.synchronized {
      val store = topics.remove(destination)
      store.foreach(s => topicsById.remove(s.key))
      store
    }
    removed.foreach { store =>
      val subs = store.subscriptions.synchronized {
        val current = store.subscriptions.values.toList
        store.subscriptions.clear()
        current
      }
      subs.foreach(sub => db.removeSubscription(sub))
      db.destroyQueueStore(store.key)
    }
  }

  def getLogAppendPosition = db.getLogAppendPosition

  /** Snapshot of all open topic and queue destinations. */
  def getDestinations: Set[ActiveMQDestination] = this.synchronized {
    val rc = new HashSet[ActiveMQDestination]
    topics.keys.foreach(d => rc.add(d))
    queues.keys.foreach(d => rc.add(d))
    rc
  }

  def getLastProducerSequenceId(id: ProducerId): Long = -1

  def size: Long = 0

  def checkpoint(sync: Boolean): Unit = db.checkpoint(sync)

  /** Runs `func` with a fresh unit of work, releasing it afterwards even if `func` throws. */
  def withUow[T](func:(DelayableUOW)=>T):T = {
    val uow = db.createUow
    try {
      func(uow)
    } finally {
      uow.release()
    }
  }

  /**
   * Message store for one destination, backed by the collection `key`.
   * New entries get sequences after the last sequence already in the collection.
   */
  case class LevelDBMessageStore(dest: ActiveMQDestination, val key: Long) extends AbstractMessageStore(dest) {

    protected val lastSeq: AtomicLong = new AtomicLong(0)
    protected var cursorPosition: Long = 0

    lastSeq.set(db.getLastQueueEntrySeq(key))

    def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture = {
      uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
    }

    override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
    override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
      if (message.getTransactionId != null) {
        // Deferred until the transaction commits.
        transaction(message.getTransactionId).add(this, message, delay)
        DONE
      } else {
        withUow { uow =>
          doAdd(uow, message, delay)
        }
      }
    }

    override def addMessage(context: ConnectionContext, message: Message) = addMessage(context, message, false)
    override def addMessage(context: ConnectionContext, message: Message, delay: Boolean): Unit = {
      waitOn(asyncAddQueueMessage(context, message, delay))
    }

    def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture = {
      uow.dequeue(key, id)
    }

    override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit = {
      if (ack.getTransactionId != null) {
        // Deferred until the transaction commits.
        transaction(ack.getTransactionId).remove(this, ack.getLastMessageId)
      } else {
        waitOn(withUow { uow =>
          doRemove(uow, ack.getLastMessageId)
        })
      }
    }

    def removeMessage(context: ConnectionContext, ack: MessageAck): Unit = {
      removeAsyncMessage(context, ack)
    }

    def getMessage(id: MessageId): Message = {
      val message: Message = db.getMessage(id)
      if (message == null) {
        throw new IOException("Message id not found: " + id)
      }
      message
    }

    def removeAllMessages(context: ConnectionContext): Unit = {
      db.collectionEmpty(key)
      cursorPosition = 0
    }

    def getMessageCount: Int = db.collectionSize(key).toInt

    override def isEmpty: Boolean = db.collectionIsEmpty(key)

    def recover(listener: MessageRecoveryListener): Unit = {
      cursorPosition = db.cursorMessages(key, listener, 0)
    }

    def resetBatching: Unit = {
      cursorPosition = 0
    }

    def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
      cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), cursorPosition)
    }

    override def setBatch(id: MessageId): Unit = {
      cursorPosition = db.queuePosition(id)
    }

  }

  /**
   * Forwards to `listener` but reports no more space once `max` messages or
   * message references have been recovered through it.
   */
  case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
    private var recovered: Int = 0
    def hasSpace = recovered < max && listener.hasSpace
    def recoverMessage(message: Message) = {
      recovered += 1
      listener.recoverMessage(message)
    }
    def recoverMessageReference(ref: MessageId) = {
      recovered += 1
      listener.recoverMessageReference(ref)
    }
    def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
  }

  /**
   * Restores an existing durable subscription while the store is first
   * loading. If the subscription's topic is unknown, the subscription is
   * removed from the database instead.
   */
  def createSubscription(sub:DurableSubscription) = {
    this.synchronized(topicsById.get(sub.topicKey)) match {
      case Some(topic) =>
        // Lock the same monitor the topic store's subscription accessors use.
        topic.subscriptions.synchronized {
          topic.subscriptions.put((sub.info.getClientId, sub.info.getSubcriptionName), sub)
        }
      case None =>
        // Topic does not exist.. so kill the durable sub..
        db.removeSubscription(sub)
    }
  }

  /** Collects, from every open topic store, the GC boundaries that moved (see gcPosition). */
  def getTopicGCPositions: Seq[(Long, Long)] = {
    val snapshot = this.synchronized { topicsById.values.toList }
    snapshot.flatMap(_.gcPosition)
  }

  /** Topic message store that additionally tracks durable subscriptions and their ack positions. */
  class LevelDBTopicMessageStore(dest: ActiveMQDestination, key: Long) extends LevelDBMessageStore(dest, key) with TopicMessageStore {
    /** Durable subscriptions keyed by (clientId, subscriptionName); guarded by the map's own monitor. */
    val subscriptions = collection.mutable.HashMap[(String, String), DurableSubscription]()
    /** First sequence still retained, as last reported by gcPosition. */
    var firstSeq = 0L

    /**
     * Computes the first sequence still needed by this topic: one past the
     * lowest subscription ack position, or one past the last sequence when
     * there are no subscriptions. Returns `Some((key, firstSeq))` only when
     * that value changed since the previous call, and remembers it.
     */
    def gcPosition:Option[(Long, Long)] = {
      var pos = lastSeq.get()
      subscriptions.synchronized {
        subscriptions.values.foreach { sub =>
          if (sub.lastAckPosition < pos) {
            pos = sub.lastAckPosition
          }
        }
        if (firstSeq != pos + 1) {
          firstSeq = pos + 1
          Some((key, firstSeq))
        } else {
          None
        }
      }
    }

    def addSubsciption(info: SubscriptionInfo, retroactive: Boolean) = {
      val sub = db.addSubscription(key, info)
      subscriptions.synchronized {
        subscriptions.put((info.getClientId, info.getSubcriptionName), sub)
      }
      // A retroactive subscription starts from the beginning of the topic;
      // otherwise it starts after the current last sequence.
      sub.lastAckPosition = if (retroactive) 0 else lastSeq.get()
      waitOn(withUow { uow =>
        uow.updateAckPosition(sub)
        uow.countDownFuture
      })
    }

    def getAllSubscriptions: Array[SubscriptionInfo] = subscriptions.synchronized {
      subscriptions.values.map(_.info).toArray
    }

    def lookupSubscription(clientId: String, subscriptionName: String): SubscriptionInfo = subscriptions.synchronized {
      subscriptions.get((clientId, subscriptionName)).map(_.info).getOrElse(null)
    }

    def deleteSubscription(clientId: String, subscriptionName: String): Unit = {
      subscriptions.synchronized {
        subscriptions.remove((clientId, subscriptionName))
      }.foreach(db.removeSubscription(_))
    }

    private def lookup(clientId: String, subscriptionName: String): Option[DurableSubscription] = subscriptions.synchronized {
      subscriptions.get((clientId, subscriptionName))
    }

    def doUpdateAckPosition(uow: DelayableUOW, sub: DurableSubscription, position: Long) = {
      sub.lastAckPosition = position
      uow.updateAckPosition(sub)
    }

    def acknowledge(context: ConnectionContext, clientId: String, subscriptionName: String, messageId: MessageId, ack: MessageAck): Unit = {
      lookup(clientId, subscriptionName).foreach { sub =>
        val position = db.queuePosition(messageId)
        if (ack.getTransactionId != null) {
          // Deferred until the transaction commits.
          transaction(ack.getTransactionId).updateAckPosition(this, sub, position)
        } else {
          waitOn(withUow { uow =>
            doUpdateAckPosition(uow, sub, position)
            uow.countDownFuture
          })
        }
      }
    }

    def resetBatching(clientId: String, subscriptionName: String): Unit = {
      lookup(clientId, subscriptionName).foreach { sub =>
        sub.cursorPosition = 0
      }
    }

    def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = {
      lookup(clientId, subscriptionName).foreach { sub =>
        sub.cursorPosition = db.cursorMessages(key, listener, sub.cursorPosition.max(sub.lastAckPosition + 1))
      }
    }

    def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
      lookup(clientId, subscriptionName).foreach { sub =>
        sub.cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), sub.cursorPosition.max(sub.lastAckPosition + 1))
      }
    }

    def getMessageCount(clientId: String, subscriptionName: String): Int = {
      lookup(clientId, subscriptionName) match {
        case Some(sub) => (lastSeq.get - sub.lastAckPosition).toInt
        case None => 0
      }
    }

  }

  ///////////////////////////////////////////////////////////////////////////
  // These PersistenceAdapter methods demarcate batches of store operations,
  // not JMS transactions. Batching is handled by the DBManager, so they are
  // no-ops here.
  ///////////////////////////////////////////////////////////////////////////
  def beginTransaction(context: ConnectionContext): Unit = {}
  def commitTransaction(context: ConnectionContext): Unit = {}
  def rollbackTransaction(context: ConnectionContext): Unit = {}

  def createClient = new LevelDBClient(this)
}
Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java Tue Sep 25 14:32:28 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb;
+
+import org.apache.activemq.broker.jmx.MBeanInfo;
+
+import java.io.File;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface LevelDBStoreViewMBean {
+
+ @MBeanInfo("The directory holding the store index data.")
+ String getIndexDirectory();
+
+ @MBeanInfo("The directory holding the store log data.")
+ String getLogDirectory();
+
+ @MBeanInfo("The size the log files are allowed to grow to.")
+ long getLogSize();
+
+ @MBeanInfo("The implementation of the LevelDB index being used.")
+ String getIndexFactory();
+
+ @MBeanInfo("Are writes synced to disk.")
+ boolean getSync();
+
+ @MBeanInfo("Is data verified against checksums as it's loaded back from disk.")
+ boolean getVerifyChecksums();
+
+ @MBeanInfo("The maximum number of open files the index will open at one time.")
+ int getIndexMaxOpenFiles();
+
+ @MBeanInfo("Number of keys between restart points for delta encoding of keys in the index")
+ int getIndexBlockRestartInterval();
+
+ @MBeanInfo("Do aggressive checking of store data")
+ boolean getParanoidChecks();
+
+ @MBeanInfo("Amount of data to build up in memory for the index before converting to a sorted on-disk file.")
+ int getIndexWriteBufferSize();
+
+ @MBeanInfo("Approximate size of user data packed per block for the index")
+ int getIndexBlockSize();
+
+ @MBeanInfo("The type of compression to use for the index")
+ String getIndexCompression();
+
+ @MBeanInfo("The size of the cache index")
+ long getIndexCacheSize();
+
+ @MBeanInfo("The maximum amount of async writes to buffer up")
+ int getAsyncBufferSize();
+
+ @MBeanInfo("The number of units of work which have been closed.")
+ long getUowClosedCounter();
+ @MBeanInfo("The number of units of work which have been canceled.")
+ long getUowCanceledCounter();
+ @MBeanInfo("The number of units of work which started getting stored.")
+ long getUowStoringCounter();
+ @MBeanInfo("The number of units of work which completed getting stored")
+ long getUowStoredCounter();
+
+ @MBeanInfo("Gets and resets the maximum time (in ms) a unit of work took to complete.")
+ double resetUowMaxCompleteLatency();
+ @MBeanInfo("Gets and resets the maximum time (in ms) an index write batch took to execute.")
+ double resetMaxIndexWriteLatency();
+ @MBeanInfo("Gets and resets the maximum time (in ms) a log write took to execute (includes the index write latency).")
+ double resetMaxLogWriteLatency();
+ @MBeanInfo("Gets and resets the maximum time (in ms) a log flush took to execute.")
+ double resetMaxLogFlushLatency();
+ @MBeanInfo("Gets and resets the maximum time (in ms) a log rotation took to perform.")
+ double resetMaxLogRotateLatency();
+
+ @MBeanInfo("Gets the maximum time (in ms) a unit of work took to complete.")
+ double getUowMaxCompleteLatency();
+ @MBeanInfo("Gets the maximum time (in ms) an index write batch took to execute.")
+ double getMaxIndexWriteLatency();
+ @MBeanInfo("Gets the maximum time (in ms) a log write took to execute (includes the index write latency).")
+ double getMaxLogWriteLatency();
+ @MBeanInfo("Gets the maximum time (in ms) a log flush took to execute.")
+ double getMaxLogFlushLatency();
+ @MBeanInfo("Gets the maximum time (in ms) a log rotation took to perform.")
+ double getMaxLogRotateLatency();
+
+ @MBeanInfo("Gets the index statistics.")
+ String getIndexStats();
+}
Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,518 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import java.util.zip.CRC32
+import java.util.Map.Entry
+import java.util.concurrent.atomic.AtomicLong
+import java.io._
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import org.fusesource.hawtdispatch.BaseRetained
+import org.apache.activemq.leveldb.util.FileSupport._
+import org.apache.activemq.util.LRUCache
+import util.TimeMetric._
+import util.{TimeMetric, Log}
+import java.util.TreeMap
+
+object RecordLog extends Log {
+
+  // The log files contain a sequence of variable length log records:
+  //   record := header + data
+  //
+  //   header :=
+  //     '*'      : int8    // Start of Record Magic
+  //     kind     : int8    // Help identify content type of the data.
+  //     checksum : uint32  // CRC32 (java.util.zip.CRC32, not CRC32C) of the data[]
+  //     length   : uint32  // the length of the data
+  //
+  // Multi-byte header fields are read back big-endian. A unit of work is closed
+  // by a record of kind UOW_END_RECORD whose 8-byte data holds the log position
+  // at which that unit of work started.
+
+  /** Magic byte that starts every record header. */
+  val LOG_HEADER_PREFIX: Byte = '*'.toByte
+  /** Record kind marking the end of a unit of work; its data is the encoded start position. */
+  val UOW_END_RECORD: Byte = -1.toByte
+
+  /** Header size in bytes: prefix(1) + kind(1) + checksum(4) + length(4). */
+  val LOG_HEADER_SIZE = 10
+
+  /** An appender flushes its write buffer before a record would push it past this many bytes. */
+  val BUFFER_SIZE = 1024*512
+  /** Threshold for writing large records straight to the channel; the append path does not currently use it. */
+  val BYPASS_BUFFER_SIZE = 1024*16
+
+  /** A log file whose first byte sits at log `position` and which holds `length` bytes. */
+  case class LogInfo(file:File, position:Long, length:Long) {
+    /** Log position just past the end of this file's data. */
+    def limit = position+length
+  }
+
+  /** Encodes `a1` into an 8-byte buffer via DataByteArrayOutputStream.writeLong. */
+  def encode_long(a1:Long): Buffer = {
+    val out = new DataByteArrayOutputStream(8)
+    out.writeLong(a1)
+    out.toBuffer
+  }
+
+  /** Decodes a Long from the start of `value`; the inverse of encode_long. */
+  def decode_long(value:Buffer):Long = {
+    val in = new DataByteArrayInputStream(value)
+    in.readLong()
+  }
+
+}
+
+case class RecordLog(directory: File, logSuffix:String) {
+  import RecordLog._
+
+  directory.mkdirs()
+
+  /** Size each log file is preallocated to; the appender rotates once its offset reaches it. */
+  var logSize = 1024 * 1024 * 100L
+  var current_appender:LogAppender = _
+  /** When true, reads re-validate record headers and CRC32 checksums. */
+  var verify_checksums = false
+  /** When true, LogAppender.force also calls channel.force after flushing. */
+  var sync = false
+
+  /** Known log files keyed by their starting log position. Accessed under log_mutex. */
+  val log_infos = new TreeMap[Long, LogInfo]()
+
+  object log_mutex
+
+  /**
+   * Deletes the log file starting at log position `id`. The file backing the
+   * current appender is never deleted; unknown ids are ignored.
+   */
+  def delete(id:Long) = {
+    log_mutex.synchronized {
+      // We can't delete the current appender.
+      if( current_appender == null || current_appender.position != id ) {
+        Option(log_infos.get(id)).foreach { info =>
+          // Drop the cached reader first so its file handle is closed before the
+          // delete (an open handle keeps the space allocated / can block deletion).
+          evict_reader(info.file)
+          onDelete(info.file)
+          log_infos.remove(id)
+        }
+      }
+    }
+  }
+
+  protected def onDelete(file:File) = {
+    file.delete()
+  }
+
+  /** CRC32 of the buffer's bytes, as the Int stored in a record header. */
+  def checksum(data: Buffer): Int = {
+    val crc = new CRC32
+    crc.update(data.data, data.offset, data.length)
+    // getValue holds the unsigned 32-bit CRC in a Long; toInt keeps exactly those 32 bits.
+    crc.getValue.toInt
+  }
+
+  /**
+   * Read/write handle on the log file currently being appended to. Records are
+   * staged in write_buffer and written to the channel by flush; reads reaching
+   * past flushed_offset flush first (see check_read_flush).
+   */
+  class LogAppender(file:File, position:Long) extends LogReader(file, position) {
+
+    val info = new LogInfo(file, position, 0)
+
+    override def open = new RandomAccessFile(file, "rw")
+
+    override def dispose() = {
+      force
+      super.dispose()
+    }
+
+    /** Bytes appended to this file so far, buffered or not. */
+    var append_offset = 0L
+    /** Bytes of this file that have been written to the channel. */
+    val flushed_offset = new AtomicLong(0)
+
+    def append_position = {
+      position+append_offset
+    }
+
+    // set the file size ahead of time so that we don't have to sync the file
+    // meta-data on every log sync.
+    channel.position(logSize-1)
+    channel.write(new Buffer(1).toByteBuffer)
+    channel.force(true)
+    if( sync ) {
+      channel.position(0)
+    }
+
+    val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE)
+
+    /** Flushes buffered records and, when `sync` is set, forces the channel. */
+    def force = {
+      flush
+      if(sync) {
+        max_log_flush_latency {
+          // only need to update the file metadata if the file size changes..
+          channel.force(append_offset > logSize)
+        }
+      }
+    }
+
+    /**
+     * Buffers one record (header + data) and returns `(record_position, info)`,
+     * where record_position is the log position of the record's header. The
+     * record reaches the channel on the next flush.
+     */
+    def append(id:Byte, data: Buffer) = this.synchronized {
+      val record_position = append_position
+      val data_length = data.length
+      val total_length = LOG_HEADER_SIZE + data_length
+
+      // Flush first so a record is always written to the channel in one piece.
+      if( write_buffer.position() + total_length > BUFFER_SIZE ) {
+        flush
+      }
+
+      write_buffer.writeByte(LOG_HEADER_PREFIX)
+      write_buffer.writeByte(id)
+      write_buffer.writeInt(checksum(data))
+      write_buffer.writeInt(data_length)
+      write_buffer.write(data.data, data.offset, data_length)
+      append_offset += total_length
+      (record_position, info)
+    }
+
+    /** Writes any buffered records to the channel at their reserved file offsets. */
+    def flush = max_log_flush_latency { this.synchronized {
+      if( write_buffer.position() > 0 ) {
+        val buffer = write_buffer.toBuffer.toByteBuffer
+        val remaining = buffer.remaining
+        // The buffered bytes are the tail of what append_offset already counts.
+        var pos = append_offset-remaining
+        // A positional write may be partial, so keep going until the buffer drains.
+        while( buffer.hasRemaining ) {
+          val written = channel.write(buffer, pos)
+          if( written <= 0 ) {
+            throw new IOException("Short write")
+          }
+          pos += written
+        }
+        // Only advertise the bytes as flushed once all of them are in the channel.
+        flushed_offset.addAndGet(remaining)
+        write_buffer.reset()
+      } }
+    }
+
+    // Reads of data that is still buffered must flush it to the channel first.
+    override def check_read_flush(end_offset:Long) = {
+      if( flushed_offset.get() < end_offset ) {
+        flush
+      }
+    }
+
+  }
+
+  /**
+   * Reference-counted read handle on one log file whose first byte is at log
+   * position `position`. dispose() closes the file handle (BaseRetained presumably
+   * calls it when the last reference is released).
+   */
+  case class LogReader(file:File, position:Long) extends BaseRetained {
+
+    def open = new RandomAccessFile(file, "r")
+
+    val fd = open
+    val channel = fd.getChannel
+
+    override def dispose() {
+      fd.close()
+    }
+
+    /** Hook run before reading up to file offset `end_offset`; a no-op for plain readers. */
+    def check_read_flush(end_offset:Long) = {}
+
+    /**
+     * Reads from file offset `offset` until `bb` is full, end-of-file is hit, or
+     * the channel stops returning data. Returns the number of bytes read.
+     */
+    private def read_fully(ch:java.nio.channels.FileChannel, bb:java.nio.ByteBuffer, offset:Long):Int = {
+      var total = 0
+      var done = false
+      while( bb.hasRemaining && !done ) {
+        val count = ch.read(bb, offset+total)
+        if( count <= 0 ) {
+          done = true
+        } else {
+          total += count
+        }
+      }
+      total
+    }
+
+    /**
+     * Reads `length` data bytes of the record whose header starts at log position
+     * `record_position`. With verify_checksums set, the header prefix is checked,
+     * and the CRC32 too when `length` equals the record's stored data length.
+     * @throws IOException on a short read, bad prefix or checksum mismatch
+     */
+    def read(record_position:Long, length:Int) = {
+      val offset = record_position-position
+      assert(offset >=0 )
+
+      check_read_flush(offset+LOG_HEADER_SIZE+length)
+
+      if(verify_checksums) {
+
+        val record = new Buffer(LOG_HEADER_SIZE+length)
+
+        // Diagnostic for the asserts below: re-reads the record through a fresh
+        // handle and reports whether it still equals what was read.
+        def record_is_not_changing = {
+          using(open) { other =>
+            val new_record = new Buffer(LOG_HEADER_SIZE+length)
+            read_fully(other.getChannel, new_record.toByteBuffer, offset)
+            record == new_record
+          }
+        }
+
+        if( read_fully(channel, record.toByteBuffer, offset) != record.length ) {
+          assert( record_is_not_changing )
+          throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
+        }
+
+        val is = new DataByteArrayInputStream(record)
+        val prefix = is.readByte()
+        if( prefix != LOG_HEADER_PREFIX ) {
+          assert(record_is_not_changing)
+          throw new IOException("invalid record at position: "+record_position+" in file: "+file+", offset: "+offset)
+        }
+
+        is.readByte() // record kind; not needed here
+        val expectedChecksum = is.readInt()
+        val expectedLength = is.readInt()
+        val data = is.readBuffer(length)
+
+        // If you're reading the whole record we can verify the data checksum
+        if( expectedLength == length ) {
+          if( expectedChecksum != checksum(data) ) {
+            assert(record_is_not_changing)
+            throw new IOException("checksum does not match at position: "+record_position+" in file: "+file+", offset: "+offset)
+          }
+        }
+
+        data
+      } else {
+        val data = new Buffer(length)
+        if( read_fully(channel, data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) {
+          throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
+        }
+        data
+      }
+    }
+
+    /**
+     * Reads the complete record whose header starts at `record_position` and
+     * returns `(kind, data, next_record_position)`.
+     * @throws IOException on a bad header, short read or (when verifying) checksum mismatch
+     */
+    def read(record_position:Long) = {
+      val offset = record_position-position
+      check_read_flush(offset+LOG_HEADER_SIZE)
+      val header = new Buffer(LOG_HEADER_SIZE)
+      read_fully(channel, header.toByteBuffer, offset)
+      val is = header.bigEndianEditor()
+      val prefix = is.readByte()
+      if( prefix != LOG_HEADER_PREFIX ) {
+        // Does not look like a record.
+        throw new IOException("invalid record position")
+      }
+      val id = is.readByte()
+      val expectedChecksum = is.readInt()
+      val length = is.readInt()
+      if( length < 0 ) {
+        // A corrupt header; without this, new Buffer(length) fails with NegativeArraySizeException.
+        throw new IOException("invalid record length")
+      }
+      check_read_flush(offset+LOG_HEADER_SIZE+length)
+      val data = new Buffer(length)
+
+      if( read_fully(channel, data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) {
+        throw new IOException("short record")
+      }
+
+      if(verify_checksums) {
+        if( expectedChecksum != checksum(data) ) {
+          throw new IOException("checksum does not match")
+        }
+      }
+      (id, data, record_position+LOG_HEADER_SIZE+length)
+    }
+
+    /**
+     * Validates the record whose header starts at `record_position` without
+     * loading it whole. Returns None unless it is a complete record with a
+     * matching CRC32; otherwise Some((next_record_position, uow_start)), where
+     * uow_start is the start position carried by a UOW_END_RECORD, else None.
+     */
+    def check(record_position:Long):Option[(Long, Option[Long])] = {
+      var offset = record_position-position
+      val header = new Buffer(LOG_HEADER_SIZE)
+      read_fully(channel, header.toByteBuffer, offset)
+      val is = header.bigEndianEditor()
+      val prefix = is.readByte()
+      if( prefix != LOG_HEADER_PREFIX ) {
+        return None // Does not look like a record.
+      }
+      val kind = is.readByte()
+      val expectedChecksum = is.readInt()
+      val length = is.readInt()
+      if( length < 0 ) {
+        // Corrupt length. Accepting it could make the next position go backwards
+        // and send verifyAndGetEndPosition into an endless scan.
+        return None
+      }
+
+      val chunk = new Buffer(1024*4)
+      val chunkbb = chunk.toByteBuffer
+      offset += LOG_HEADER_SIZE
+
+      // Read the data in chunks to avoid an OOME if we are checking an
+      // invalid record with a bad record length.
+      val checksumer = new CRC32
+      var remaining = length
+      while( remaining > 0 ) {
+        val chunkSize = remaining.min(1024*4)
+        chunkbb.position(0)
+        chunkbb.limit(chunkSize)
+        read_fully(channel, chunkbb, offset)
+        if( chunkbb.hasRemaining ) {
+          return None
+        }
+        checksumer.update(chunk.data, 0, chunkSize)
+        offset += chunkSize
+        remaining -= chunkSize
+      }
+
+      val actual = checksumer.getValue.toInt
+      if( expectedChecksum != actual ) {
+        return None
+      }
+      // A UOW end record's 8 data bytes are still at the start of `chunk`.
+      val uow_start_pos = if(kind == UOW_END_RECORD && length==8) Some(decode_long(chunk)) else None
+      Some((record_position+LOG_HEADER_SIZE+length, uow_start_pos))
+    }
+
+    /**
+     * Scans the file from its start and returns the log position just past the
+     * last complete unit of work: each UOW_END_RECORD must name, as its start,
+     * the end of the previous complete unit (or the file start). Scanning stops
+     * at the first invalid record or mismatched unit.
+     */
+    def verifyAndGetEndPosition:Long = {
+      val limit = position+channel.size()
+
+      @scala.annotation.tailrec
+      def scan(pos:Long, uow_start:Long):Long = {
+        if( pos >= limit ) {
+          uow_start
+        } else {
+          check(pos) match {
+            case None => uow_start
+            case Some((next, None)) => scan(next, uow_start)
+            case Some((next, Some(recorded_start))) =>
+              if( recorded_start == uow_start ) scan(next, next) else uow_start
+          }
+        }
+      }
+
+      scan(position, position)
+    }
+  }
+
+  def create_log_appender(position: Long) = {
+    new LogAppender(next_log(position), position)
+  }
+
+  /**
+   * Makes a new appender at log `position` the current one. The outgoing
+   * appender's final length is recorded in log_infos under its own start position.
+   */
+  def create_appender(position: Long): Unit = {
+    log_mutex.synchronized {
+      if(current_appender!=null) {
+        log_infos.put(current_appender.position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset))
+      }
+      current_appender = create_log_appender(position)
+      log_infos.put(position, new LogInfo(current_appender.file, position, 0))
+    }
+  }
+
+  val max_log_write_latency = TimeMetric()
+  val max_log_flush_latency = TimeMetric()
+  val max_log_rotate_latency = TimeMetric()
+
+  /**
+   * Rebuilds log_infos from the files in `directory` and recovers the newest file.
+   * That file is truncated after its last complete unit of work, and the
+   * appender is started at the recovered end position.
+   */
+  def open = {
+    log_mutex.synchronized {
+      log_infos.clear()
+      LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
+        log_infos.put(position, LogInfo(file, position, file.length()))
+      }
+
+      val appendPos = if( log_infos.isEmpty ) {
+        0L
+      } else {
+        val last = log_infos.lastEntry().getValue
+        val r = LogReader(last.file, last.position)
+        val actualLength = try {
+          r.verifyAndGetEndPosition
+        } finally {
+          r.release()
+        }
+        val updated = last.copy(length = actualLength - last.position)
+        log_infos.put(updated.position, updated)
+        // Compare against the on-disk length captured above. The original check,
+        // updated.file.length, re-read that same on-disk size, so it never truncated.
+        if( updated.length != last.length ) {
+          // we need to truncate.
+          using(new RandomAccessFile(last.file, "rw")) ( _.setLength(updated.length))
+        }
+        actualLength
+      }
+
+      create_appender(appendPos)
+    }
+  }
+
+  /** Releases the current appender (if any) and every cached reader. */
+  def close = {
+    log_mutex.synchronized {
+      if( current_appender != null ) {
+        current_appender.release
+      }
+    }
+    reader_cache.synchronized {
+      val readers = reader_cache.values().iterator()
+      while( readers.hasNext ) {
+        readers.next().release()
+      }
+      reader_cache.clear()
+    }
+  }
+
+  def appender_limit = current_appender.append_position
+  def appender_start = current_appender.position
+
+  def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
+
+  /**
+   * Runs `func` against the current appender as one unit of work. If anything
+   * was appended, a UOW_END_RECORD holding the unit's start position follows it
+   * so that recovery only replays complete units. The appender is always flushed
+   * afterwards, and it is rotated to a new file once it reaches logSize.
+   */
+  def appender[T](func: (LogAppender)=>T):T= {
+    val initial_position = current_appender.append_position
+    try {
+      max_log_write_latency {
+        val rc = func(current_appender)
+        if( current_appender.append_position != initial_position ) {
+          // Record a UOW_END_RECORD so that on recovery we only replay full units of work.
+          current_appender.append(UOW_END_RECORD,encode_long(initial_position))
+        }
+        rc
+      }
+    } finally {
+      current_appender.flush
+      max_log_rotate_latency {
+        log_mutex.synchronized {
+          if ( current_appender.append_offset >= logSize ) {
+            current_appender.release()
+            on_log_rotate()
+            create_appender(current_appender.append_position)
+          }
+        }
+      }
+    }
+  }
+
+  var on_log_rotate: ()=>Unit = ()=>{}
+
+  private val reader_cache = new LRUCache[File, LogReader](100) {
+    protected override def onCacheEviction(entry: Entry[File, LogReader]) = {
+      entry.getValue.release()
+    }
+  }
+
+  /** Removes `file`'s reader from the cache (if present) and drops the cache's reference to it. */
+  private def evict_reader(file:File):Unit = reader_cache.synchronized {
+    Option(reader_cache.remove(file)).foreach(_.release())
+  }
+
+  def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue) }
+
+  /**
+   * Runs `func` with a retained reader for the log file starting at or before
+   * `record_position`, or returns None if there is no such file. The current
+   * file is read through the appender, so unflushed data is flushed on demand.
+   * Other files use a cached LogReader.
+   */
+  private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
+
+    val lookup = log_mutex.synchronized {
+      log_info(record_position).map { info =>
+        if(info.position == current_appender.position) {
+          current_appender.retain()
+          (info, current_appender)
+        } else {
+          (info, null)
+        }
+      }
+    }
+
+    lookup.map { case (info, appender) =>
+      val reader = if( appender!=null ) {
+        // read from the current appender.
+        appender
+      } else {
+        // Checkout a reader from the cache...
+        reader_cache.synchronized {
+          var reader = reader_cache.get(info.file)
+          if(reader==null) {
+            reader = LogReader(info.file, info.position)
+            reader_cache.put(info.file, reader)
+          }
+          reader.retain()
+          reader
+        }
+      }
+
+      try {
+        func(reader)
+      } finally {
+        reader.release
+      }
+    }
+  }
+
+  def read(pos:Long) = {
+    get_reader(pos)(_.read(pos))
+  }
+  def read(pos:Long, length:Int) = {
+    get_reader(pos)(_.read(pos, length))
+  }
+
+}