You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/19 21:24:16 UTC
svn commit: r1291054 [1/2] - in
/activemq/activemq-apollo/trunk/apollo-leveldb/src:
main/scala/org/apache/activemq/apollo/broker/store/leveldb/
main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/
test/scala/org/apache/activemq/apollo/br...
Author: chirino
Date: Sun Feb 19 20:24:15 2012
New Revision: 1291054
URL: http://svn.apache.org/viewvc?rev=1291054&view=rev
Log:
Move leveldb store source files to correct directory
Added:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala
- copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala
- copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (with props)
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
- copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala
- copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
- copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala
- copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala
- copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala
Removed:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala
Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -14,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store.leveldb
import org.fusesource.hawtbuf._
import org.iq80.leveldb._
@@ -22,26 +23,27 @@ import java.io.DataOutput
object HelperTrait {
- def encode_locator(pos:Long, len:Int):Array[Byte] = {
+ def encode_locator(pos: Long, len: Int): Array[Byte] = {
val out = new DataByteArrayOutputStream(
- AbstractVarIntSupport.computeVarLongSize(pos)+
- AbstractVarIntSupport.computeVarIntSize(len)
+ AbstractVarIntSupport.computeVarLongSize(pos) +
+ AbstractVarIntSupport.computeVarIntSize(len)
)
out.writeVarLong(pos)
out.writeVarInt(len)
out.getData
}
- def decode_locator(bytes:Array[Byte]):(Long, Int) = {
+ def decode_locator(bytes: Array[Byte]): (Long, Int) = {
val in = new DataByteArrayInputStream(bytes)
(in.readVarLong(), in.readVarInt())
}
- def decode_locator(bytes:Buffer):(Long, Int) = {
+
+ def decode_locator(bytes: Buffer): (Long, Int) = {
val in = new DataByteArrayInputStream(bytes)
(in.readVarLong(), in.readVarInt())
}
- def encode_vlong(a1:Long):Array[Byte] = {
+ def encode_vlong(a1: Long): Array[Byte] = {
val out = new DataByteArrayOutputStream(
AbstractVarIntSupport.computeVarLongSize(a1)
)
@@ -49,31 +51,31 @@ object HelperTrait {
out.getData
}
- def decode_vlong(bytes:Array[Byte]):Long = {
+ def decode_vlong(bytes: Array[Byte]): Long = {
val in = new DataByteArrayInputStream(bytes)
in.readVarLong()
}
- def encode_key(a1:Byte, a2:Long):Array[Byte] = {
+ def encode_key(a1: Byte, a2: Long): Array[Byte] = {
val out = new DataByteArrayOutputStream(9)
out.writeByte(a1.toInt)
out.writeLong(a2)
out.getData
}
- def encode_key(a1:Byte, a2:Buffer):Array[Byte] = {
- val out = new DataByteArrayOutputStream(1+a2.length)
+ def encode_key(a1: Byte, a2: Buffer): Array[Byte] = {
+ val out = new DataByteArrayOutputStream(1 + a2.length)
out.writeByte(a1.toInt)
a2.writeTo(out.asInstanceOf[DataOutput])
out.getData
}
- def decode_long_key(bytes:Array[Byte]):(Byte, Long) = {
+ def decode_long_key(bytes: Array[Byte]): (Byte, Long) = {
val in = new DataByteArrayInputStream(bytes)
(in.readByte(), in.readLong())
}
- def encode_key(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
+ def encode_key(a1: Byte, a2: Long, a3: Long): Array[Byte] = {
val out = new DataByteArrayOutputStream(17)
out.writeByte(a1)
out.writeLong(a2)
@@ -81,19 +83,19 @@ object HelperTrait {
out.getData
}
- def decode_long_long_key(bytes:Array[Byte]):(Byte,Long,Long) = {
+ def decode_long_long_key(bytes: Array[Byte]): (Byte, Long, Long) = {
val in = new DataByteArrayInputStream(bytes)
(in.readByte(), in.readLong(), in.readLong())
}
- def encode(a1:Byte, a2:Int):Array[Byte] = {
+ def encode(a1: Byte, a2: Int): Array[Byte] = {
val out = new DataByteArrayOutputStream(5)
out.writeByte(a1)
out.writeInt(a2)
out.getData
}
- def decode_int_key(bytes:Array[Byte]):(Byte,Int) = {
+ def decode_int_key(bytes: Array[Byte]): (Byte, Int) = {
val in = new DataByteArrayInputStream(bytes)
(in.readByte(), in.readInt())
}
@@ -101,30 +103,30 @@ object HelperTrait {
final class RichDB(val db: DB) {
val is_pure_java_version = db.getClass.getName == "org.iq80.leveldb.impl.DbImpl"
-
- def getProperty(name:String) = db.getProperty(name)
- def getApproximateSizes(ranges:Range*) = db.getApproximateSizes(ranges:_*)
+ def getProperty(name: String) = db.getProperty(name)
+
+ def getApproximateSizes(ranges: Range*) = db.getApproximateSizes(ranges: _*)
- def get(key:Array[Byte], ro:ReadOptions=new ReadOptions):Option[Array[Byte]] = {
+ def get(key: Array[Byte], ro: ReadOptions = new ReadOptions): Option[Array[Byte]] = {
Option(db.get(key, ro))
}
- def close:Unit = db.close()
+ def close: Unit = db.close()
- def delete(key:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+ def delete(key: Array[Byte], wo: WriteOptions = new WriteOptions): Unit = {
db.delete(key, wo)
}
- def put(key:Array[Byte], value:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+ def put(key: Array[Byte], value: Array[Byte], wo: WriteOptions = new WriteOptions): Unit = {
db.put(key, value, wo)
}
- def write[T](wo:WriteOptions=new WriteOptions)(func: WriteBatch=>T):T = {
+ def write[T](wo: WriteOptions = new WriteOptions)(func: WriteBatch => T): T = {
val updates = db.createWriteBatch()
try {
- val rc=Some(func(updates))
+ val rc = Some(func(updates))
db.write(updates, wo)
return rc.get
} finally {
@@ -132,7 +134,7 @@ object HelperTrait {
}
}
- def snapshot[T](func: Snapshot=>T):T = {
+ def snapshot[T](func: Snapshot => T): T = {
val snapshot = db.getSnapshot
try {
func(snapshot)
@@ -141,11 +143,11 @@ object HelperTrait {
}
}
- def cursor_keys(ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+ def cursor_keys(ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seekToFirst();
try {
- while( iterator.hasNext && func(iterator.peekNext.getKey) ) {
+ while (iterator.hasNext && func(iterator.peekNext.getKey)) {
iterator.next()
}
} finally {
@@ -153,14 +155,14 @@ object HelperTrait {
}
}
- def cursor_keys_prefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+ def cursor_keys_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(prefix);
try {
- def check(key:Array[Byte]) = {
+ def check(key: Array[Byte]) = {
key.startsWith(prefix) && func(key)
}
- while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+ while (iterator.hasNext && check(iterator.peekNext.getKey)) {
iterator.next()
}
} finally {
@@ -168,14 +170,14 @@ object HelperTrait {
}
}
- def cursor_prefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+ def cursor_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(prefix);
try {
- def check(key:Array[Byte]) = {
+ def check(key: Array[Byte]) = {
key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
}
- while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+ while (iterator.hasNext && check(iterator.peekNext.getKey)) {
iterator.next()
}
} finally {
@@ -183,22 +185,22 @@ object HelperTrait {
}
}
- def compare(a1:Array[Byte], a2:Array[Byte]):Int = {
+ def compare(a1: Array[Byte], a2: Array[Byte]): Int = {
new Buffer(a1).compareTo(new Buffer(a2))
}
- def cursor_range_keys(start_included:Array[Byte], end_excluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+ def cursor_range_keys(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(start_included);
try {
- def check(key:Array[Byte]) = {
- if ( compare(key,end_excluded) < 0) {
+ def check(key: Array[Byte]) = {
+ if (compare(key, end_excluded) < 0) {
func(key)
} else {
false
}
}
- while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+ while (iterator.hasNext && check(iterator.peekNext.getKey)) {
iterator.next()
}
} finally {
@@ -206,14 +208,14 @@ object HelperTrait {
}
}
- def cursor_range(start_included:Array[Byte], end_excluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+ def cursor_range(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(start_included);
try {
- def check(key:Array[Byte]) = {
- (compare(key,end_excluded) < 0) && func(key, iterator.peekNext.getValue)
+ def check(key: Array[Byte]) = {
+ (compare(key, end_excluded) < 0) && func(key, iterator.peekNext.getValue)
}
- while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+ while (iterator.hasNext && check(iterator.peekNext.getKey)) {
iterator.next()
}
} finally {
@@ -221,35 +223,36 @@ object HelperTrait {
}
}
- def last_key(prefix:Array[Byte], ro:ReadOptions=new ReadOptions): Option[Array[Byte]] = {
+ def last_key(prefix: Array[Byte], ro: ReadOptions = new ReadOptions): Option[Array[Byte]] = {
val last = new Buffer(prefix).deepCopy().data
- if ( last.length > 0 ) {
- val pos = last.length-1
- last(pos) = (last(pos)+1).toByte
+ if (last.length > 0) {
+ val pos = last.length - 1
+ last(pos) = (last(pos) + 1).toByte
}
- if(is_pure_java_version) {
+ if (is_pure_java_version) {
// The pure java version of LevelDB does not support backward iteration.
- var rc:Option[Array[Byte]] = None
- cursor_range_keys(prefix, last) { key=>
- rc = Some(key)
- true
+ var rc: Option[Array[Byte]] = None
+ cursor_range_keys(prefix, last) {
+ key =>
+ rc = Some(key)
+ true
}
rc
} else {
val iterator = db.iterator(ro)
try {
-
+
iterator.seek(last);
- if ( iterator.hasPrev ) {
+ if (iterator.hasPrev) {
iterator.prev()
} else {
iterator.seekToLast()
}
- if ( iterator.hasNext ) {
+ if (iterator.hasNext) {
val key = iterator.peekNext.getKey
- if(key.startsWith(prefix)) {
+ if (key.startsWith(prefix)) {
Some(key)
} else {
None
Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -14,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store.leveldb
import java.util.ArrayList
import java.util.Iterator
@@ -23,21 +24,24 @@ import java.util.NoSuchElementException
import org.apache.activemq.apollo.util.TreeMap
object Interval {
- def apply[N](start:N)(implicit numeric: scala.math.Numeric[N]):Interval[N] = {
+ def apply[N](start: N)(implicit numeric: scala.math.Numeric[N]): Interval[N] = {
import numeric._
- Interval(start, start+one)
+ Interval(start, start + one)
}
}
case class Interval[N](start: N, limit: N)(implicit numeric: scala.math.Numeric[N]) {
+
import numeric._
def size = limit - start
+
def end = limit - one
- def start(value: N):Interval[N] = Interval(value, limit)
- def limit(value: N):Interval[N] = Interval(start, value)
-
+ def start(value: N): Interval[N] = Interval(value, limit)
+
+ def limit(value: N): Interval[N] = Interval(start, value)
+
override def toString = {
if (start == end) {
start.toString
@@ -57,8 +61,10 @@ case class Interval[N](start: N, limit:
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
case class IntervalSet[N](implicit numeric: scala.math.Numeric[N]) extends java.lang.Iterable[Interval[N]] {
+
import numeric._
import collection.JavaConversions._
+
private final val ranges = new TreeMap[N, Interval[N]]
def copy = {
@@ -68,11 +74,13 @@ case class IntervalSet[N](implicit numer
}
rc
}
- def add(r:N):Unit = add(Interval(r))
- def add(r:Interval[N]): Unit = {
+
+ def add(r: N): Unit = add(Interval(r))
+
+ def add(r: Interval[N]): Unit = {
var start = r.start
var limit = r.limit
-
+
var entry = ranges.floorEntry(limit)
while (entry != null) {
var curr = entry
@@ -95,17 +103,18 @@ case class IntervalSet[N](implicit numer
ranges.put(start, Interval(start, limit))
}
- def remove(r:N):Unit = remove(Interval(r))
- def remove(r:Interval[N]): Unit = {
- val start = r.start
+ def remove(r: N): Unit = remove(Interval(r))
+
+ def remove(r: Interval[N]): Unit = {
+ val start = r.start
var limit = r.limit
var entry = ranges.lowerEntry(limit)
while (entry != null) {
-
+
var curr = entry
var range = curr.getValue
entry = entry.previous
-
+
if (range.limit <= start) {
entry = null
} else {
@@ -155,7 +164,7 @@ case class IntervalSet[N](implicit numer
}
override def toString = {
- "[ " + ranges.values().mkString(", ")+" ]"
+ "[ " + ranges.values().mkString(", ") + " ]"
}
def iterator: Iterator[Interval[N]] = {
@@ -183,7 +192,7 @@ case class IntervalSet[N](implicit numer
private var _next: Interval[N] = null
def hasNext: Boolean = {
- while (next==null && last.limit < mask.limit && iter.hasNext) {
+ while (next == null && last.limit < mask.limit && iter.hasNext) {
var r = iter.next
if (r.limit >= last.limit) {
if (r.start < last.limit) {
@@ -215,7 +224,7 @@ case class IntervalSet[N](implicit numer
}
}
- private final class ValueIterator(val ranges:Iterator[Interval[N]]) extends java.util.Iterator[N] {
+ private final class ValueIterator(val ranges: Iterator[Interval[N]]) extends java.util.Iterator[N] {
private var range: Interval[N] = null
private var _next: Option[N] = None
Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1291054&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Sun Feb 19 20:24:15 2012
@@ -0,0 +1,1416 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.{lang => jl}
+import java.{util => ju}
+
+import org.fusesource.hawtbuf.proto.PBMessageFactory
+
+import org.apache.activemq.apollo.broker.store._
+import java.io._
+import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.util._
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.util.{TreeMap => ApolloTreeMap}
+import collection.immutable.TreeMap
+import org.fusesource.leveldbjni.internal.Util
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.apollo.util.ProcessSupport._
+import collection.mutable.{HashMap, ListBuffer}
+import org.apache.activemq.apollo.dto.JsonCodec
+import org.iq80.leveldb._
+import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
+import org.apache.activemq.apollo.broker.store.PBSupport
+import java.util.concurrent.atomic.AtomicReference
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer, AbstractVarIntSupport}
+import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait.encode_key
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBClient extends Log {
+
+ final val STORE_SCHEMA_PREFIX = "leveldb_store:"
+ final val STORE_SCHEMA_VERSION = 3
+
+ final val queue_prefix = 'q'.toByte
+ final val queue_entry_prefix = 'e'.toByte
+ final val map_prefix = 'p'.toByte
+ final val tmp_prefix = 't'.toByte
+
+ final val queue_prefix_array = Array(queue_prefix)
+ final val map_prefix_array = Array(map_prefix)
+ final val queue_entry_prefix_array = Array(queue_entry_prefix)
+
+ final val dirty_index_key = bytes(":dirty")
+ final val log_refs_index_key = bytes(":log-refs")
+ final val TRUE = bytes("true")
+ final val FALSE = bytes("false")
+
+ final val LOG_ADD_QUEUE = 1.toByte
+ final val LOG_REMOVE_QUEUE = 2.toByte
+ final val LOG_ADD_MESSAGE = 3.toByte
+ final val LOG_ADD_QUEUE_ENTRY = 5.toByte
+ final val LOG_REMOVE_QUEUE_ENTRY = 6.toByte
+ final val LOG_MAP_ENTRY = 7.toByte
+
+ final val LOG_ADD_MESSAGE_SNAPPY = (LOG_ADD_MESSAGE + 100).toByte
+ final val LOG_MAP_ENTRY_SNAPPY = (LOG_MAP_ENTRY + 100).toByte
+
+ final val LOG_SUFFIX = ".log"
+ final val INDEX_SUFFIX = ".index"
+
+ def bytes(value: String) = value.getBytes("UTF-8")
+
+ import FileSupport._
+
+ def create_sequence_file(directory: File, id: Long, suffix: String) = directory / ("%016x%s".format(id, suffix))
+
+ def find_sequence_files(directory: File, suffix: String): TreeMap[Long, File] = {
+ TreeMap((directory.list_files.flatMap {
+ f =>
+ if (f.getName.endsWith(suffix)) {
+ try {
+ val base = f.getName.stripSuffix(suffix)
+ val position = java.lang.Long.parseLong(base, 16);
+ Some(position -> f)
+ } catch {
+ case e: NumberFormatException => None
+ }
+ } else {
+ None
+ }
+ }): _*)
+ }
+
+ val on_windows = System.getProperty("os.name").toLowerCase().startsWith("windows")
+ var link_strategy = 0
+
+ def link(source: File, target: File): Unit = {
+ link_strategy match {
+ case 0 =>
+ // We first try to link via a native system call. Fails if
+ // we cannot load the JNI module.
+ try {
+ Util.link(source, target)
+ } catch {
+ case e: IOException => throw e
+ case e: Throwable =>
+ // Fallback.. to a slower impl..
+ debug("Native link system call not available")
+ link_strategy = 2
+ link(source, target)
+ }
+
+ // TODO: consider implementing a case which does the native system call using JNA
+ case 2 =>
+ // Next try JNA (might not be in classpath)
+ try {
+ IOHelper.hardlink(source, target)
+ } catch {
+ case e: IOException => throw e
+ case e: Throwable =>
+ // Fallback.. to a slower impl..
+ debug("JNA based hard link system call not available")
+ link_strategy = 5
+ link(source, target)
+ }
+
+ case 5 =>
+ // Next we try to do the link by executing an
+ // operating system shell command
+ try {
+ if (on_windows) {
+ system("fsutil", "hardlink", "create", target.getCanonicalPath, source.getCanonicalPath) match {
+ case (0, _, _) => // Success
+ case (_, out, err) =>
+ // TODO: we might want to look at the out/err to see why it failed
+ // to avoid falling back to the slower strategy.
+ debug("fsutil OS command not available either")
+ link_strategy = 10
+ link(source, target)
+ }
+ } else {
+ system("ln", source.getCanonicalPath, target.getCanonicalPath) match {
+ case (0, _, _) => // Success
+ case (_, out, err) => None
+ // TODO: we might want to look at the out/err to see why it failed
+ // to avoid falling back to the slower strategy.
+ debug("ln OS command not available either")
+ link_strategy = 10
+ link(source, target)
+ }
+ }
+ } catch {
+ case e: Throwable =>
+ }
+ case _ =>
+ // this final strategy is slow but sure to work.
+ source.copy_to(target)
+ }
+ }
+
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBClient(store: LevelDBStore) {
+
+ import HelperTrait._
+ import LevelDBClient._
+ import FileSupport._
+
+ def dispatchQueue = store.dispatch_queue
+
+ implicit def toByteArray(buf: Buffer): Array[Byte] = buf.toByteArray
+
+ implicit def toBuffer(buf: Array[Byte]): Buffer = new Buffer(buf)
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Helpers
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ def config = store.config
+
+ def directory = config.directory
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Public interface used by the LevelDBStore
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ var sync = false;
+ var verify_checksums = false;
+
+ var log: RecordLog = _
+
+ var snappy_compress_logs = false
+ var index: RichDB = _
+ var index_options: Options = _
+
+ var last_index_snapshot_ts = System.currentTimeMillis()
+ var last_index_snapshot_pos: Long = _
+ val snapshot_rw_lock = new ReentrantReadWriteLock(true)
+
+ var factory: DBFactory = _
+ val log_refs = HashMap[Long, LongCounter]()
+
+ def dirty_index_file = directory / ("dirty" + INDEX_SUFFIX)
+
+ def temp_index_file = directory / ("temp" + INDEX_SUFFIX)
+
+ def snapshot_index_file(id: Long) = create_sequence_file(directory, id, INDEX_SUFFIX)
+
+ def create_log: RecordLog = {
+ new RecordLog(directory, LOG_SUFFIX)
+ }
+
+ def log_size = {
+ Option(config.log_size).map(MemoryPropertyEditor.parse(_)).getOrElse(1024 * 1024 * 100L)
+ }
+
+ def start() = {
+ import OptionSupport._
+
+
+ val factory_names = Option(config.index_factory).getOrElse("org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory")
+ factory = factory_names.split("""(,|\s)+""").map(_.trim()).flatMap {
+ name =>
+ try {
+ Some(Broker.class_loader.loadClass(name).newInstance().asInstanceOf[DBFactory])
+ } catch {
+ case x: Throwable =>
+ None
+ }
+ }.headOption.getOrElse(throw new Exception("Could not load any of the index factory classes: " + factory_names))
+
+ if (factory.getClass.getName == "org.iq80.leveldb.impl.Iq80DBFactory") {
+ warn("Using the pure java LevelDB implementation which is still experimental. If the JNI version is not available for your platform, please switch to the BDB store instead. http://activemq.apache.org/apollo/documentation/user-manual.html#BDB_Store")
+ }
+
+ sync = config.sync.getOrElse(true);
+ verify_checksums = config.verify_checksums.getOrElse(false);
+
+ index_options = new Options();
+ index_options.createIfMissing(true);
+ val paranoid_checks = config.paranoid_checks.getOrElse(false)
+
+ config.index_max_open_files.foreach(index_options.maxOpenFiles(_))
+ config.index_block_restart_interval.foreach(index_options.blockRestartInterval(_))
+ index_options.paranoidChecks(paranoid_checks)
+ Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach(index_options.writeBufferSize(_))
+ Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach(index_options.blockSize(_))
+ Option(config.index_compression).foreach(x => index_options.compressionType(x match {
+ case "snappy" => CompressionType.SNAPPY
+ case "none" => CompressionType.NONE
+ case _ => CompressionType.SNAPPY
+ }))
+
+ if (Option(config.log_compression).map(_.toLowerCase).getOrElse("snappy") == "snappy" && Snappy != null) {
+ snappy_compress_logs = true
+ }
+
+ index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024 * 1024 * 256L))
+ index_options.logger(new Logger() {
+ def log(msg: String) = trace(msg.stripSuffix("\n"))
+ })
+
+ log = create_log
+ log.sync = sync
+ log.logSize = log_size
+ log.verify_checksums = verify_checksums
+ log.on_log_rotate = () => {
+ // lets queue a request to checkpoint when
+ // the logs rotate.. queue it on the GC thread since GC's lock
+ // the index for a long time.
+ store.write_executor {
+ snapshot_index
+ }
+ }
+
+ lock_file = new LockFile(directory / "lock", true)
+ def time[T](func: => T): Long = {
+ val start = System.nanoTime()
+ func
+ System.nanoTime() - start
+ }
+
+ // Lock before we open anything..
+ lock_store
+
+ // Lets check store compatibility...
+ val version_file = directory / "store-version.txt"
+ if (version_file.exists()) {
+ val ver = try {
+ var tmp: String = version_file.read_text().trim()
+ if (tmp.startsWith(STORE_SCHEMA_PREFIX)) {
+ tmp.stripPrefix(STORE_SCHEMA_PREFIX).toInt
+ } else {
+ -1
+ }
+ } catch {
+ case e => throw new Exception("Unexpected version file format: " + version_file)
+ }
+ ver match {
+ case STORE_SCHEMA_VERSION => // All is good.
+ case _ => throw new Exception("Cannot open the store. It's schema version is not supported.")
+ }
+ }
+ version_file.write_text(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
+
+ val log_open_duration = time {
+ retry {
+ log.open
+ }
+ }
+ info("Opening the log file took: %.2f ms", (log_open_duration / TimeUnit.MILLISECONDS.toNanos(1).toFloat))
+
+ // Find out what was the last snapshot.
+ val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
+ var last_snapshot_index = snapshots.lastOption
+ last_index_snapshot_pos = last_snapshot_index.map(_._1).getOrElse(0)
+
+ // Only keep the last snapshot..
+ snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach(_._2.recursive_delete)
+ temp_index_file.recursive_delete // usually does not exist.
+
+ retry {
+
+ // Delete the dirty indexes
+ dirty_index_file.recursive_delete
+ dirty_index_file.mkdirs()
+
+ last_snapshot_index.foreach {
+ case (id, file) =>
+ // Resume log replay from a snapshot of the index..
+ try {
+ file.list_files.foreach {
+ file =>
+ link(file, dirty_index_file / file.getName)
+ }
+ } catch {
+ case e: Exception =>
+ warn(e, "Could not recover snapshot of the index: " + e)
+ last_snapshot_index = None
+ }
+ }
+
+ index = new RichDB(factory.open(dirty_index_file, index_options));
+
+ try {
+ load_log_refs
+ index.put(dirty_index_key, TRUE)
+
+ if (paranoid_checks) {
+ check_index_integrity(index)
+ }
+
+ // Update the index /w what was stored on the logs..
+ var pos = last_index_snapshot_pos;
+
+ var last_reported_at = System.currentTimeMillis();
+ var showing_progress = false
+ var last_reported_pos = 0L
+
+ def remaining(eta: Double) = {
+ if (eta > 60 * 60) {
+ "%.2f hrs".format(eta / (60 * 60))
+ } else if (eta > 60) {
+ "%.2f mins".format(eta / 60)
+ } else {
+ "%.0f secs".format(eta)
+ }
+ }
+
+ var replay_operations = 0
+ val log_replay_duration = time {
+ while (pos < log.appender_limit) {
+
+ val now = System.currentTimeMillis();
+ if (now > last_reported_at + 1000) {
+ val at = pos - last_index_snapshot_pos
+ val total = log.appender_limit - last_index_snapshot_pos
+ val rate = (pos - last_reported_pos) * 1000.0 / (now - last_reported_at)
+ val eta = (total - at) / rate
+
+ System.out.print("Replaying recovery log: %f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining. \r".format(
+ at * 100.0 / total, at, total, rate / 1024, remaining(eta)))
+ showing_progress = true;
+ last_reported_at = now
+ last_reported_pos = pos
+ }
+
+ log.read(pos).map {
+ case (kind, data, next_pos) =>
+ kind match {
+ case LOG_ADD_QUEUE_ENTRY =>
+ replay_operations += 1
+ val record = QueueEntryPB.FACTORY.parseUnframed(data)
+
+ val index_record = record.copy()
+ index_record.clearQueueKey()
+ index_record.clearQueueSeq()
+ index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toUnframedBuffer)
+
+ log_ref_increment(decode_vlong(record.getMessageLocator))
+
+ case LOG_REMOVE_QUEUE_ENTRY =>
+ replay_operations += 1
+ index.get(data, new ReadOptions).foreach {
+ value =>
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val pos = decode_vlong(record.getMessageLocator)
+ pos.foreach(log_ref_decrement(_))
+ index.delete(data)
+ }
+
+ case LOG_ADD_QUEUE =>
+ replay_operations += 1
+ val record = QueuePB.FACTORY.parseUnframed(data)
+ index.put(encode_key(queue_prefix, record.getKey), data)
+
+ case LOG_REMOVE_QUEUE =>
+ replay_operations += 1
+ val ro = new ReadOptions
+ ro.fillCache(false)
+ ro.verifyChecksums(verify_checksums)
+ val queue_key = decode_vlong(data)
+ index.delete(encode_key(queue_prefix, queue_key))
+ index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
+ (key, value) =>
+ index.delete(key)
+
+ // Figure out what log file that message entry was in so we can,
+ // decrement the log file reference.
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val pos = decode_vlong(record.getMessageLocator)
+ log_ref_decrement(pos)
+ true
+ }
+
+ case LOG_MAP_ENTRY | LOG_MAP_ENTRY_SNAPPY =>
+ replay_operations += 1
+ val entry = MapEntryPB.FACTORY.parseUnframed(data)
+ if (entry.getValue == null) {
+ index.delete(encode_key(map_prefix, entry.getKey))
+ } else {
+ index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
+ }
+ case _ =>
+ // Skip records which don't require index updates.
+ }
+ pos = next_pos
+ }
+ }
+ if (replay_operations > 0) {
+ snapshot_index
+ }
+ }
+
+ if (showing_progress) {
+ System.out.println("Replaying recovery log: done. %d operations recovered in %s".format(replay_operations, log_replay_duration.toDouble / TimeUnit.SECONDS.toNanos(1)));
+ }
+
+ } catch {
+ case e: Throwable =>
+ // replay failed.. good thing we are in a retry block...
+ index.close
+ throw e;
+ }
+ }
+ }
+
+ def check_index_integrity(index: RichDB) = {
+ val actual_log_refs = HashMap[Long, LongCounter]()
+ var referenced_queues = Set[Long]()
+
+ // Lets find out what the queue entries are..
+ var fixed_records = 0
+ index.cursor_prefixed(queue_entry_prefix_array) {
+ (key, value) =>
+ try {
+ val (_, queue_key, seq_key) = decode_long_long_key(key)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val (pos, len) = decode_locator(record.getMessageLocator)
+ if (record.getQueueKey != queue_key) {
+ throw new IOException("key missmatch")
+ }
+ if (record.getQueueSeq != seq_key) {
+ throw new IOException("key missmatch")
+ }
+ log.log_info(pos).foreach {
+ log_info =>
+ actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+ }
+ referenced_queues += queue_key
+ } catch {
+ case e =>
+ trace("invalid queue entry record: %s, error: %s", new Buffer(key), e)
+ fixed_records += 1
+ // Invalid record.
+ index.delete(key)
+ }
+ true
+ }
+
+ // Lets cross check the queues.
+ index.cursor_prefixed(queue_prefix_array) {
+ (key, value) =>
+ try {
+ val (_, queue_key) = decode_long_key(key)
+ val record = QueuePB.FACTORY.parseUnframed(value)
+ if (record.getKey != queue_key) {
+ throw new IOException("key missmatch")
+ }
+ referenced_queues -= queue_key
+ } catch {
+ case e =>
+ trace("invalid queue record: %s, error: %s", new Buffer(key), e)
+ fixed_records += 1
+ // Invalid record.
+ index.delete(key)
+ }
+ true
+ }
+
+ referenced_queues.foreach {
+ queue_key =>
+ // We have queue entries for a queue that does not exist..
+ index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key)) {
+ (key, value) =>
+ trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key)
+ fixed_records += 1
+ index.delete(key)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val pos = decode_vlong(record.getMessageLocator)
+ log.log_info(pos).foreach {
+ log_info =>
+ actual_log_refs.get(log_info.position).foreach {
+ counter =>
+ if (counter.decrementAndGet() == 0) {
+ actual_log_refs.remove(log_info.position)
+ }
+ }
+ }
+ true
+ }
+ }
+
+ if (actual_log_refs != log_refs) {
+ debug("expected != actual log references. expected: %s, actual %s", log_refs, actual_log_refs)
+ log_refs.clear()
+ log_refs ++= actual_log_refs
+ }
+
+ if (fixed_records > 0) {
+ warn("Fixed %d invalid index enties in the leveldb store", fixed_records)
+ }
+ }
+
+ var lock_file: LockFile = _
+
+ def lock_store = {
+ import OptionSupport._
+ if (config.fail_if_locked.getOrElse(false)) {
+ lock_file.lock()
+ } else {
+ retry {
+ lock_file.lock()
+ }
+ }
+ }
+
+ def unlock_store = {
+ lock_file.unlock()
+ }
+
+ private def store_log_refs = {
+ index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
+ }
+
+ private def load_log_refs = {
+ log_refs.clear()
+ index.get(log_refs_index_key, new ReadOptions).foreach {
+ value =>
+ val javamap = JsonCodec.decode(new Buffer(value), classOf[java.util.Map[String, Object]])
+ collection.JavaConversions.mapAsScalaMap(javamap).foreach {
+ case (k, v) =>
+ log_refs.put(k.toLong, new LongCounter(v.asInstanceOf[Number].longValue()))
+ }
+ }
+ }
+
+ def stop() = {
+ // this blocks until all io completes..
+ // Suspend also deletes the index.
+ suspend()
+
+ if (log != null) {
+ log.close
+ }
+ copy_dirty_index_to_snapshot
+ log = null
+ unlock_store
+ }
+
+ def using_index[T](func: => T): T = {
+ val lock = snapshot_rw_lock.readLock();
+ lock.lock()
+ try {
+ func
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def retry_using_index[T](func: => T): T = retry(using_index(func))
+
+ /**
+ * TODO: expose this via management APIs, handy if you want to
+ * do a file system level snapshot and want the data to be consistent.
+ */
+ def suspend() = {
+ // Make sure we are the only ones accessing the index. since
+ // we will be closing it to create a consistent snapshot.
+ snapshot_rw_lock.writeLock().lock()
+
+ // Close the index so that it's files are not changed async on us.
+ store_log_refs
+ index.put(dirty_index_key, FALSE, new WriteOptions().sync(true))
+ index.close
+ }
+
+ /**
+ * TODO: expose this via management APIs, handy if you want to
+ * do a file system level snapshot and want the data to be consistent.
+ */
+ def resume() = {
+ // re=open it..
+ retry {
+ index = new RichDB(factory.open(dirty_index_file, index_options));
+ index.put(dirty_index_key, TRUE)
+ }
+ snapshot_rw_lock.writeLock().unlock()
+ }
+
+ def copy_dirty_index_to_snapshot {
+ if (log.appender_limit == last_index_snapshot_pos) {
+ // no need to snapshot again...
+ return
+ }
+
+ // Where we start copying files into. Delete this on
+ // restart.
+ val tmp_dir = temp_index_file
+ tmp_dir.mkdirs()
+
+ try {
+
+ // Hard link all the index files.
+ dirty_index_file.list_files.foreach {
+ file =>
+ link(file, tmp_dir / file.getName)
+ }
+
+ // Rename to signal that the snapshot is complete.
+ val new_snapshot_index_pos = log.appender_limit
+ tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
+ snapshot_index_file(last_index_snapshot_pos).recursive_delete
+ last_index_snapshot_pos = new_snapshot_index_pos
+ last_index_snapshot_ts = System.currentTimeMillis()
+
+ } catch {
+ case e: Exception =>
+ // if we could not snapshot for any reason, delete it as we don't
+ // want a partial check point..
+ warn(e, "Could not snapshot the index: " + e)
+ tmp_dir.recursive_delete
+ }
+ }
+
+ def snapshot_index: Unit = {
+ if (log.appender_limit == last_index_snapshot_pos) {
+ // no need to snapshot again...
+ return
+ }
+ suspend()
+ try {
+ copy_dirty_index_to_snapshot
+ } finally {
+ resume()
+ }
+ }
+
+ def retry[T](func: => T): T = {
+ var error: Throwable = null
+ var rc: Option[T] = None
+
+ // We will loop until the tx succeeds. Perhaps it's
+ // failing due to a temporary condition like low disk space.
+ while (!rc.isDefined) {
+
+ try {
+ rc = Some(func)
+ } catch {
+ case e: Throwable =>
+ if (error == null) {
+ warn(e, "DB operation failed. (entering recovery mode): " + e)
+ }
+ error = e
+ }
+
+ if (!rc.isDefined) {
+ // We may need to give up if the store is being stopped.
+ if (!store.service_state.is_starting_or_started) {
+ throw error
+ }
+ Thread.sleep(1000)
+ }
+ }
+
+ if (error != null) {
+ info("DB recovered from failure.")
+ }
+ rc.get
+ }
+
+ def purge() = {
+ suspend()
+ try {
+ log.close
+ directory.list_files.foreach(_.recursive_delete)
+ log_refs.clear()
+ } finally {
+ retry {
+ log.open
+ }
+ resume()
+ }
+ }
+
+ def add_queue(record: QueueRecord, callback: Runnable) = {
+ retry_using_index {
+ log.appender {
+ appender =>
+ val value: Buffer = PBSupport.to_pb(record).freeze().toUnframedBuffer
+ appender.append(LOG_ADD_QUEUE, value)
+ index.put(encode_key(queue_prefix, record.key), value)
+ }
+ }
+ callback.run
+ }
+
+ def log_ref_decrement(pos: Long, log_info: LogInfo = null) = this.synchronized {
+ Option(log_info).orElse(log.log_info(pos)) match {
+ case Some(log_info) =>
+ log_refs.get(log_info.position).foreach {
+ counter =>
+ val count = counter.decrementAndGet()
+ if (count == 0) {
+ log_refs.remove(log_info.position)
+ }
+ }
+ case None =>
+ warn("Invalid log position: " + pos)
+ }
+ }
+
+ def log_ref_increment(pos: Long, log_info: LogInfo = null) = this.synchronized {
+ Option(log_info).orElse(log.log_info(pos)) match {
+ case Some(log_info) =>
+ val count = log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+ case None =>
+ warn("Invalid log position: " + pos)
+ }
+ }
+
+ def remove_queue(queue_key: Long, callback: Runnable) = {
+ retry_using_index {
+ log.appender {
+ appender =>
+ val ro = new ReadOptions
+ ro.fillCache(false)
+ ro.verifyChecksums(verify_checksums)
+ appender.append(LOG_REMOVE_QUEUE, encode_vlong(queue_key))
+ index.delete(encode_key(queue_prefix, queue_key))
+ index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
+ (key, value) =>
+ index.delete(key)
+
+ // Figure out what log file that message entry was in so we can,
+ // decrement the log file reference.
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val pos = decode_vlong(record.getMessageLocator)
+ log_ref_decrement(pos)
+ true
+ }
+ }
+ }
+ callback.run
+ }
+
+ def store(uows: Seq[LevelDBStore#DelayableUOW], callback: Runnable) {
+ retry_using_index {
+ log.appender {
+ appender =>
+
+ var sync_needed = false
+ index.write() {
+ batch =>
+ uows.foreach {
+ uow =>
+
+ for ((key, value) <- uow.map_actions) {
+ val entry = new MapEntryPB.Bean()
+ entry.setKey(key)
+ if (value == null) {
+ batch.delete(encode_key(map_prefix, key))
+ } else {
+ entry.setValue(value)
+ batch.put(encode_key(map_prefix, key), value.toByteArray)
+ }
+ var log_data = entry.freeze().toUnframedBuffer
+
+ appender.append(LOG_MAP_ENTRY, log_data)
+ }
+
+ uow.actions.foreach {
+ case (msg, action) =>
+ val message_record = action.message_record
+ var locator: (Long, Int) = null
+ var log_info: LogInfo = null
+
+ if (message_record != null) {
+
+ val pb = new MessagePB.Bean
+ pb.setProtocol(message_record.protocol)
+ pb.setSize(message_record.size)
+ pb.setValue(message_record.buffer)
+ var message_data = pb.freeze().toUnframedBuffer
+
+ val p = if (snappy_compress_logs) {
+ val compressed = Snappy.compress(message_data)
+ if (compressed.length < message_data.length) {
+ message_data = compressed
+ appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
+ locator = (p._1, message_data.length)
+ log_info = p._2
+ message_record.locator.set(locator);
+ }
+
+ action.dequeues.foreach {
+ entry =>
+ if (locator == null) {
+ locator = entry.message_locator.get().asInstanceOf[(Long, Int)]
+ }
+ assert(locator != null)
+ val (pos, len) = locator
+ val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
+
+ appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
+ batch.delete(key)
+ log_ref_decrement(pos, log_info)
+ }
+
+ var locator_buffer: Buffer = null
+ action.enqueues.foreach {
+ entry =>
+ assert(locator != null)
+ val (pos, len) = locator
+ if (locator_buffer == null) {
+ locator_buffer = encode_locator(pos, len)
+ }
+
+ entry.message_locator.set(locator)
+
+ val log_record = new QueueEntryPB.Bean
+ // TODO: perhaps we should normalize the sender to make the index entries more compact.
+ log_record.setSender(entry.sender)
+ log_record.setMessageLocator(locator_buffer)
+ log_record.setQueueKey(entry.queue_key)
+ log_record.setQueueSeq(entry.entry_seq)
+ log_record.setSize(entry.size)
+ if (entry.expiration != 0)
+ log_record.setExpiration(entry.expiration)
+ if (entry.redeliveries != 0)
+ log_record.setRedeliveries(entry.redeliveries)
+
+ appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toUnframedBuffer)
+
+ // Slim down the index record, the smaller it is the cheaper the compactions
+ // will be and the more we can cache in mem.
+ val index_record = log_record.copy()
+ index_record.clearQueueKey()
+ index_record.clearQueueSeq()
+ batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toUnframedBuffer)
+
+ // Increment it.
+ log_ref_increment(pos, log_info)
+
+ }
+ }
+ if (uow.flush_sync) {
+ sync_needed = true
+ }
+ }
+ }
+ if (sync_needed && sync) {
+ appender.flush
+ appender.force
+ }
+ }
+ }
+ callback.run
+ }
+
+ val metric_load_from_index_counter = new TimeCounter
+ var metric_load_from_index = metric_load_from_index_counter(false)
+
+ def loadMessages(requests: ListBuffer[(Long, AtomicReference[Object], (Option[MessageRecord]) => Unit)]): Unit = {
+
+ val ro = new ReadOptions
+ ro.verifyChecksums(verify_checksums)
+ ro.fillCache(true)
+
+ val missing = retry_using_index {
+ index.snapshot {
+ snapshot =>
+ ro.snapshot(snapshot)
+ requests.flatMap {
+ x =>
+ val (_, locator, callback) = x
+ val record = metric_load_from_index_counter.time {
+ val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
+ log.read(pos, len).map {
+ case (kind, data) =>
+
+ val msg_data = kind match {
+ case LOG_ADD_MESSAGE => data
+ case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+ }
+ val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
+ rc.locator = locator
+ assert(rc.protocol != null)
+ rc
+ }
+ }
+ if (record.isDefined) {
+ callback(record)
+ None
+ } else {
+ Some(x)
+ }
+ }
+ }
+ }
+
+ if (missing.isEmpty)
+ return
+
+ // There's a small chance that a message was missing, perhaps we started a read tx, before the
+ // write tx completed. Lets try again..
+ retry_using_index {
+ index.snapshot {
+ snapshot =>
+ ro.snapshot(snapshot)
+ missing.foreach {
+ x =>
+ val (_, locator, callback) = x
+ val record: Option[MessageRecord] = metric_load_from_index_counter.time {
+ val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
+ log.read(pos, len).map {
+ case (kind, data) =>
+ val msg_data = kind match {
+ case LOG_ADD_MESSAGE => data
+ case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+ }
+ val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
+ rc.locator = locator
+ assert(rc.protocol != null)
+ rc
+ }
+ }
+ callback(record)
+ }
+ }
+ }
+ }
+
+ def list_queues: Seq[Long] = {
+ val rc = ListBuffer[Long]()
+ retry_using_index {
+ val ro = new ReadOptions
+ ro.verifyChecksums(verify_checksums)
+ ro.fillCache(false)
+ index.cursor_keys_prefixed(queue_prefix_array, ro) {
+ key =>
+ rc += decode_long_key(key)._2
+ true // to continue cursoring.
+ }
+ }
+ rc
+ }
+
+ def get_queue(queue_key: Long): Option[QueueRecord] = {
+ retry_using_index {
+ val ro = new ReadOptions
+ ro.fillCache(false)
+ ro.verifyChecksums(verify_checksums)
+ index.get(encode_key(queue_prefix, queue_key), ro).map {
+ x =>
+ PBSupport.from_pb(QueuePB.FACTORY.parseUnframed(x))
+ }
+ }
+ }
+
+ def listQueueEntryGroups(queue_key: Long, limit: Int): Seq[QueueEntryRange] = {
+ var rc = ListBuffer[QueueEntryRange]()
+ val ro = new ReadOptions
+ ro.verifyChecksums(verify_checksums)
+ ro.fillCache(false)
+ retry_using_index {
+ index.snapshot {
+ snapshot =>
+ ro.snapshot(snapshot)
+
+ var group: QueueEntryRange = null
+ index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
+ (key, value) =>
+
+ val (_, _, current_key) = decode_long_long_key(key)
+ if (group == null) {
+ group = new QueueEntryRange
+ group.first_entry_seq = current_key
+ }
+
+ val entry = QueueEntryPB.FACTORY.parseUnframed(value)
+ val pos = decode_vlong(entry.getMessageLocator)
+
+ group.last_entry_seq = current_key
+ group.count += 1
+ group.size += entry.getSize
+
+ if (group.expiration == 0) {
+ group.expiration = entry.getExpiration
+ } else {
+ if (entry.getExpiration != 0) {
+ group.expiration = entry.getExpiration.min(group.expiration)
+ }
+ }
+
+ if (group.count == limit) {
+ rc += group
+ group = null
+ }
+
+ true // to continue cursoring.
+ }
+ if (group != null) {
+ rc += group
+ }
+ }
+ }
+ rc
+ }
+
+ def getQueueEntries(queue_key: Long, firstSeq: Long, lastSeq: Long): Seq[QueueEntryRecord] = {
+ var rc = ListBuffer[QueueEntryRecord]()
+ val ro = new ReadOptions
+ ro.verifyChecksums(verify_checksums)
+ ro.fillCache(true)
+ retry_using_index {
+ index.snapshot {
+ snapshot =>
+ ro.snapshot(snapshot)
+ val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
+ val end = encode_key(queue_entry_prefix, queue_key, lastSeq + 1)
+ index.cursor_range(start, end, ro) {
+ (key, value) =>
+ val (_, _, queue_seq) = decode_long_long_key(key)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val entry = PBSupport.from_pb(record)
+ entry.queue_key = queue_key
+ entry.entry_seq = queue_seq
+ entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator))
+ rc += entry
+ true
+ }
+ }
+ }
+ rc
+ }
+
+ def getLastMessageKey: Long = 0
+
+ def get(key: Buffer): Option[Buffer] = {
+ retry_using_index {
+ index.get(encode_key(map_prefix, key)).map(new Buffer(_))
+ }
+ }
+
+ def get_prefixed_map_entries(prefix: Buffer): Seq[(Buffer, Buffer)] = {
+ val rc = ListBuffer[(Buffer, Buffer)]()
+ retry_using_index {
+ index.cursor_prefixed(encode_key(map_prefix, prefix)) {
+ (key, value) =>
+ rc += new Buffer(key) -> new Buffer(value)
+ true
+ }
+ }
+ rc
+ }
+
+ def get_last_queue_key: Long = {
+ retry_using_index {
+ index.last_key(queue_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
+ }
+ }
+
+ def gc: Unit = {
+
+ // TODO:
+ // Perhaps we should snapshot_index if the current snapshot is old.
+ //
+ import collection.JavaConversions._
+ last_index_snapshot_pos
+ val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
+
+ // We don't want to delete any journals that the index has not snapshot'ed or
+ // the the
+ val delete_limit = log.log_info(last_index_snapshot_pos).map(_.position).
+ getOrElse(last_index_snapshot_pos).min(log.appender_start)
+
+ empty_journals.foreach {
+ id =>
+ if (id < delete_limit) {
+ log.delete(id)
+ }
+ }
+ }
+
+ case class UsageCounter(info: LogInfo) {
+ var count = 0L
+ var size = 0L
+ var first_reference_queue: QueueRecord = _
+
+ def increment(value: Int) = {
+ count += 1
+ size += value
+ }
+ }
+
+ //
+ // Collects detailed usage information about the journal like who's referencing it.
+ //
+ // def get_log_usage_details = {
+ //
+ // val usage_map = new ApolloTreeMap[Long,UsageCounter]()
+ // log.log_mutex.synchronized {
+ // log.log_infos.foreach(entry=> usage_map.put(entry._1, UsageCounter(entry._2)) )
+ // }
+ //
+ // def lookup_usage(pos: Long) = {
+ // var entry = usage_map.floorEntry(pos)
+ // if (entry != null) {
+ // val usage = entry.getValue()
+ // if (pos < usage.info.limit) {
+ // Some(usage)
+ // } else {
+ // None
+ // }
+ // } else {
+ // None
+ // }
+ // }
+ //
+ // val ro = new ReadOptions()
+ // ro.fillCache(false)
+ // ro.verifyChecksums(verify_checksums)
+ //
+ // retry_using_index {
+ // index.snapshot { snapshot =>
+ // ro.snapshot(snapshot)
+ //
+ // // Figure out which journal files are still in use by which queues.
+ // index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
+ //
+ // val entry_record:QueueEntryRecord = value
+ // val pos = if(entry_record.message_locator!=null) {
+ // Some(decode_locator(entry_record.message_locator)._1)
+ // } else {
+ // index.get(encode_key(message_prefix, entry_record.message_key)).map(decode_locator(_)._1)
+ // }
+ //
+ // pos.flatMap(lookup_usage(_)).foreach { usage =>
+ // if( usage.first_reference_queue == null ) {
+ // usage.first_reference_queue = index.get(encode_key(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
+ // }
+ // usage.increment(entry_record.size)
+ // }
+ //
+ // true
+ // }
+ // }
+ // }
+ //
+ // import collection.JavaConversions._
+ // usage_map.values.toSeq.toArray
+ // }
+
+
+ def export_data(os: OutputStream): Option[String] = {
+ try {
+ val manager = ExportStreamManager(os, 1)
+
+ retry_using_index {
+
+ // Delete all the tmp keys..
+ index.cursor_keys_prefixed(Array(tmp_prefix)) {
+ key =>
+ index.delete(key)
+ true
+ }
+
+ index.snapshot {
+ snapshot =>
+ val nocache = new ReadOptions
+ nocache.snapshot(snapshot)
+ nocache.verifyChecksums(verify_checksums)
+ nocache.fillCache(false)
+
+ val cache = new ReadOptions
+ nocache.snapshot(snapshot)
+ nocache.verifyChecksums(false)
+ nocache.fillCache(false)
+
+ // Build a temp table of all references messages by the queues
+ // Remember 2 queues could reference the same message.
+ index.cursor_prefixed(queue_entry_prefix_array, cache) {
+ (_, value) =>
+ val record = QueueEntryPB.FACTORY.parseUnframed(value)
+ val (pos, len) = decode_locator(record.getMessageLocator)
+ index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
+ true
+ }
+
+ // Use the temp table to export all the referenced messages. Use
+ // the log position as the message key.
+ index.cursor_prefixed(Array(tmp_prefix)) {
+ (key, value) =>
+ val (_, pos) = decode_long_key(key)
+ val len = decode_vlong(value).toInt
+ log.read(pos, len).foreach {
+ case (kind, data) =>
+ val msg_data = kind match {
+ case LOG_ADD_MESSAGE => data
+ case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+ }
+ val record = MessagePB.FACTORY.parseUnframed(msg_data).copy()
+ record.setMessageKey(pos)
+ manager.store_message(record)
+ }
+ true
+ }
+
+ // Now export the queue entries
+ index.cursor_prefixed(queue_entry_prefix_array, nocache) {
+ (key, value) =>
+ val (_, queue_key, queue_seq) = decode_long_long_key(key)
+ val record = QueueEntryPB.FACTORY.parseUnframed(value).copy()
+ val (pos, len) = decode_locator(record.getMessageLocator)
+ record.setQueueKey(queue_key)
+ record.setQueueSeq(queue_seq)
+ record.setMessageKey(pos)
+ manager.store_queue_entry(record)
+ true
+ }
+
+ index.cursor_prefixed(queue_prefix_array) {
+ (_, value) =>
+ val record = QueuePB.FACTORY.parseUnframed(value)
+ manager.store_queue(record)
+ true
+ }
+
+ index.cursor_prefixed(map_prefix_array, nocache) {
+ (key, value) =>
+ val key_buffer = new Buffer(key)
+ key_buffer.moveHead(1)
+ val record = new MapEntryPB.Bean
+ record.setKey(key_buffer)
+ record.setValue(new Buffer(value))
+ manager.store_map_entry(record)
+ true
+ }
+
+ }
+
+ // Delete all the tmp keys..
+ index.cursor_keys_prefixed(Array(tmp_prefix)) {
+ key =>
+ index.delete(key)
+ true
+ }
+
+ }
+ manager.finish
+
+ None
+ } catch {
+ case x: Exception =>
+ debug(x, "Export failed")
+ x.printStackTrace()
+ Some(x.getMessage)
+ }
+ }
+
+ def import_data(is: InputStream): Option[String] = {
+ try {
+ val manager = ImportStreamManager(is)
+ if (manager.version != 1) {
+ return Some("Cannot import from an export file of version: " + manager.version)
+ }
+
+ purge
+
+ retry_using_index {
+ log.appender {
+ appender =>
+ while (manager.getNext match {
+
+ case record: MessagePB.Buffer =>
+ var message_data = record.toUnframedBuffer
+ val (pos, _) = if (snappy_compress_logs) {
+ val compressed = Snappy.compress(message_data)
+ if (compressed.length < message_data.length) {
+ message_data = compressed
+ appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
+ index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
+ true
+
+ case record: QueueEntryPB.Buffer =>
+ val copy = record.copy();
+ var original_msg_key: Long = record.getMessageKey
+ index.get(encode_key(tmp_prefix, original_msg_key)) match {
+ case Some(locator) =>
+ val (pos, len) = decode_locator(locator)
+ copy.setMessageLocator(locator)
+ index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer)
+ log.log_info(pos).foreach {
+ log_info =>
+ log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+ }
+ case None =>
+ println("Invalid queue entry, references message that was not in the export: " + original_msg_key)
+ }
+ true
+
+ case record: QueuePB.Buffer =>
+ index.put(encode_key(queue_prefix, record.getKey), record.toUnframedBuffer)
+ true
+
+ case record: MapEntryPB.Buffer =>
+ index.put(encode_key(map_prefix, record.getKey), record.getValue)
+ true
+
+ case null =>
+ false
+ }) {
+ // keep looping
+ }
+
+ }
+ }
+
+ store_log_refs
+ // Delete all the tmp keys..
+ index.cursor_keys_prefixed(Array(tmp_prefix)) {
+ key =>
+ index.delete(key)
+ true
+ }
+
+ snapshot_index
+ None
+
+ } catch {
+ case x: Exception =>
+ debug(x, "Import failed")
+ Some(x.getMessage)
+ }
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
------------------------------------------------------------------------------
svn:executable = *
Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -14,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store.leveldb
import dto.{LevelDBStoreDTO, LevelDBStoreStatusDTO}
import collection.Seq
@@ -40,31 +41,32 @@ object LevelDBStore extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class LevelDBStore(val config:LevelDBStoreDTO) extends DelayingStoreSupport {
+class LevelDBStore(val config: LevelDBStoreDTO) extends DelayingStoreSupport {
var next_queue_key = new AtomicLong(1)
var next_msg_key = new AtomicLong(1)
- var write_executor:ExecutorService = _
- var read_executor:ExecutorService = _
+ var write_executor: ExecutorService = _
+ var read_executor: ExecutorService = _
+
+ var client: LevelDBClient = _
- var client:LevelDBClient = _
def create_client = new LevelDBClient(this)
def store_kind = "leveldb"
- override def toString = store_kind+" store at "+config.directory
+ override def toString = store_kind + " store at " + config.directory
override protected def locator_based = true
def flush_delay = config.flush_delay.getOrElse(500)
-
+
protected def get_next_msg_key = next_msg_key.getAndIncrement
- protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
+ protected def store(uows: Seq[DelayableUOW])(callback: => Unit) = {
write_executor {
- client.store(uows, ^{
+ client.store(uows, ^ {
dispatch_queue {
callback
}
@@ -98,16 +100,16 @@ class LevelDBStore(val config:LevelDBSto
poll_gc
on_completed.run
} catch {
- case e:Throwable =>
+ case e: Throwable =>
e.printStackTrace()
- LevelDBStore.error(e, "Store client startup failure: "+e)
+ LevelDBStore.error(e, "Store client startup failure: " + e)
}
}
}
catch {
- case e:Throwable =>
+ case e: Throwable =>
e.printStackTrace()
- LevelDBStore.error(e, "Store startup failure: "+e)
+ LevelDBStore.error(e, "Store startup failure: " + e)
}
}
@@ -131,15 +133,15 @@ class LevelDBStore(val config:LevelDBSto
ss.is_starting || ss.is_started
}
- def poll_gc:Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
- if( keep_polling ) {
+ def poll_gc: Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
+ if (keep_polling) {
gc {
poll_gc
}
}
}
- def gc(onComplete: =>Unit) = write_executor {
+ def gc(onComplete: => Unit) = write_executor {
client.gc
onComplete
}
@@ -153,7 +155,7 @@ class LevelDBStore(val config:LevelDBSto
/**
* Deletes all stored data from the store.
*/
- def purge(callback: =>Unit) = {
+ def purge(callback: => Unit) = {
write_executor {
client.purge()
next_queue_key.set(1)
@@ -169,7 +171,7 @@ class LevelDBStore(val config:LevelDBSto
}
}
- def get_prefixed_map_entries(prefix:Buffer)(callback: Seq[(Buffer, Buffer)]=>Unit) = {
+ def get_prefixed_map_entries(prefix: Buffer)(callback: Seq[(Buffer, Buffer)] => Unit) = {
read_executor {
callback(client.get_prefixed_map_entries(prefix))
}
@@ -178,7 +180,7 @@ class LevelDBStore(val config:LevelDBSto
/**
* Ges the last queue key identifier stored.
*/
- def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
+ def get_last_queue_key(callback: (Option[Long]) => Unit): Unit = {
write_executor {
callback(Some(client.get_last_queue_key))
}
@@ -186,81 +188,91 @@ class LevelDBStore(val config:LevelDBSto
def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
write_executor {
- client.add_queue(record, ^{ callback(true) })
+ client.add_queue(record, ^ {
+ callback(true)
+ })
}
}
def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
write_executor {
- client.remove_queue(queueKey,^{ callback(true) })
+ client.remove_queue(queueKey, ^ {
+ callback(true)
+ })
}
}
def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
write_executor {
- callback( client.get_queue(queueKey) )
+ callback(client.get_queue(queueKey))
}
}
def list_queues(callback: (Seq[Long]) => Unit) = {
write_executor {
- callback( client.list_queues )
+ callback(client.list_queues)
}
}
- val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Object], (Option[MessageRecord])=>Unit)](), dispatch_queue)
- load_source.setEventHandler(^{drain_loads});
+ val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Object], (Option[MessageRecord]) => Unit)](), dispatch_queue)
+ load_source.setEventHandler(^ {
+ drain_loads
+ });
load_source.resume
- def load_message(messageKey: Long, locator:AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
- message_load_latency_counter.start { end=>
- load_source.merge((messageKey, locator, { (result)=>
- end()
- callback(result)
- }))
+ def load_message(messageKey: Long, locator: AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
+ message_load_latency_counter.start {
+ end =>
+ load_source.merge((messageKey, locator, {
+ (result) =>
+ end()
+ callback(result)
+ }))
}
}
def drain_loads = {
var data = load_source.getData
message_load_batch_size_counter += data.size
- read_executor ^{
+ read_executor ^ {
client.loadMessages(data)
}
}
def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
- write_executor ^{
- callback( client.listQueueEntryGroups(queueKey, limit) )
+ write_executor ^ {
+ callback(client.listQueueEntryGroups(queueKey, limit))
}
}
def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
- write_executor ^{
- callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
+ write_executor ^ {
+ callback(client.getQueueEntries(queueKey, firstSeq, lastSeq))
}
}
- def poll_stats:Unit = {
+ def poll_stats: Unit = {
def displayStats = {
- if( service_state.is_started ) {
+ if (service_state.is_started) {
flush_latency = flush_latency_counter(true)
message_load_latency = message_load_latency_counter(true)
-// client.metric_journal_append = client.metric_journal_append_counter(true)
-// client.metric_index_update = client.metric_index_update_counter(true)
+ // client.metric_journal_append = client.metric_journal_append_counter(true)
+ // client.metric_index_update = client.metric_index_update_counter(true)
close_latency = close_latency_counter(true)
- message_load_batch_size = message_load_batch_size_counter(true)
+ message_load_batch_size = message_load_batch_size_counter(true)
poll_stats
}
}
- dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+ dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^ {
+ displayStats
+ })
}
- def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
+ def get_store_status(callback: (StoreStatusDTO) => Unit) = dispatch_queue {
val rc = new LevelDBStoreStatusDTO
fill_store_status(rc)
rc.message_load_batch_size = message_load_batch_size
@@ -272,21 +284,24 @@ class LevelDBStore(val config:LevelDBSto
rc.log_stats = {
import collection.JavaConversions._
var row_layout = "%-20s | %-10s | %-10s\n"
- row_layout.format("Log File", "Msg Refs", "File Size")+
- client.log.log_infos.map{case (id,info)=> id -> client.log_refs.get(id).map(_.get)}.toSeq.flatMap { case (id, refs)=>
- try {
- val file = LevelDBClient.create_sequence_file(client.directory, id, LevelDBClient.LOG_SUFFIX)
- val size = file.length()
- Some(row_layout.format(
- file.getName,
- refs.getOrElse(0L).toString,
- ViewHelper.memory(size)
- ))
- } catch {
- case e:Throwable =>
- None
- }
- }.mkString("")
+ row_layout.format("Log File", "Msg Refs", "File Size") +
+ client.log.log_infos.map {
+ case (id, info) => id -> client.log_refs.get(id).map(_.get)
+ }.toSeq.flatMap {
+ case (id, refs) =>
+ try {
+ val file = LevelDBClient.create_sequence_file(client.directory, id, LevelDBClient.LOG_SUFFIX)
+ val size = file.length()
+ Some(row_layout.format(
+ file.getName,
+ refs.getOrElse(0L).toString,
+ ViewHelper.memory(size)
+ ))
+ } catch {
+ case e: Throwable =>
+ None
+ }
+ }.mkString("")
}
}
callback(rc)
@@ -297,7 +312,7 @@ class LevelDBStore(val config:LevelDBSto
* Exports the contents of the store to the provided streams. Each stream should contain
* a list of framed protobuf objects with the corresponding object types.
*/
- def export_data(os:OutputStream, cb:(Option[String])=>Unit) = write_executor {
+ def export_data(os: OutputStream, cb: (Option[String]) => Unit) = write_executor {
cb(client.export_data(os))
}
@@ -305,7 +320,7 @@ class LevelDBStore(val config:LevelDBSto
* Imports a previously exported set of streams. This deletes any previous data
* in the store.
*/
- def import_data(is:InputStream, cb:(Option[String])=>Unit) = write_executor {
+ def import_data(is: InputStream, cb: (Option[String]) => Unit) = write_executor {
cb(client.import_data(is))
}
Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -14,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store.leveldb
import dto.LevelDBStoreDTO
import org.apache.activemq.apollo.broker.store.StoreFactory
@@ -30,13 +31,13 @@ import org.apache.activemq.apollo.util._
* This class is discovered using the following resource file:
* <code>META-INF/services/org.apache.activemq.apollo/stores</code>
* </p>
- *
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class LevelDBStoreFactory extends StoreFactory {
- def create(config: StoreDTO) = config match {
- case config:LevelDBStoreDTO =>
- if( config.getClass == classOf[LevelDBStoreDTO] ) {
+ def create(config: StoreDTO) = config match {
+ case config: LevelDBStoreDTO =>
+ if (config.getClass == classOf[LevelDBStoreDTO]) {
new LevelDBStore(config)
} else {
null