You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/19 21:24:16 UTC

svn commit: r1291054 [1/2] - in /activemq/activemq-apollo/trunk/apollo-leveldb/src: main/scala/org/apache/activemq/apollo/broker/store/leveldb/ main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/ test/scala/org/apache/activemq/apollo/br...

Author: chirino
Date: Sun Feb 19 20:24:15 2012
New Revision: 1291054

URL: http://svn.apache.org/viewvc?rev=1291054&view=rev
Log:
Move leveldb store source files to correct directory

Added:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala
      - copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala
      - copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala   (with props)
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala
      - copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala
      - copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
      - copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala
      - copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala
      - copied, changed from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala

Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/HelperTrait.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,7 +16,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store.leveldb
 
 import org.fusesource.hawtbuf._
 import org.iq80.leveldb._
@@ -22,26 +23,27 @@ import java.io.DataOutput
 
 object HelperTrait {
 
-  def encode_locator(pos:Long, len:Int):Array[Byte] = {
+  def encode_locator(pos: Long, len: Int): Array[Byte] = {
     val out = new DataByteArrayOutputStream(
-      AbstractVarIntSupport.computeVarLongSize(pos)+
-      AbstractVarIntSupport.computeVarIntSize(len)
+      AbstractVarIntSupport.computeVarLongSize(pos) +
+        AbstractVarIntSupport.computeVarIntSize(len)
     )
     out.writeVarLong(pos)
     out.writeVarInt(len)
     out.getData
   }
 
-  def decode_locator(bytes:Array[Byte]):(Long,  Int) = {
+  def decode_locator(bytes: Array[Byte]): (Long, Int) = {
     val in = new DataByteArrayInputStream(bytes)
     (in.readVarLong(), in.readVarInt())
   }
-  def decode_locator(bytes:Buffer):(Long,  Int) = {
+
+  def decode_locator(bytes: Buffer): (Long, Int) = {
     val in = new DataByteArrayInputStream(bytes)
     (in.readVarLong(), in.readVarInt())
   }
 
-  def encode_vlong(a1:Long):Array[Byte] = {
+  def encode_vlong(a1: Long): Array[Byte] = {
     val out = new DataByteArrayOutputStream(
       AbstractVarIntSupport.computeVarLongSize(a1)
     )
@@ -49,31 +51,31 @@ object HelperTrait {
     out.getData
   }
 
-  def decode_vlong(bytes:Array[Byte]):Long = {
+  def decode_vlong(bytes: Array[Byte]): Long = {
     val in = new DataByteArrayInputStream(bytes)
     in.readVarLong()
   }
 
-  def encode_key(a1:Byte, a2:Long):Array[Byte] = {
+  def encode_key(a1: Byte, a2: Long): Array[Byte] = {
     val out = new DataByteArrayOutputStream(9)
     out.writeByte(a1.toInt)
     out.writeLong(a2)
     out.getData
   }
 
-  def encode_key(a1:Byte, a2:Buffer):Array[Byte] = {
-    val out = new DataByteArrayOutputStream(1+a2.length)
+  def encode_key(a1: Byte, a2: Buffer): Array[Byte] = {
+    val out = new DataByteArrayOutputStream(1 + a2.length)
     out.writeByte(a1.toInt)
     a2.writeTo(out.asInstanceOf[DataOutput])
     out.getData
   }
 
-  def decode_long_key(bytes:Array[Byte]):(Byte, Long) = {
+  def decode_long_key(bytes: Array[Byte]): (Byte, Long) = {
     val in = new DataByteArrayInputStream(bytes)
     (in.readByte(), in.readLong())
   }
 
-  def encode_key(a1:Byte, a2:Long, a3:Long):Array[Byte] = {
+  def encode_key(a1: Byte, a2: Long, a3: Long): Array[Byte] = {
     val out = new DataByteArrayOutputStream(17)
     out.writeByte(a1)
     out.writeLong(a2)
@@ -81,19 +83,19 @@ object HelperTrait {
     out.getData
   }
 
-  def decode_long_long_key(bytes:Array[Byte]):(Byte,Long,Long) = {
+  def decode_long_long_key(bytes: Array[Byte]): (Byte, Long, Long) = {
     val in = new DataByteArrayInputStream(bytes)
     (in.readByte(), in.readLong(), in.readLong())
   }
 
-  def encode(a1:Byte, a2:Int):Array[Byte] = {
+  def encode(a1: Byte, a2: Int): Array[Byte] = {
     val out = new DataByteArrayOutputStream(5)
     out.writeByte(a1)
     out.writeInt(a2)
     out.getData
   }
 
-  def decode_int_key(bytes:Array[Byte]):(Byte,Int) = {
+  def decode_int_key(bytes: Array[Byte]): (Byte, Int) = {
     val in = new DataByteArrayInputStream(bytes)
     (in.readByte(), in.readInt())
   }
@@ -101,30 +103,30 @@ object HelperTrait {
   final class RichDB(val db: DB) {
 
     val is_pure_java_version = db.getClass.getName == "org.iq80.leveldb.impl.DbImpl"
-    
-    def getProperty(name:String) = db.getProperty(name)
 
-    def getApproximateSizes(ranges:Range*) = db.getApproximateSizes(ranges:_*)
+    def getProperty(name: String) = db.getProperty(name)
+
+    def getApproximateSizes(ranges: Range*) = db.getApproximateSizes(ranges: _*)
 
-    def get(key:Array[Byte], ro:ReadOptions=new ReadOptions):Option[Array[Byte]] = {
+    def get(key: Array[Byte], ro: ReadOptions = new ReadOptions): Option[Array[Byte]] = {
       Option(db.get(key, ro))
     }
 
-    def close:Unit = db.close()
+    def close: Unit = db.close()
 
-    def delete(key:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+    def delete(key: Array[Byte], wo: WriteOptions = new WriteOptions): Unit = {
       db.delete(key, wo)
     }
 
-    def put(key:Array[Byte], value:Array[Byte], wo:WriteOptions=new WriteOptions):Unit = {
+    def put(key: Array[Byte], value: Array[Byte], wo: WriteOptions = new WriteOptions): Unit = {
       db.put(key, value, wo)
     }
 
-    def write[T](wo:WriteOptions=new WriteOptions)(func: WriteBatch=>T):T = {
+    def write[T](wo: WriteOptions = new WriteOptions)(func: WriteBatch => T): T = {
       val updates = db.createWriteBatch()
       try {
 
-        val rc=Some(func(updates))
+        val rc = Some(func(updates))
         db.write(updates, wo)
         return rc.get
       } finally {
@@ -132,7 +134,7 @@ object HelperTrait {
       }
     }
 
-    def snapshot[T](func: Snapshot=>T):T = {
+    def snapshot[T](func: Snapshot => T): T = {
       val snapshot = db.getSnapshot
       try {
         func(snapshot)
@@ -141,11 +143,11 @@ object HelperTrait {
       }
     }
 
-    def cursor_keys(ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+    def cursor_keys(ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
       val iterator = db.iterator(ro)
       iterator.seekToFirst();
       try {
-        while( iterator.hasNext && func(iterator.peekNext.getKey) ) {
+        while (iterator.hasNext && func(iterator.peekNext.getKey)) {
           iterator.next()
         }
       } finally {
@@ -153,14 +155,14 @@ object HelperTrait {
       }
     }
 
-    def cursor_keys_prefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+    def cursor_keys_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
       val iterator = db.iterator(ro)
       iterator.seek(prefix);
       try {
-        def check(key:Array[Byte]) = {
+        def check(key: Array[Byte]) = {
           key.startsWith(prefix) && func(key)
         }
-        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+        while (iterator.hasNext && check(iterator.peekNext.getKey)) {
           iterator.next()
         }
       } finally {
@@ -168,14 +170,14 @@ object HelperTrait {
       }
     }
 
-    def cursor_prefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+    def cursor_prefixed(prefix: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
       val iterator = db.iterator(ro)
       iterator.seek(prefix);
       try {
-        def check(key:Array[Byte]) = {
+        def check(key: Array[Byte]) = {
           key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
         }
-        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+        while (iterator.hasNext && check(iterator.peekNext.getKey)) {
           iterator.next()
         }
       } finally {
@@ -183,22 +185,22 @@ object HelperTrait {
       }
     }
 
-    def compare(a1:Array[Byte], a2:Array[Byte]):Int = {
+    def compare(a1: Array[Byte], a2: Array[Byte]): Int = {
       new Buffer(a1).compareTo(new Buffer(a2))
     }
 
-    def cursor_range_keys(start_included:Array[Byte], end_excluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
+    def cursor_range_keys(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
       val iterator = db.iterator(ro)
       iterator.seek(start_included);
       try {
-        def check(key:Array[Byte]) = {
-          if ( compare(key,end_excluded) < 0) {
+        def check(key: Array[Byte]) = {
+          if (compare(key, end_excluded) < 0) {
             func(key)
           } else {
             false
           }
         }
-        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+        while (iterator.hasNext && check(iterator.peekNext.getKey)) {
           iterator.next()
         }
       } finally {
@@ -206,14 +208,14 @@ object HelperTrait {
       }
     }
 
-    def cursor_range(start_included:Array[Byte], end_excluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
+    def cursor_range(start_included: Array[Byte], end_excluded: Array[Byte], ro: ReadOptions = new ReadOptions)(func: (Array[Byte], Array[Byte]) => Boolean): Unit = {
       val iterator = db.iterator(ro)
       iterator.seek(start_included);
       try {
-        def check(key:Array[Byte]) = {
-          (compare(key,end_excluded) < 0) && func(key, iterator.peekNext.getValue)
+        def check(key: Array[Byte]) = {
+          (compare(key, end_excluded) < 0) && func(key, iterator.peekNext.getValue)
         }
-        while( iterator.hasNext && check(iterator.peekNext.getKey) ) {
+        while (iterator.hasNext && check(iterator.peekNext.getKey)) {
           iterator.next()
         }
       } finally {
@@ -221,35 +223,36 @@ object HelperTrait {
       }
     }
 
-    def last_key(prefix:Array[Byte], ro:ReadOptions=new ReadOptions): Option[Array[Byte]] = {
+    def last_key(prefix: Array[Byte], ro: ReadOptions = new ReadOptions): Option[Array[Byte]] = {
       val last = new Buffer(prefix).deepCopy().data
-      if ( last.length > 0 ) {
-        val pos = last.length-1
-        last(pos) = (last(pos)+1).toByte
+      if (last.length > 0) {
+        val pos = last.length - 1
+        last(pos) = (last(pos) + 1).toByte
       }
 
-      if(is_pure_java_version) {
+      if (is_pure_java_version) {
         // The pure java version of LevelDB does not support backward iteration.
-        var rc:Option[Array[Byte]] = None
-        cursor_range_keys(prefix, last) { key=>
-          rc = Some(key)
-          true
+        var rc: Option[Array[Byte]] = None
+        cursor_range_keys(prefix, last) {
+          key =>
+            rc = Some(key)
+            true
         }
         rc
       } else {
         val iterator = db.iterator(ro)
         try {
-        
+
           iterator.seek(last);
-          if ( iterator.hasPrev ) {
+          if (iterator.hasPrev) {
             iterator.prev()
           } else {
             iterator.seekToLast()
           }
 
-          if ( iterator.hasNext ) {
+          if (iterator.hasNext) {
             val key = iterator.peekNext.getKey
-            if(key.startsWith(prefix)) {
+            if (key.startsWith(prefix)) {
               Some(key)
             } else {
               None

Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/Interval.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/Interval.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,7 +16,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store.leveldb
 
 import java.util.ArrayList
 import java.util.Iterator
@@ -23,21 +24,24 @@ import java.util.NoSuchElementException
 import org.apache.activemq.apollo.util.TreeMap
 
 object Interval {
-  def apply[N](start:N)(implicit numeric: scala.math.Numeric[N]):Interval[N] = {
+  def apply[N](start: N)(implicit numeric: scala.math.Numeric[N]): Interval[N] = {
     import numeric._
-    Interval(start, start+one)
+    Interval(start, start + one)
   }
 }
 
 case class Interval[N](start: N, limit: N)(implicit numeric: scala.math.Numeric[N]) {
+
   import numeric._
 
   def size = limit - start
+
   def end = limit - one
 
-  def start(value: N):Interval[N] = Interval(value, limit)
-  def limit(value: N):Interval[N] = Interval(start, value)
-  
+  def start(value: N): Interval[N] = Interval(value, limit)
+
+  def limit(value: N): Interval[N] = Interval(start, value)
+
   override def toString = {
     if (start == end) {
       start.toString
@@ -57,8 +61,10 @@ case class Interval[N](start: N, limit: 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 case class IntervalSet[N](implicit numeric: scala.math.Numeric[N]) extends java.lang.Iterable[Interval[N]] {
+
   import numeric._
   import collection.JavaConversions._
+
   private final val ranges = new TreeMap[N, Interval[N]]
 
   def copy = {
@@ -68,11 +74,13 @@ case class IntervalSet[N](implicit numer
     }
     rc
   }
-  def add(r:N):Unit = add(Interval(r))
-  def add(r:Interval[N]): Unit = {
+
+  def add(r: N): Unit = add(Interval(r))
+
+  def add(r: Interval[N]): Unit = {
     var start = r.start
     var limit = r.limit
-    
+
     var entry = ranges.floorEntry(limit)
     while (entry != null) {
       var curr = entry
@@ -95,17 +103,18 @@ case class IntervalSet[N](implicit numer
     ranges.put(start, Interval(start, limit))
   }
 
-  def remove(r:N):Unit = remove(Interval(r))
-  def remove(r:Interval[N]): Unit = {
-    val start = r.start 
+  def remove(r: N): Unit = remove(Interval(r))
+
+  def remove(r: Interval[N]): Unit = {
+    val start = r.start
     var limit = r.limit
     var entry = ranges.lowerEntry(limit)
     while (entry != null) {
-      
+
       var curr = entry
       var range = curr.getValue
       entry = entry.previous
-      
+
       if (range.limit <= start) {
         entry = null
       } else {
@@ -155,7 +164,7 @@ case class IntervalSet[N](implicit numer
   }
 
   override def toString = {
-    "[ " + ranges.values().mkString(", ")+" ]"
+    "[ " + ranges.values().mkString(", ") + " ]"
   }
 
   def iterator: Iterator[Interval[N]] = {
@@ -183,7 +192,7 @@ case class IntervalSet[N](implicit numer
       private var _next: Interval[N] = null
 
       def hasNext: Boolean = {
-        while (next==null && last.limit < mask.limit && iter.hasNext) {
+        while (next == null && last.limit < mask.limit && iter.hasNext) {
           var r = iter.next
           if (r.limit >= last.limit) {
             if (r.start < last.limit) {
@@ -215,7 +224,7 @@ case class IntervalSet[N](implicit numer
     }
   }
 
-  private final class ValueIterator(val ranges:Iterator[Interval[N]]) extends java.util.Iterator[N] {
+  private final class ValueIterator(val ranges: Iterator[Interval[N]]) extends java.util.Iterator[N] {
 
     private var range: Interval[N] = null
     private var _next: Option[N] = None

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1291054&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Sun Feb 19 20:24:15 2012
@@ -0,0 +1,1416 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.{lang => jl}
+import java.{util => ju}
+
+import org.fusesource.hawtbuf.proto.PBMessageFactory
+
+import org.apache.activemq.apollo.broker.store._
+import java.io._
+import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.util._
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.util.{TreeMap => ApolloTreeMap}
+import collection.immutable.TreeMap
+import org.fusesource.leveldbjni.internal.Util
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.apollo.util.ProcessSupport._
+import collection.mutable.{HashMap, ListBuffer}
+import org.apache.activemq.apollo.dto.JsonCodec
+import org.iq80.leveldb._
+import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
+import org.apache.activemq.apollo.broker.store.PBSupport
+import java.util.concurrent.atomic.AtomicReference
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer, AbstractVarIntSupport}
+import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait.encode_key
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBClient extends Log {
+
+  final val STORE_SCHEMA_PREFIX = "leveldb_store:"
+  final val STORE_SCHEMA_VERSION = 3
+
+  final val queue_prefix = 'q'.toByte
+  final val queue_entry_prefix = 'e'.toByte
+  final val map_prefix = 'p'.toByte
+  final val tmp_prefix = 't'.toByte
+
+  final val queue_prefix_array = Array(queue_prefix)
+  final val map_prefix_array = Array(map_prefix)
+  final val queue_entry_prefix_array = Array(queue_entry_prefix)
+
+  final val dirty_index_key = bytes(":dirty")
+  final val log_refs_index_key = bytes(":log-refs")
+  final val TRUE = bytes("true")
+  final val FALSE = bytes("false")
+
+  final val LOG_ADD_QUEUE = 1.toByte
+  final val LOG_REMOVE_QUEUE = 2.toByte
+  final val LOG_ADD_MESSAGE = 3.toByte
+  final val LOG_ADD_QUEUE_ENTRY = 5.toByte
+  final val LOG_REMOVE_QUEUE_ENTRY = 6.toByte
+  final val LOG_MAP_ENTRY = 7.toByte
+
+  final val LOG_ADD_MESSAGE_SNAPPY = (LOG_ADD_MESSAGE + 100).toByte
+  final val LOG_MAP_ENTRY_SNAPPY = (LOG_MAP_ENTRY + 100).toByte
+
+  final val LOG_SUFFIX = ".log"
+  final val INDEX_SUFFIX = ".index"
+
+  def bytes(value: String) = value.getBytes("UTF-8")
+
+  import FileSupport._
+
+  def create_sequence_file(directory: File, id: Long, suffix: String) = directory / ("%016x%s".format(id, suffix))
+
+  def find_sequence_files(directory: File, suffix: String): TreeMap[Long, File] = {
+    TreeMap((directory.list_files.flatMap {
+      f =>
+        if (f.getName.endsWith(suffix)) {
+          try {
+            val base = f.getName.stripSuffix(suffix)
+            val position = java.lang.Long.parseLong(base, 16);
+            Some(position -> f)
+          } catch {
+            case e: NumberFormatException => None
+          }
+        } else {
+          None
+        }
+    }): _*)
+  }
+
+  val on_windows = System.getProperty("os.name").toLowerCase().startsWith("windows")
+  var link_strategy = 0
+
+  def link(source: File, target: File): Unit = {
+    link_strategy match {
+      case 0 =>
+        // We first try to link via a native system call. Fails if
+        // we cannot load the JNI module.
+        try {
+          Util.link(source, target)
+        } catch {
+          case e: IOException => throw e
+          case e: Throwable =>
+            // Fallback.. to a slower impl..
+            debug("Native link system call not available")
+            link_strategy = 2
+            link(source, target)
+        }
+
+      // TODO: consider implementing a case which does the native system call using JNA
+      case 2 =>
+        // Next try JNA (might not be in classpath)
+        try {
+          IOHelper.hardlink(source, target)
+        } catch {
+          case e: IOException => throw e
+          case e: Throwable =>
+            // Fallback.. to a slower impl..
+            debug("JNA based hard link system call not available")
+            link_strategy = 5
+            link(source, target)
+        }
+
+      case 5 =>
+        // Next we try to do the link by executing an
+        // operating system shell command
+        try {
+          if (on_windows) {
+            system("fsutil", "hardlink", "create", target.getCanonicalPath, source.getCanonicalPath) match {
+              case (0, _, _) => // Success
+              case (_, out, err) =>
+                // TODO: we might want to look at the out/err to see why it failed
+                // to avoid falling back to the slower strategy.
+                debug("fsutil OS command not available either")
+                link_strategy = 10
+                link(source, target)
+            }
+          } else {
+            system("ln", source.getCanonicalPath, target.getCanonicalPath) match {
+              case (0, _, _) => // Success
+              case (_, out, err) => None
+              // TODO: we might want to look at the out/err to see why it failed
+              // to avoid falling back to the slower strategy.
+              debug("ln OS command not available either")
+              link_strategy = 10
+              link(source, target)
+            }
+          }
+        } catch {
+          case e: Throwable =>
+        }
+      case _ =>
+        // this final strategy is slow but sure to work.
+        source.copy_to(target)
+    }
+  }
+
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class LevelDBClient(store: LevelDBStore) {
+
+  import HelperTrait._
+  import LevelDBClient._
+  import FileSupport._
+
+  def dispatchQueue = store.dispatch_queue
+
+  implicit def toByteArray(buf: Buffer): Array[Byte] = buf.toByteArray
+
+  implicit def toBuffer(buf: Array[Byte]): Buffer = new Buffer(buf)
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Helpers
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def config = store.config
+
+  def directory = config.directory
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Public interface used by the LevelDBStore
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  var sync = false;
+  var verify_checksums = false;
+
+  var log: RecordLog = _
+
+  var snappy_compress_logs = false
+  var index: RichDB = _
+  var index_options: Options = _
+
+  var last_index_snapshot_ts = System.currentTimeMillis()
+  var last_index_snapshot_pos: Long = _
+  val snapshot_rw_lock = new ReentrantReadWriteLock(true)
+
+  var factory: DBFactory = _
+  val log_refs = HashMap[Long, LongCounter]()
+
+  def dirty_index_file = directory / ("dirty" + INDEX_SUFFIX)
+
+  def temp_index_file = directory / ("temp" + INDEX_SUFFIX)
+
+  def snapshot_index_file(id: Long) = create_sequence_file(directory, id, INDEX_SUFFIX)
+
+  def create_log: RecordLog = {
+    new RecordLog(directory, LOG_SUFFIX)
+  }
+
+  def log_size = {
+    Option(config.log_size).map(MemoryPropertyEditor.parse(_)).getOrElse(1024 * 1024 * 100L)
+  }
+
+  def start() = {
+    import OptionSupport._
+
+
+    val factory_names = Option(config.index_factory).getOrElse("org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory")
+    factory = factory_names.split("""(,|\s)+""").map(_.trim()).flatMap {
+      name =>
+        try {
+          Some(Broker.class_loader.loadClass(name).newInstance().asInstanceOf[DBFactory])
+        } catch {
+          case x: Throwable =>
+            None
+        }
+    }.headOption.getOrElse(throw new Exception("Could not load any of the index factory classes: " + factory_names))
+
+    if (factory.getClass.getName == "org.iq80.leveldb.impl.Iq80DBFactory") {
+      warn("Using the pure java LevelDB implementation which is still experimental.  If the JNI version is not available for your platform, please switch to the BDB store instead. http://activemq.apache.org/apollo/documentation/user-manual.html#BDB_Store")
+    }
+
+    sync = config.sync.getOrElse(true);
+    verify_checksums = config.verify_checksums.getOrElse(false);
+
+    index_options = new Options();
+    index_options.createIfMissing(true);
+    val paranoid_checks = config.paranoid_checks.getOrElse(false)
+
+    config.index_max_open_files.foreach(index_options.maxOpenFiles(_))
+    config.index_block_restart_interval.foreach(index_options.blockRestartInterval(_))
+    index_options.paranoidChecks(paranoid_checks)
+    Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach(index_options.writeBufferSize(_))
+    Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach(index_options.blockSize(_))
+    Option(config.index_compression).foreach(x => index_options.compressionType(x match {
+      case "snappy" => CompressionType.SNAPPY
+      case "none" => CompressionType.NONE
+      case _ => CompressionType.SNAPPY
+    }))
+
+    if (Option(config.log_compression).map(_.toLowerCase).getOrElse("snappy") == "snappy" && Snappy != null) {
+      snappy_compress_logs = true
+    }
+
+    index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024 * 1024 * 256L))
+    index_options.logger(new Logger() {
+      def log(msg: String) = trace(msg.stripSuffix("\n"))
+    })
+
+    log = create_log
+    log.sync = sync
+    log.logSize = log_size
+    log.verify_checksums = verify_checksums
+    log.on_log_rotate = () => {
+      // lets queue a request to checkpoint when
+      // the logs rotate.. queue it on the GC thread since GC's lock
+      // the index for a long time.
+      store.write_executor {
+        snapshot_index
+      }
+    }
+
+    lock_file = new LockFile(directory / "lock", true)
+    def time[T](func: => T): Long = {
+      val start = System.nanoTime()
+      func
+      System.nanoTime() - start
+    }
+
+    // Lock before we open anything..
+    lock_store
+
+    // Lets check store compatibility...
+    val version_file = directory / "store-version.txt"
+    if (version_file.exists()) {
+      val ver = try {
+        var tmp: String = version_file.read_text().trim()
+        if (tmp.startsWith(STORE_SCHEMA_PREFIX)) {
+          tmp.stripPrefix(STORE_SCHEMA_PREFIX).toInt
+        } else {
+          -1
+        }
+      } catch {
+        case e => throw new Exception("Unexpected version file format: " + version_file)
+      }
+      ver match {
+        case STORE_SCHEMA_VERSION => // All is good.
+        case _ => throw new Exception("Cannot open the store.  It's schema version is not supported.")
+      }
+    }
+    version_file.write_text(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
+
+    val log_open_duration = time {
+      retry {
+        log.open
+      }
+    }
+    info("Opening the log file took: %.2f ms", (log_open_duration / TimeUnit.MILLISECONDS.toNanos(1).toFloat))
+
+    // Find out what was the last snapshot.
+    val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
+    var last_snapshot_index = snapshots.lastOption
+    last_index_snapshot_pos = last_snapshot_index.map(_._1).getOrElse(0)
+
+    // Only keep the last snapshot..
+    snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach(_._2.recursive_delete)
+    temp_index_file.recursive_delete // usually does not exist.
+
+    retry {
+
+      // Delete the dirty indexes
+      dirty_index_file.recursive_delete
+      dirty_index_file.mkdirs()
+
+      last_snapshot_index.foreach {
+        case (id, file) =>
+          // Resume log replay from a snapshot of the index..
+          try {
+            file.list_files.foreach {
+              file =>
+                link(file, dirty_index_file / file.getName)
+            }
+          } catch {
+            case e: Exception =>
+              warn(e, "Could not recover snapshot of the index: " + e)
+              last_snapshot_index = None
+          }
+      }
+
+      index = new RichDB(factory.open(dirty_index_file, index_options));
+
+      try {
+        load_log_refs
+        index.put(dirty_index_key, TRUE)
+
+        if (paranoid_checks) {
+          check_index_integrity(index)
+        }
+
+        // Update the index /w what was stored on the logs..
+        var pos = last_index_snapshot_pos;
+
+        var last_reported_at = System.currentTimeMillis();
+        var showing_progress = false
+        var last_reported_pos = 0L
+
+        def remaining(eta: Double) = {
+          if (eta > 60 * 60) {
+            "%.2f hrs".format(eta / (60 * 60))
+          } else if (eta > 60) {
+            "%.2f mins".format(eta / 60)
+          } else {
+            "%.0f secs".format(eta)
+          }
+        }
+
+        var replay_operations = 0
+        val log_replay_duration = time {
+          while (pos < log.appender_limit) {
+
+            val now = System.currentTimeMillis();
+            if (now > last_reported_at + 1000) {
+              val at = pos - last_index_snapshot_pos
+              val total = log.appender_limit - last_index_snapshot_pos
+              val rate = (pos - last_reported_pos) * 1000.0 / (now - last_reported_at)
+              val eta = (total - at) / rate
+
+              System.out.print("Replaying recovery log: %f%% done (%,d/%,d bytes) @ %,.2f kb/s, %s remaining.     \r".format(
+                at * 100.0 / total, at, total, rate / 1024, remaining(eta)))
+              showing_progress = true;
+              last_reported_at = now
+              last_reported_pos = pos
+            }
+
+            log.read(pos).map {
+              case (kind, data, next_pos) =>
+                kind match {
+                  case LOG_ADD_QUEUE_ENTRY =>
+                    replay_operations += 1
+                    val record = QueueEntryPB.FACTORY.parseUnframed(data)
+
+                    val index_record = record.copy()
+                    index_record.clearQueueKey()
+                    index_record.clearQueueSeq()
+                    index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), index_record.freeze().toUnframedBuffer)
+
+                    log_ref_increment(decode_vlong(record.getMessageLocator))
+
+                  case LOG_REMOVE_QUEUE_ENTRY =>
+                    replay_operations += 1
+                    index.get(data, new ReadOptions).foreach {
+                      value =>
+                        val record = QueueEntryPB.FACTORY.parseUnframed(value)
+                        val pos = decode_vlong(record.getMessageLocator)
+                        pos.foreach(log_ref_decrement(_))
+                        index.delete(data)
+                    }
+
+                  case LOG_ADD_QUEUE =>
+                    replay_operations += 1
+                    val record = QueuePB.FACTORY.parseUnframed(data)
+                    index.put(encode_key(queue_prefix, record.getKey), data)
+
+                  case LOG_REMOVE_QUEUE =>
+                    replay_operations += 1
+                    val ro = new ReadOptions
+                    ro.fillCache(false)
+                    ro.verifyChecksums(verify_checksums)
+                    val queue_key = decode_vlong(data)
+                    index.delete(encode_key(queue_prefix, queue_key))
+                    index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
+                      (key, value) =>
+                        index.delete(key)
+
+                        // Figure out what log file that message entry was in so we can,
+                        // decrement the log file reference.
+                        val record = QueueEntryPB.FACTORY.parseUnframed(value)
+                        val pos = decode_vlong(record.getMessageLocator)
+                        log_ref_decrement(pos)
+                        true
+                    }
+
+                  case LOG_MAP_ENTRY | LOG_MAP_ENTRY_SNAPPY =>
+                    replay_operations += 1
+                    val entry = MapEntryPB.FACTORY.parseUnframed(data)
+                    if (entry.getValue == null) {
+                      index.delete(encode_key(map_prefix, entry.getKey))
+                    } else {
+                      index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
+                    }
+                  case _ =>
+                  // Skip records which don't require index updates.
+                }
+                pos = next_pos
+            }
+          }
+          if (replay_operations > 0) {
+            snapshot_index
+          }
+        }
+
+        if (showing_progress) {
+          System.out.println("Replaying recovery log: done. %d operations recovered in %s".format(replay_operations, log_replay_duration.toDouble / TimeUnit.SECONDS.toNanos(1)));
+        }
+
+      } catch {
+        case e: Throwable =>
+          // replay failed.. good thing we are in a retry block...
+          index.close
+          throw e;
+      }
+    }
+  }
+
+  def check_index_integrity(index: RichDB) = {
+    val actual_log_refs = HashMap[Long, LongCounter]()
+    var referenced_queues = Set[Long]()
+
+    // Lets find out what the queue entries are..
+    var fixed_records = 0
+    index.cursor_prefixed(queue_entry_prefix_array) {
+      (key, value) =>
+        try {
+          val (_, queue_key, seq_key) = decode_long_long_key(key)
+          val record = QueueEntryPB.FACTORY.parseUnframed(value)
+          val (pos, len) = decode_locator(record.getMessageLocator)
+          if (record.getQueueKey != queue_key) {
+            throw new IOException("key missmatch")
+          }
+          if (record.getQueueSeq != seq_key) {
+            throw new IOException("key missmatch")
+          }
+          log.log_info(pos).foreach {
+            log_info =>
+              actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+          }
+          referenced_queues += queue_key
+        } catch {
+          case e =>
+            trace("invalid queue entry record: %s, error: %s", new Buffer(key), e)
+            fixed_records += 1
+            // Invalid record.
+            index.delete(key)
+        }
+        true
+    }
+
+    // Lets cross check the queues.
+    index.cursor_prefixed(queue_prefix_array) {
+      (key, value) =>
+        try {
+          val (_, queue_key) = decode_long_key(key)
+          val record = QueuePB.FACTORY.parseUnframed(value)
+          if (record.getKey != queue_key) {
+            throw new IOException("key missmatch")
+          }
+          referenced_queues -= queue_key
+        } catch {
+          case e =>
+            trace("invalid queue record: %s, error: %s", new Buffer(key), e)
+            fixed_records += 1
+            // Invalid record.
+            index.delete(key)
+        }
+        true
+    }
+
+    referenced_queues.foreach {
+      queue_key =>
+      // We have queue entries for a queue that does not exist..
+        index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key)) {
+          (key, value) =>
+            trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key)
+            fixed_records += 1
+            index.delete(key)
+            val record = QueueEntryPB.FACTORY.parseUnframed(value)
+            val pos = decode_vlong(record.getMessageLocator)
+            log.log_info(pos).foreach {
+              log_info =>
+                actual_log_refs.get(log_info.position).foreach {
+                  counter =>
+                    if (counter.decrementAndGet() == 0) {
+                      actual_log_refs.remove(log_info.position)
+                    }
+                }
+            }
+            true
+        }
+    }
+
+    if (actual_log_refs != log_refs) {
+      debug("expected != actual log references. expected: %s, actual %s", log_refs, actual_log_refs)
+      log_refs.clear()
+      log_refs ++= actual_log_refs
+    }
+
+    if (fixed_records > 0) {
+      warn("Fixed %d invalid index enties in the leveldb store", fixed_records)
+    }
+  }
+
+  var lock_file: LockFile = _
+
+  def lock_store = {
+    import OptionSupport._
+    if (config.fail_if_locked.getOrElse(false)) {
+      lock_file.lock()
+    } else {
+      retry {
+        lock_file.lock()
+      }
+    }
+  }
+
+  def unlock_store = {
+    lock_file.unlock()
+  }
+
+  private def store_log_refs = {
+    index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
+  }
+
+  private def load_log_refs = {
+    log_refs.clear()
+    index.get(log_refs_index_key, new ReadOptions).foreach {
+      value =>
+        val javamap = JsonCodec.decode(new Buffer(value), classOf[java.util.Map[String, Object]])
+        collection.JavaConversions.mapAsScalaMap(javamap).foreach {
+          case (k, v) =>
+            log_refs.put(k.toLong, new LongCounter(v.asInstanceOf[Number].longValue()))
+        }
+    }
+  }
+
+  def stop() = {
+    // this blocks until all io completes..
+    // Suspend also deletes the index.
+    suspend()
+
+    if (log != null) {
+      log.close
+    }
+    copy_dirty_index_to_snapshot
+    log = null
+    unlock_store
+  }
+
+  def using_index[T](func: => T): T = {
+    val lock = snapshot_rw_lock.readLock();
+    lock.lock()
+    try {
+      func
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def retry_using_index[T](func: => T): T = retry(using_index(func))
+
+  /**
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
+   */
+  def suspend() = {
+    // Make sure we are the only ones accessing the index. since
+    // we will be closing it to create a consistent snapshot.
+    snapshot_rw_lock.writeLock().lock()
+
+    // Close the index so that it's files are not changed async on us.
+    store_log_refs
+    index.put(dirty_index_key, FALSE, new WriteOptions().sync(true))
+    index.close
+  }
+
+  /**
+   * TODO: expose this via management APIs, handy if you want to
+   * do a file system level snapshot and want the data to be consistent.
+   */
+  def resume() = {
+    // re=open it..
+    retry {
+      index = new RichDB(factory.open(dirty_index_file, index_options));
+      index.put(dirty_index_key, TRUE)
+    }
+    snapshot_rw_lock.writeLock().unlock()
+  }
+
+  def copy_dirty_index_to_snapshot {
+    if (log.appender_limit == last_index_snapshot_pos) {
+      // no need to snapshot again...
+      return
+    }
+
+    // Where we start copying files into.  Delete this on
+    // restart.
+    val tmp_dir = temp_index_file
+    tmp_dir.mkdirs()
+
+    try {
+
+      // Hard link all the index files.
+      dirty_index_file.list_files.foreach {
+        file =>
+          link(file, tmp_dir / file.getName)
+      }
+
+      // Rename to signal that the snapshot is complete.
+      val new_snapshot_index_pos = log.appender_limit
+      tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
+      snapshot_index_file(last_index_snapshot_pos).recursive_delete
+      last_index_snapshot_pos = new_snapshot_index_pos
+      last_index_snapshot_ts = System.currentTimeMillis()
+
+    } catch {
+      case e: Exception =>
+        // if we could not snapshot for any reason, delete it as we don't
+        // want a partial check point..
+        warn(e, "Could not snapshot the index: " + e)
+        tmp_dir.recursive_delete
+    }
+  }
+
+  def snapshot_index: Unit = {
+    if (log.appender_limit == last_index_snapshot_pos) {
+      // no need to snapshot again...
+      return
+    }
+    suspend()
+    try {
+      copy_dirty_index_to_snapshot
+    } finally {
+      resume()
+    }
+  }
+
+  def retry[T](func: => T): T = {
+    var error: Throwable = null
+    var rc: Option[T] = None
+
+    // We will loop until the tx succeeds.  Perhaps it's
+    // failing due to a temporary condition like low disk space.
+    while (!rc.isDefined) {
+
+      try {
+        rc = Some(func)
+      } catch {
+        case e: Throwable =>
+          if (error == null) {
+            warn(e, "DB operation failed. (entering recovery mode): " + e)
+          }
+          error = e
+      }
+
+      if (!rc.isDefined) {
+        // We may need to give up if the store is being stopped.
+        if (!store.service_state.is_starting_or_started) {
+          throw error
+        }
+        Thread.sleep(1000)
+      }
+    }
+
+    if (error != null) {
+      info("DB recovered from failure.")
+    }
+    rc.get
+  }
+
+  def purge() = {
+    suspend()
+    try {
+      log.close
+      directory.list_files.foreach(_.recursive_delete)
+      log_refs.clear()
+    } finally {
+      retry {
+        log.open
+      }
+      resume()
+    }
+  }
+
+  def add_queue(record: QueueRecord, callback: Runnable) = {
+    retry_using_index {
+      log.appender {
+        appender =>
+          val value: Buffer = PBSupport.to_pb(record).freeze().toUnframedBuffer
+          appender.append(LOG_ADD_QUEUE, value)
+          index.put(encode_key(queue_prefix, record.key), value)
+      }
+    }
+    callback.run
+  }
+
+  def log_ref_decrement(pos: Long, log_info: LogInfo = null) = this.synchronized {
+    Option(log_info).orElse(log.log_info(pos)) match {
+      case Some(log_info) =>
+        log_refs.get(log_info.position).foreach {
+          counter =>
+            val count = counter.decrementAndGet()
+            if (count == 0) {
+              log_refs.remove(log_info.position)
+            }
+        }
+      case None =>
+        warn("Invalid log position: " + pos)
+    }
+  }
+
+  def log_ref_increment(pos: Long, log_info: LogInfo = null) = this.synchronized {
+    Option(log_info).orElse(log.log_info(pos)) match {
+      case Some(log_info) =>
+        val count = log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+      case None =>
+        warn("Invalid log position: " + pos)
+    }
+  }
+
+  def remove_queue(queue_key: Long, callback: Runnable) = {
+    retry_using_index {
+      log.appender {
+        appender =>
+          val ro = new ReadOptions
+          ro.fillCache(false)
+          ro.verifyChecksums(verify_checksums)
+          appender.append(LOG_REMOVE_QUEUE, encode_vlong(queue_key))
+          index.delete(encode_key(queue_prefix, queue_key))
+          index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
+            (key, value) =>
+              index.delete(key)
+
+              // Figure out what log file that message entry was in so we can,
+              // decrement the log file reference.
+              val record = QueueEntryPB.FACTORY.parseUnframed(value)
+              val pos = decode_vlong(record.getMessageLocator)
+              log_ref_decrement(pos)
+              true
+          }
+      }
+    }
+    callback.run
+  }
+
+  def store(uows: Seq[LevelDBStore#DelayableUOW], callback: Runnable) {
+    retry_using_index {
+      log.appender {
+        appender =>
+
+          var sync_needed = false
+          index.write() {
+            batch =>
+              uows.foreach {
+                uow =>
+
+                  for ((key, value) <- uow.map_actions) {
+                    val entry = new MapEntryPB.Bean()
+                    entry.setKey(key)
+                    if (value == null) {
+                      batch.delete(encode_key(map_prefix, key))
+                    } else {
+                      entry.setValue(value)
+                      batch.put(encode_key(map_prefix, key), value.toByteArray)
+                    }
+                    var log_data = entry.freeze().toUnframedBuffer
+
+                    appender.append(LOG_MAP_ENTRY, log_data)
+                  }
+
+                  uow.actions.foreach {
+                    case (msg, action) =>
+                      val message_record = action.message_record
+                      var locator: (Long, Int) = null
+                      var log_info: LogInfo = null
+
+                      if (message_record != null) {
+
+                        val pb = new MessagePB.Bean
+                        pb.setProtocol(message_record.protocol)
+                        pb.setSize(message_record.size)
+                        pb.setValue(message_record.buffer)
+                        var message_data = pb.freeze().toUnframedBuffer
+
+                        val p = if (snappy_compress_logs) {
+                          val compressed = Snappy.compress(message_data)
+                          if (compressed.length < message_data.length) {
+                            message_data = compressed
+                            appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+                          } else {
+                            appender.append(LOG_ADD_MESSAGE, message_data)
+                          }
+                        } else {
+                          appender.append(LOG_ADD_MESSAGE, message_data)
+                        }
+                        locator = (p._1, message_data.length)
+                        log_info = p._2
+                        message_record.locator.set(locator);
+                      }
+
+                      action.dequeues.foreach {
+                        entry =>
+                          if (locator == null) {
+                            locator = entry.message_locator.get().asInstanceOf[(Long, Int)]
+                          }
+                          assert(locator != null)
+                          val (pos, len) = locator
+                          val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
+
+                          appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
+                          batch.delete(key)
+                          log_ref_decrement(pos, log_info)
+                      }
+
+                      var locator_buffer: Buffer = null
+                      action.enqueues.foreach {
+                        entry =>
+                          assert(locator != null)
+                          val (pos, len) = locator
+                          if (locator_buffer == null) {
+                            locator_buffer = encode_locator(pos, len)
+                          }
+
+                          entry.message_locator.set(locator)
+
+                          val log_record = new QueueEntryPB.Bean
+                          // TODO: perhaps we should normalize the sender to make the index entries more compact.
+                          log_record.setSender(entry.sender)
+                          log_record.setMessageLocator(locator_buffer)
+                          log_record.setQueueKey(entry.queue_key)
+                          log_record.setQueueSeq(entry.entry_seq)
+                          log_record.setSize(entry.size)
+                          if (entry.expiration != 0)
+                            log_record.setExpiration(entry.expiration)
+                          if (entry.redeliveries != 0)
+                            log_record.setRedeliveries(entry.redeliveries)
+
+                          appender.append(LOG_ADD_QUEUE_ENTRY, log_record.freeze().toUnframedBuffer)
+
+                          // Slim down the index record, the smaller it is the cheaper the compactions
+                          // will be and the more we can cache in mem.
+                          val index_record = log_record.copy()
+                          index_record.clearQueueKey()
+                          index_record.clearQueueSeq()
+                          batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), index_record.freeze().toUnframedBuffer)
+
+                          // Increment it.
+                          log_ref_increment(pos, log_info)
+
+                      }
+                  }
+                  if (uow.flush_sync) {
+                    sync_needed = true
+                  }
+              }
+          }
+          if (sync_needed && sync) {
+            appender.flush
+            appender.force
+          }
+      }
+    }
+    callback.run
+  }
+
+  val metric_load_from_index_counter = new TimeCounter
+  var metric_load_from_index = metric_load_from_index_counter(false)
+
+  def loadMessages(requests: ListBuffer[(Long, AtomicReference[Object], (Option[MessageRecord]) => Unit)]): Unit = {
+
+    val ro = new ReadOptions
+    ro.verifyChecksums(verify_checksums)
+    ro.fillCache(true)
+
+    val missing = retry_using_index {
+      index.snapshot {
+        snapshot =>
+          ro.snapshot(snapshot)
+          requests.flatMap {
+            x =>
+              val (_, locator, callback) = x
+              val record = metric_load_from_index_counter.time {
+                val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
+                log.read(pos, len).map {
+                  case (kind, data) =>
+
+                    val msg_data = kind match {
+                      case LOG_ADD_MESSAGE => data
+                      case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+                    }
+                    val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
+                    rc.locator = locator
+                    assert(rc.protocol != null)
+                    rc
+                }
+              }
+              if (record.isDefined) {
+                callback(record)
+                None
+              } else {
+                Some(x)
+              }
+          }
+      }
+    }
+
+    if (missing.isEmpty)
+      return
+
+    // There's a small chance that a message was missing, perhaps we started a read tx, before the
+    // write tx completed.  Lets try again..
+    retry_using_index {
+      index.snapshot {
+        snapshot =>
+          ro.snapshot(snapshot)
+          missing.foreach {
+            x =>
+              val (_, locator, callback) = x
+              val record: Option[MessageRecord] = metric_load_from_index_counter.time {
+                val (pos, len) = locator.get().asInstanceOf[(Long, Int)]
+                log.read(pos, len).map {
+                  case (kind, data) =>
+                    val msg_data = kind match {
+                      case LOG_ADD_MESSAGE => data
+                      case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+                    }
+                    val rc = PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
+                    rc.locator = locator
+                    assert(rc.protocol != null)
+                    rc
+                }
+              }
+              callback(record)
+          }
+      }
+    }
+  }
+
+  def list_queues: Seq[Long] = {
+    val rc = ListBuffer[Long]()
+    retry_using_index {
+      val ro = new ReadOptions
+      ro.verifyChecksums(verify_checksums)
+      ro.fillCache(false)
+      index.cursor_keys_prefixed(queue_prefix_array, ro) {
+        key =>
+          rc += decode_long_key(key)._2
+          true // to continue cursoring.
+      }
+    }
+    rc
+  }
+
+  def get_queue(queue_key: Long): Option[QueueRecord] = {
+    retry_using_index {
+      val ro = new ReadOptions
+      ro.fillCache(false)
+      ro.verifyChecksums(verify_checksums)
+      index.get(encode_key(queue_prefix, queue_key), ro).map {
+        x =>
+          PBSupport.from_pb(QueuePB.FACTORY.parseUnframed(x))
+      }
+    }
+  }
+
+  def listQueueEntryGroups(queue_key: Long, limit: Int): Seq[QueueEntryRange] = {
+    var rc = ListBuffer[QueueEntryRange]()
+    val ro = new ReadOptions
+    ro.verifyChecksums(verify_checksums)
+    ro.fillCache(false)
+    retry_using_index {
+      index.snapshot {
+        snapshot =>
+          ro.snapshot(snapshot)
+
+          var group: QueueEntryRange = null
+          index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) {
+            (key, value) =>
+
+              val (_, _, current_key) = decode_long_long_key(key)
+              if (group == null) {
+                group = new QueueEntryRange
+                group.first_entry_seq = current_key
+              }
+
+              val entry = QueueEntryPB.FACTORY.parseUnframed(value)
+              val pos = decode_vlong(entry.getMessageLocator)
+
+              group.last_entry_seq = current_key
+              group.count += 1
+              group.size += entry.getSize
+
+              if (group.expiration == 0) {
+                group.expiration = entry.getExpiration
+              } else {
+                if (entry.getExpiration != 0) {
+                  group.expiration = entry.getExpiration.min(group.expiration)
+                }
+              }
+
+              if (group.count == limit) {
+                rc += group
+                group = null
+              }
+
+              true // to continue cursoring.
+          }
+          if (group != null) {
+            rc += group
+          }
+      }
+    }
+    rc
+  }
+
+  def getQueueEntries(queue_key: Long, firstSeq: Long, lastSeq: Long): Seq[QueueEntryRecord] = {
+    var rc = ListBuffer[QueueEntryRecord]()
+    val ro = new ReadOptions
+    ro.verifyChecksums(verify_checksums)
+    ro.fillCache(true)
+    retry_using_index {
+      index.snapshot {
+        snapshot =>
+          ro.snapshot(snapshot)
+          val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
+          val end = encode_key(queue_entry_prefix, queue_key, lastSeq + 1)
+          index.cursor_range(start, end, ro) {
+            (key, value) =>
+              val (_, _, queue_seq) = decode_long_long_key(key)
+              val record = QueueEntryPB.FACTORY.parseUnframed(value)
+              val entry = PBSupport.from_pb(record)
+              entry.queue_key = queue_key
+              entry.entry_seq = queue_seq
+              entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator))
+              rc += entry
+              true
+          }
+      }
+    }
+    rc
+  }
+
+  def getLastMessageKey: Long = 0
+
+  def get(key: Buffer): Option[Buffer] = {
+    retry_using_index {
+      index.get(encode_key(map_prefix, key)).map(new Buffer(_))
+    }
+  }
+
+  def get_prefixed_map_entries(prefix: Buffer): Seq[(Buffer, Buffer)] = {
+    val rc = ListBuffer[(Buffer, Buffer)]()
+    retry_using_index {
+      index.cursor_prefixed(encode_key(map_prefix, prefix)) {
+        (key, value) =>
+          rc += new Buffer(key) -> new Buffer(value)
+          true
+      }
+    }
+    rc
+  }
+
+  def get_last_queue_key: Long = {
+    retry_using_index {
+      index.last_key(queue_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
+    }
+  }
+
+  def gc: Unit = {
+
+    // TODO:
+    // Perhaps we should snapshot_index if the current snapshot is old.
+    //
+    import collection.JavaConversions._
+    last_index_snapshot_pos
+    val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
+
+    // We don't want to delete any journals that the index has not snapshot'ed or
+    // the the
+    val delete_limit = log.log_info(last_index_snapshot_pos).map(_.position).
+      getOrElse(last_index_snapshot_pos).min(log.appender_start)
+
+    empty_journals.foreach {
+      id =>
+        if (id < delete_limit) {
+          log.delete(id)
+        }
+    }
+  }
+
+  case class UsageCounter(info: LogInfo) {
+    var count = 0L
+    var size = 0L
+    var first_reference_queue: QueueRecord = _
+
+    def increment(value: Int) = {
+      count += 1
+      size += value
+    }
+  }
+
+  //
+  // Collects detailed usage information about the journal like who's referencing it.
+  //
+  //  def get_log_usage_details = {
+  //
+  //    val usage_map = new ApolloTreeMap[Long,UsageCounter]()
+  //    log.log_mutex.synchronized {
+  //      log.log_infos.foreach(entry=> usage_map.put(entry._1, UsageCounter(entry._2)) )
+  //    }
+  //
+  //    def lookup_usage(pos: Long) = {
+  //      var entry = usage_map.floorEntry(pos)
+  //      if (entry != null) {
+  //        val usage = entry.getValue()
+  //        if (pos < usage.info.limit) {
+  //          Some(usage)
+  //        } else {
+  //          None
+  //        }
+  //      } else {
+  //        None
+  //      }
+  //    }
+  //
+  //    val ro = new ReadOptions()
+  //    ro.fillCache(false)
+  //    ro.verifyChecksums(verify_checksums)
+  //
+  //    retry_using_index {
+  //      index.snapshot { snapshot =>
+  //        ro.snapshot(snapshot)
+  //
+  //        // Figure out which journal files are still in use by which queues.
+  //        index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
+  //
+  //          val entry_record:QueueEntryRecord = value
+  //          val pos = if(entry_record.message_locator!=null) {
+  //            Some(decode_locator(entry_record.message_locator)._1)
+  //          } else {
+  //            index.get(encode_key(message_prefix, entry_record.message_key)).map(decode_locator(_)._1)
+  //          }
+  //
+  //          pos.flatMap(lookup_usage(_)).foreach { usage =>
+  //            if( usage.first_reference_queue == null ) {
+  //              usage.first_reference_queue = index.get(encode_key(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
+  //            }
+  //            usage.increment(entry_record.size)
+  //          }
+  //
+  //          true
+  //        }
+  //      }
+  //    }
+  //
+  //    import collection.JavaConversions._
+  //    usage_map.values.toSeq.toArray
+  //  }
+
+
+  def export_data(os: OutputStream): Option[String] = {
+    try {
+      val manager = ExportStreamManager(os, 1)
+
+      retry_using_index {
+
+        // Delete all the tmp keys..
+        index.cursor_keys_prefixed(Array(tmp_prefix)) {
+          key =>
+            index.delete(key)
+            true
+        }
+
+        index.snapshot {
+          snapshot =>
+            val nocache = new ReadOptions
+            nocache.snapshot(snapshot)
+            nocache.verifyChecksums(verify_checksums)
+            nocache.fillCache(false)
+
+            val cache = new ReadOptions
+            nocache.snapshot(snapshot)
+            nocache.verifyChecksums(false)
+            nocache.fillCache(false)
+
+            // Build a temp table of all references messages by the queues
+            // Remember 2 queues could reference the same message.
+            index.cursor_prefixed(queue_entry_prefix_array, cache) {
+              (_, value) =>
+                val record = QueueEntryPB.FACTORY.parseUnframed(value)
+                val (pos, len) = decode_locator(record.getMessageLocator)
+                index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
+                true
+            }
+
+            // Use the temp table to export all the referenced messages. Use
+            // the log position as the message key.
+            index.cursor_prefixed(Array(tmp_prefix)) {
+              (key, value) =>
+                val (_, pos) = decode_long_key(key)
+                val len = decode_vlong(value).toInt
+                log.read(pos, len).foreach {
+                  case (kind, data) =>
+                    val msg_data = kind match {
+                      case LOG_ADD_MESSAGE => data
+                      case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+                    }
+                    val record = MessagePB.FACTORY.parseUnframed(msg_data).copy()
+                    record.setMessageKey(pos)
+                    manager.store_message(record)
+                }
+                true
+            }
+
+            // Now export the queue entries
+            index.cursor_prefixed(queue_entry_prefix_array, nocache) {
+              (key, value) =>
+                val (_, queue_key, queue_seq) = decode_long_long_key(key)
+                val record = QueueEntryPB.FACTORY.parseUnframed(value).copy()
+                val (pos, len) = decode_locator(record.getMessageLocator)
+                record.setQueueKey(queue_key)
+                record.setQueueSeq(queue_seq)
+                record.setMessageKey(pos)
+                manager.store_queue_entry(record)
+                true
+            }
+
+            index.cursor_prefixed(queue_prefix_array) {
+              (_, value) =>
+                val record = QueuePB.FACTORY.parseUnframed(value)
+                manager.store_queue(record)
+                true
+            }
+
+            index.cursor_prefixed(map_prefix_array, nocache) {
+              (key, value) =>
+                val key_buffer = new Buffer(key)
+                key_buffer.moveHead(1)
+                val record = new MapEntryPB.Bean
+                record.setKey(key_buffer)
+                record.setValue(new Buffer(value))
+                manager.store_map_entry(record)
+                true
+            }
+
+        }
+
+        // Delete all the tmp keys..
+        index.cursor_keys_prefixed(Array(tmp_prefix)) {
+          key =>
+            index.delete(key)
+            true
+        }
+
+      }
+      manager.finish
+
+      None
+    } catch {
+      case x: Exception =>
+        debug(x, "Export failed")
+        x.printStackTrace()
+        Some(x.getMessage)
+    }
+  }
+
+  def import_data(is: InputStream): Option[String] = {
+    try {
+      val manager = ImportStreamManager(is)
+      if (manager.version != 1) {
+        return Some("Cannot import from an export file of version: " + manager.version)
+      }
+
+      purge
+
+      retry_using_index {
+        log.appender {
+          appender =>
+            while (manager.getNext match {
+
+              case record: MessagePB.Buffer =>
+                var message_data = record.toUnframedBuffer
+                val (pos, _) = if (snappy_compress_logs) {
+                  val compressed = Snappy.compress(message_data)
+                  if (compressed.length < message_data.length) {
+                    message_data = compressed
+                    appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+                  } else {
+                    appender.append(LOG_ADD_MESSAGE, message_data)
+                  }
+                } else {
+                  appender.append(LOG_ADD_MESSAGE, message_data)
+                }
+                index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
+                true
+
+              case record: QueueEntryPB.Buffer =>
+                val copy = record.copy();
+                var original_msg_key: Long = record.getMessageKey
+                index.get(encode_key(tmp_prefix, original_msg_key)) match {
+                  case Some(locator) =>
+                    val (pos, len) = decode_locator(locator)
+                    copy.setMessageLocator(locator)
+                    index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer)
+                    log.log_info(pos).foreach {
+                      log_info =>
+                        log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+                    }
+                  case None =>
+                    println("Invalid queue entry, references message that was not in the export: " + original_msg_key)
+                }
+                true
+
+              case record: QueuePB.Buffer =>
+                index.put(encode_key(queue_prefix, record.getKey), record.toUnframedBuffer)
+                true
+
+              case record: MapEntryPB.Buffer =>
+                index.put(encode_key(map_prefix, record.getKey), record.getValue)
+                true
+
+              case null =>
+                false
+            }) {
+              // keep looping
+            }
+
+        }
+      }
+
+      store_log_refs
+      // Delete all the tmp keys..
+      index.cursor_keys_prefixed(Array(tmp_prefix)) {
+        key =>
+          index.delete(key)
+          true
+      }
+
+      snapshot_index
+      None
+
+    } catch {
+      case x: Exception =>
+        debug(x, "Import failed")
+        Some(x.getMessage)
+    }
+  }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
------------------------------------------------------------------------------
    svn:executable = *

Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStore.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,7 +16,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store.leveldb
 
 import dto.{LevelDBStoreDTO, LevelDBStoreStatusDTO}
 import collection.Seq
@@ -40,31 +41,32 @@ object LevelDBStore extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class LevelDBStore(val config:LevelDBStoreDTO) extends DelayingStoreSupport {
+class LevelDBStore(val config: LevelDBStoreDTO) extends DelayingStoreSupport {
 
   var next_queue_key = new AtomicLong(1)
   var next_msg_key = new AtomicLong(1)
 
-  var write_executor:ExecutorService = _
-  var read_executor:ExecutorService = _
+  var write_executor: ExecutorService = _
+  var read_executor: ExecutorService = _
+
+  var client: LevelDBClient = _
 
-  var client:LevelDBClient = _
   def create_client = new LevelDBClient(this)
 
 
   def store_kind = "leveldb"
 
-  override def toString = store_kind+" store at "+config.directory
+  override def toString = store_kind + " store at " + config.directory
 
   override protected def locator_based = true
 
   def flush_delay = config.flush_delay.getOrElse(500)
-  
+
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
-  protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
+  protected def store(uows: Seq[DelayableUOW])(callback: => Unit) = {
     write_executor {
-      client.store(uows, ^{
+      client.store(uows, ^ {
         dispatch_queue {
           callback
         }
@@ -98,16 +100,16 @@ class LevelDBStore(val config:LevelDBSto
           poll_gc
           on_completed.run
         } catch {
-          case e:Throwable =>
+          case e: Throwable =>
             e.printStackTrace()
-            LevelDBStore.error(e, "Store client startup failure: "+e)
+            LevelDBStore.error(e, "Store client startup failure: " + e)
         }
       }
     }
     catch {
-      case e:Throwable =>
+      case e: Throwable =>
         e.printStackTrace()
-        LevelDBStore.error(e, "Store startup failure: "+e)
+        LevelDBStore.error(e, "Store startup failure: " + e)
     }
   }
 
@@ -131,15 +133,15 @@ class LevelDBStore(val config:LevelDBSto
     ss.is_starting || ss.is_started
   }
 
-  def poll_gc:Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
-    if( keep_polling ) {
+  def poll_gc: Unit = dispatch_queue.after(10, TimeUnit.SECONDS) {
+    if (keep_polling) {
       gc {
         poll_gc
       }
     }
   }
 
-  def gc(onComplete: =>Unit) = write_executor {
+  def gc(onComplete: => Unit) = write_executor {
     client.gc
     onComplete
   }
@@ -153,7 +155,7 @@ class LevelDBStore(val config:LevelDBSto
   /**
    * Deletes all stored data from the store.
    */
-  def purge(callback: =>Unit) = {
+  def purge(callback: => Unit) = {
     write_executor {
       client.purge()
       next_queue_key.set(1)
@@ -169,7 +171,7 @@ class LevelDBStore(val config:LevelDBSto
     }
   }
 
-  def get_prefixed_map_entries(prefix:Buffer)(callback: Seq[(Buffer, Buffer)]=>Unit) = {
+  def get_prefixed_map_entries(prefix: Buffer)(callback: Seq[(Buffer, Buffer)] => Unit) = {
     read_executor {
       callback(client.get_prefixed_map_entries(prefix))
     }
@@ -178,7 +180,7 @@ class LevelDBStore(val config:LevelDBSto
   /**
    * Ges the last queue key identifier stored.
    */
-  def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
+  def get_last_queue_key(callback: (Option[Long]) => Unit): Unit = {
     write_executor {
       callback(Some(client.get_last_queue_key))
     }
@@ -186,81 +188,91 @@ class LevelDBStore(val config:LevelDBSto
 
   def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
     write_executor {
-     client.add_queue(record, ^{ callback(true) })
+      client.add_queue(record, ^ {
+        callback(true)
+      })
     }
   }
 
   def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
     write_executor {
-      client.remove_queue(queueKey,^{ callback(true) })
+      client.remove_queue(queueKey, ^ {
+        callback(true)
+      })
     }
   }
 
   def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
     write_executor {
-      callback( client.get_queue(queueKey) )
+      callback(client.get_queue(queueKey))
     }
   }
 
   def list_queues(callback: (Seq[Long]) => Unit) = {
     write_executor {
-      callback( client.list_queues )
+      callback(client.list_queues)
     }
   }
 
-  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Object], (Option[MessageRecord])=>Unit)](), dispatch_queue)
-  load_source.setEventHandler(^{drain_loads});
+  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Object], (Option[MessageRecord]) => Unit)](), dispatch_queue)
+  load_source.setEventHandler(^ {
+    drain_loads
+  });
   load_source.resume
 
 
-  def load_message(messageKey: Long, locator:AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
-    message_load_latency_counter.start { end=>
-      load_source.merge((messageKey, locator, { (result)=>
-        end()
-        callback(result)
-      }))
+  def load_message(messageKey: Long, locator: AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
+    message_load_latency_counter.start {
+      end =>
+        load_source.merge((messageKey, locator, {
+          (result) =>
+            end()
+            callback(result)
+        }))
     }
   }
 
   def drain_loads = {
     var data = load_source.getData
     message_load_batch_size_counter += data.size
-    read_executor ^{
+    read_executor ^ {
       client.loadMessages(data)
     }
   }
 
   def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
-    write_executor ^{
-      callback( client.listQueueEntryGroups(queueKey, limit) )
+    write_executor ^ {
+      callback(client.listQueueEntryGroups(queueKey, limit))
     }
   }
 
   def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
-    write_executor ^{
-      callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
+    write_executor ^ {
+      callback(client.getQueueEntries(queueKey, firstSeq, lastSeq))
     }
   }
 
-  def poll_stats:Unit = {
+  def poll_stats: Unit = {
     def displayStats = {
-      if( service_state.is_started ) {
+      if (service_state.is_started) {
 
         flush_latency = flush_latency_counter(true)
         message_load_latency = message_load_latency_counter(true)
-//        client.metric_journal_append = client.metric_journal_append_counter(true)
-//        client.metric_index_update = client.metric_index_update_counter(true)
+        //        client.metric_journal_append = client.metric_journal_append_counter(true)
+        //        client.metric_index_update = client.metric_index_update_counter(true)
         close_latency = close_latency_counter(true)
-        message_load_batch_size =  message_load_batch_size_counter(true)
+        message_load_batch_size = message_load_batch_size_counter(true)
 
         poll_stats
       }
     }
 
-    dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+    dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^ {
+      displayStats
+    })
   }
 
-  def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
+  def get_store_status(callback: (StoreStatusDTO) => Unit) = dispatch_queue {
     val rc = new LevelDBStoreStatusDTO
     fill_store_status(rc)
     rc.message_load_batch_size = message_load_batch_size
@@ -272,21 +284,24 @@ class LevelDBStore(val config:LevelDBSto
         rc.log_stats = {
           import collection.JavaConversions._
           var row_layout = "%-20s | %-10s | %-10s\n"
-          row_layout.format("Log File", "Msg Refs", "File Size")+
-          client.log.log_infos.map{case (id,info)=> id -> client.log_refs.get(id).map(_.get)}.toSeq.flatMap { case (id, refs)=>
-            try {
-              val file = LevelDBClient.create_sequence_file(client.directory, id, LevelDBClient.LOG_SUFFIX)
-              val size = file.length()
-              Some(row_layout.format(
-                file.getName,
-                refs.getOrElse(0L).toString,
-                ViewHelper.memory(size)
-              ))
-            } catch {
-              case e:Throwable =>
-                None
-            }
-          }.mkString("")
+          row_layout.format("Log File", "Msg Refs", "File Size") +
+            client.log.log_infos.map {
+              case (id, info) => id -> client.log_refs.get(id).map(_.get)
+            }.toSeq.flatMap {
+              case (id, refs) =>
+                try {
+                  val file = LevelDBClient.create_sequence_file(client.directory, id, LevelDBClient.LOG_SUFFIX)
+                  val size = file.length()
+                  Some(row_layout.format(
+                    file.getName,
+                    refs.getOrElse(0L).toString,
+                    ViewHelper.memory(size)
+                  ))
+                } catch {
+                  case e: Throwable =>
+                    None
+                }
+            }.mkString("")
         }
       }
       callback(rc)
@@ -297,7 +312,7 @@ class LevelDBStore(val config:LevelDBSto
    * Exports the contents of the store to the provided streams.  Each stream should contain
    * a list of framed protobuf objects with the corresponding object types.
    */
-  def export_data(os:OutputStream, cb:(Option[String])=>Unit) = write_executor {
+  def export_data(os: OutputStream, cb: (Option[String]) => Unit) = write_executor {
     cb(client.export_data(os))
   }
 
@@ -305,7 +320,7 @@ class LevelDBStore(val config:LevelDBSto
    * Imports a previously exported set of streams.  This deletes any previous data
    * in the store.
    */
-  def import_data(is:InputStream, cb:(Option[String])=>Unit) = write_executor {
+  def import_data(is: InputStream, cb: (Option[String]) => Unit) = write_executor {
     cb(client.import_data(is))
   }
 

Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala&r1=1245945&r2=1291054&rev=1291054&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreFactory.scala Sun Feb 19 20:24:15 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.store.leveldb
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,7 +16,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store.leveldb
 
 import dto.LevelDBStoreDTO
 import org.apache.activemq.apollo.broker.store.StoreFactory
@@ -30,13 +31,13 @@ import org.apache.activemq.apollo.util._
  * This class is discovered using the following resource file:
  * <code>META-INF/services/org.apache.activemq.apollo/stores</code>
  * </p>
- * 
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class LevelDBStoreFactory extends StoreFactory {
-  def create(config: StoreDTO) =  config match {
-    case config:LevelDBStoreDTO =>
-      if( config.getClass == classOf[LevelDBStoreDTO] ) {
+  def create(config: StoreDTO) = config match {
+    case config: LevelDBStoreDTO =>
+      if (config.getClass == classOf[LevelDBStoreDTO]) {
         new LevelDBStore(config)
       } else {
         null