You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/25 16:32:31 UTC

svn commit: r1389882 [7/7] - in /activemq/trunk: ./ activemq-core/ activemq-core/src/main/java/org/apache/activemq/store/leveldb/ activemq-core/src/main/resources/ activemq-core/src/main/resources/META-INF/ activemq-core/src/main/resources/META-INF/ser...

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.util
+
+import java.io._
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.leveldb.LevelDBClient
+import org.fusesource.leveldbjni.internal.Util
+import org.apache.activemq.leveldb.util.ProcessSupport._
+
+object FileSupport {
+
+  implicit def toRichFile(file:File):RichFile = new RichFile(file) // implicit view that gives java.io.File the RichFile helper methods
+  // Platform flag and state shared by link().
+  val onWindows = System.getProperty("os.name").toLowerCase().startsWith("windows") // true when the os.name system property starts with "windows", ignoring case
+  private var linkStrategy = 0 // selects the branch link() takes: 0 = native call, 5 = OS command, any other value = plain copy; link() raises it after a failure
+  private val LOG = Log(getClass)
+  
+  /**
+   * Makes `target` a link to (or, as a last resort, a copy of) `source`.
+   *
+   * The strategy is selected by `linkStrategy` and is downgraded permanently
+   * whenever the current one turns out to be unavailable:
+   * native JNI call (0), then an OS command (5), then a plain file copy.
+   *
+   * IOExceptions raised by the native call and any error raised by the final
+   * copy are propagated to the caller.
+   */
+  def link(source:File, target:File):Unit = {
+    linkStrategy match {
+      case 0 =>
+        // We first try to link via a native system call. Fails if
+        // we cannot load the JNI module.
+        try {
+          Util.link(source, target)
+        } catch {
+          case e:IOException => throw e
+          case e:Throwable =>
+            // Fallback.. to a slower impl..
+            LOG.debug("Native link system call not available")
+            linkStrategy = 5
+            link(source, target)
+        }
+
+      // TODO: consider implementing a case which does the native system call using JNA
+
+      case 5 =>
+        // Next we try to do the link by executing an
+        // operating system shell command
+        val commandName = if( onWindows ) "fsutil" else "ln"
+        val linked = try {
+          val (exitCode, _, _) = if( onWindows ) {
+            system("fsutil", "hardlink", "create", target.getCanonicalPath, source.getCanonicalPath)
+          } else {
+            system("ln", source.getCanonicalPath, target.getCanonicalPath)
+          }
+          exitCode == 0
+        } catch {
+          // The command could not be run at all (for example it is not
+          // installed); treat that like a failed link and fall back.
+          case e:Exception => false
+        }
+        if( !linked ) {
+          // TODO: we might want to look at the out/err to see why it failed
+          // to avoid falling back to the slower strategy.
+          LOG.debug(commandName+" OS command not available either")
+          linkStrategy = 10
+          // The fallback runs outside of the try block above so that a
+          // failing copy is reported to the caller instead of being swallowed.
+          link(source, target)
+        }
+
+      case _ =>
+        // this final strategy is slow but sure to work.
+        source.copyTo(target)
+    }
+  }
+
+  /**
+   * Resolves the system property `name` to a directory.
+   *
+   * @param name the name of the system property holding the directory path
+   * @return the directory the property points at
+   * @throws RuntimeException (via sys.error) when the property is not set or
+   *         does not name an existing directory
+   */
+  def systemDir(name:String) = {
+    val baseValue = System.getProperty(name)
+    if( baseValue==null ) {
+      sys.error("The %s system property is not set.".format(name))
+    }
+    val file = new File(baseValue)
+    if( !file.isDirectory  ) {
+      sys.error("The %s system property is not set to a valid directory path: %s".format(name, baseValue))
+    }
+    file
+  }
+
+  /**
+   * Wrapper that adds path building, linking, copying, recursive traversal
+   * and whole-file read/write helpers to a java.io.File.
+   */
+  case class RichFile(self:File) {
+
+    /** Returns the file named `path` resolved against this file. */
+    def / (path:String) = new File(self, path)
+
+    /** Creates `target` as a link to this file, see FileSupport.link. */
+    def linkTo(target:File) = link(self, target)
+
+    /**
+     * Copies the content of this file into `target` and returns the number
+     * of bytes copied.
+     */
+    def copyTo(target:File) = {
+      // The source is opened first so that a missing or unreadable source
+      // does not create or truncate the target.
+      using(new FileInputStream(self)){ is=>
+        using(new FileOutputStream(target)){ os=>
+          FileSupport.copy(is, os)
+        }
+      }
+    }
+
+    /**
+     * Lists the children of this file. Unlike java.io.File.listFiles this
+     * never returns null; an empty array is returned instead.
+     */
+    def listFiles:Array[File] = {
+      Option(self.listFiles()).getOrElse(Array())
+    }
+
+    /** Returns this file followed by everything below it (depth first). */
+    def recursiveList:List[File] = {
+      if( self.isDirectory ) {
+        // Uses the null-safe listFiles of this class, not the java.io.File one.
+        self :: listFiles.toList.flatMap( _.recursiveList )
+      } else {
+        self :: Nil
+      }
+    }
+
+    /** Deletes this file and, for a directory, everything below it. */
+    def recursiveDelete: Unit = {
+      if( self.exists ) {
+        if( self.isDirectory ) {
+          listFiles.foreach(_.recursiveDelete)
+        }
+        self.delete
+      }
+    }
+
+    /**
+     * Copies this file to `target`; a directory is re-created at `target`
+     * and its children are copied into it recursively.
+     */
+    def recursiveCopyTo(target: File) : Unit = {
+      if (self.isDirectory) {
+        target.mkdirs
+        listFiles.foreach( file=> file.recursiveCopyTo( target / file.getName) )
+      } else {
+        self.copyTo(target)
+      }
+    }
+
+    /** Reads the whole file and decodes it using `charset`. */
+    def readText(charset:String="UTF-8"): String = {
+      using(new FileInputStream(self)) { in =>
+        FileSupport.readText(in, charset)
+      }
+    }
+
+    /** Reads the whole file into a byte array. */
+    def readBytes: Array[Byte] = {
+      using(new FileInputStream(self)) { in =>
+        FileSupport.readBytes(in)
+      }
+    }
+
+    /** Replaces the content of the file with `data`. */
+    def writeBytes(data:Array[Byte]):Unit = {
+      using(new FileOutputStream(self)) { out =>
+        FileSupport.writeBytes(out, data)
+      }
+    }
+
+    /** Replaces the content of the file with `data` encoded using `charset`. */
+    def writeText(data:String, charset:String="UTF-8"):Unit = {
+      using(new FileOutputStream(self)) { out =>
+        FileSupport.writeText(out, data, charset)
+      }
+    }
+
+  }
+
+  /**
+   * Transfers everything that can be read from `in` to `out` using an
+   * 8 KiB buffer. Neither stream is flushed or closed here.
+   *
+   * Returns the number of bytes copied.
+   */
+  def copy(in: InputStream, out: OutputStream): Long = {
+    val chunk = new Array[Byte](8192)
+    var total = 0L
+    // Each read is only issued after the previous chunk has been written.
+    Iterator.continually(in.read(chunk)).takeWhile(_ >= 0).foreach { count =>
+      out.write(chunk, 0, count)
+      total += count
+    }
+    total
+  }
+
+  /**
+   * Runs `proc` against `closable` and always closes the resource
+   * afterwards. Anything thrown while closing is discarded so that it
+   * cannot replace the result or the failure of `proc`.
+   */
+  def using[R,C <: Closeable](closable: C)(proc: C=>R) = {
+    try proc(closable)
+    finally {
+      try {
+        closable.close
+      } catch {
+        case ignore: Throwable => // close failures are deliberately dropped
+      }
+    }
+  }
+
+  /** Drains `in` and decodes the bytes read using `charset`. */
+  def readText(in: InputStream, charset:String="UTF-8"): String = {
+    val raw = readBytes(in)
+    new String(raw, charset)
+  }
+
+  /** Drains `in` into memory and returns everything that was read. */
+  def readBytes(in: InputStream): Array[Byte] = {
+    val collected = new ByteArrayOutputStream()
+    copy(in, collected)
+    collected.toByteArray
+  }
+
+  /** Encodes `value` using `charset` and writes the result to `out`. */
+  def writeText(out: OutputStream, value: String, charset:String="UTF-8"): Unit = {
+    val encoded = value.getBytes(charset)
+    writeBytes(out, encoded)
+  }
+
+  /** Writes all of `data` to `out`; the stream is left open. */
+  def writeBytes(out: OutputStream, data: Array[Byte]): Unit = {
+    val source = new ByteArrayInputStream(data)
+    copy(source, out)
+  }
+
+}
+
+object ProcessSupport { // helpers for starting OS processes and collecting their exit code, stdout and stderr
+  import FileSupport._
+  // Implicit view that adds the stream-wiring start(out, err, in) below to ProcessBuilder.
+  implicit def toRichProcessBuilder(self:ProcessBuilder):RichProcessBuilder = new RichProcessBuilder(self)
+  // Wrapper around a ProcessBuilder that knows how to connect streams to the started process.
+  case class RichProcessBuilder(self:ProcessBuilder) {
+    // Starts the process and returns it. `in` is copied to the process's stdin, its stdout to `out` and its stderr to `err`; a null argument closes that pipe right away. The copy blocks are handed to LevelDBClient.THREAD_POOL (defined elsewhere; presumably runs them asynchronously - TODO confirm).
+    def start(out:OutputStream=null, err:OutputStream=null, in:InputStream=null) = {
+      self.redirectErrorStream(out == err) // merge stderr into stdout when both targets are equal; this is also the case when both are null
+      val process = self.start
+      if( in!=null ) {
+        LevelDBClient.THREAD_POOL {
+          try {
+            using(process.getOutputStream) { out => // this `out` is the process's stdin and shadows the `out` parameter
+              FileSupport.copy(in, out)
+            }
+          } catch {
+            case _ => // failures while feeding stdin are dropped silently
+          }
+        }
+      } else {
+        process.getOutputStream.close
+      }
+      // Process stdout -> `out`.
+      if( out!=null ) {
+        LevelDBClient.THREAD_POOL {
+          try {
+            using(process.getInputStream) { in => // this `in` is the process's stdout and shadows the `in` parameter
+              FileSupport.copy(in, out)
+            }
+          } catch {
+            case _ => // failures while draining stdout are dropped silently
+          }
+        }
+      } else {
+        process.getInputStream.close
+      }
+      // Process stderr -> `err`, unless there is no separate target for it.
+      if( err!=null && err!=out ) {
+        LevelDBClient.THREAD_POOL {
+          try {
+            using(process.getErrorStream) { in =>
+              FileSupport.copy(in, err)
+            }
+          } catch {
+            case _ => // failures while draining stderr are dropped silently
+          }
+        }
+      } else {
+        process.getErrorStream.close
+      }
+      process
+    }
+
+  }
+  // Implicit view that adds onExit to Process.
+  implicit def toRichProcess(self:Process):RichProcess = new RichProcess(self)
+  // Wrapper around a started Process.
+  case class RichProcess(self:Process) {
+    def onExit(func: (Int)=>Unit) = LevelDBClient.THREAD_POOL { // the block waits for the process to end, then passes its exit value to `func`
+      self.waitFor
+      func(self.exitValue)
+    }
+  }
+  // Implicit view that turns a command line (program followed by its arguments) into a ProcessBuilder.
+  implicit def toProcessBuilder(args:Seq[String]):ProcessBuilder = new ProcessBuilder().command(args : _*)
+  // launch: starts the process and calls `func` with (exit code, stdout bytes, stderr bytes) from the onExit callback instead of waiting in the caller.
+  def launch(command:String*)(func: (Int, Array[Byte], Array[Byte])=>Unit ):Unit = launch(command)(func)
+  def launch(p:ProcessBuilder, in:InputStream=null)(func: (Int, Array[Byte], Array[Byte]) => Unit):Unit = {
+    val out = new ByteArrayOutputStream
+    val err = new ByteArrayOutputStream
+    p.start(out, err, in).onExit { code=>
+      func(code, out.toByteArray, err.toByteArray)
+    }
+  }
+  // system: starts the process, waits for it to end and returns (exit code, stdout bytes, stderr bytes).
+  def system(command:String*):(Int, Array[Byte], Array[Byte]) = system(command)
+  def system(p:ProcessBuilder, in:InputStream=null):(Int, Array[Byte], Array[Byte]) = {
+    val out = new ByteArrayOutputStream
+    val err = new ByteArrayOutputStream
+    val process = p.start(out, err, in)
+    process.waitFor // NOTE(review): only waits for the process itself, not for the copy blocks started in start(); if those run asynchronously the captured bytes may be incomplete - confirm
+    (process.exitValue, out.toByteArray, err.toByteArray)
+  }
+
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.util
+
+import java.util.concurrent.atomic.AtomicLong
+import org.slf4j.{MDC, Logger, LoggerFactory}
+import java.lang.{Throwable, String}
+
+/**
+ * Factory methods for [[Log]] instances and the generator of the ids that
+ * are used to correlate a log message with its stack trace.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object Log {
+
+  /** Creates a Log named after `clazz`, without the trailing "$" of Scala objects. */
+  def apply(clazz:Class[_]):Log = {
+    val loggerName = clazz.getName.stripSuffix("$")
+    apply(loggerName)
+  }
+
+  /** Creates a Log backed by the slf4j logger called `name`. */
+  def apply(name:String):Log = new Log {
+    override val log = LoggerFactory.getLogger(name)
+  }
+
+  /** Creates a Log that delegates to the given slf4j logger. */
+  def apply(value:Logger):Log = new Log {
+    override val log = value
+  }
+
+  /** Counter behind next_exception_id; starts at the current time in milliseconds. */
+  val exception_id_generator = new AtomicLong(System.currentTimeMillis)
+
+  /** Returns the next id of the generator as a hexadecimal string. */
+  def next_exception_id = java.lang.Long.toHexString(exception_id_generator.incrementAndGet)
+}
+
+/**
+ * Logging facade over an slf4j Logger. Messages are passed by name and are
+ * only evaluated and formatted (with String.format style arguments) when
+ * the corresponding level is enabled.
+ *
+ * For error, warn and info the throwable itself is not attached to the
+ * message; when debug logging is enabled its stack trace is logged
+ * separately at debug level and both entries share a "stackref" MDC value.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Log {
+  import Log._
+  val log = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
+
+  /**
+   * Runs `func` (the actual logging call) for the throwable `e`. When `e`
+   * is not null and debug logging is enabled, a fresh id is published in
+   * the MDC under "stackref" while `func` runs and the stack trace of `e`
+   * is logged at debug level afterwards.
+   */
+  private def with_throwable(e:Throwable)(func: =>Unit) = {
+    if( e!=null ) {
+      val stack_ref = if( log.isDebugEnabled ) {
+        val id = next_exception_id
+        MDC.put("stackref", id.toString);
+        Some(id)
+      } else {
+        None
+      }
+      try {
+        func
+        stack_ref.foreach { id=>
+          log.debug(e.toString, e)
+        }
+      } finally {
+        // Always clear the MDC entry, even when evaluating or formatting the
+        // message throws, so it cannot leak into later log statements of
+        // this thread.
+        if( stack_ref.isDefined ) {
+          MDC.remove("stackref")
+        }
+      }
+    } else {
+      func
+    }
+  }
+
+  /** Applies String.format to `message` unless there are no arguments. */
+  private def format(message:String, args:Seq[Any]) = {
+    if( args.isEmpty ) {
+      message
+    } else {
+      message.format(args.map(_.asInstanceOf[AnyRef]) : _*)
+    }
+  }
+
+  def error(m: => String, args:Any*): Unit = {
+    if( log.isErrorEnabled ) {
+      log.error(format(m, args.toSeq))
+    }
+  }
+
+  def error(e: Throwable, m: => String, args:Any*): Unit = {
+    with_throwable(e) {
+      if( log.isErrorEnabled ) {
+        log.error(format(m, args.toSeq))
+      }
+    }
+  }
+
+  def error(e: Throwable): Unit = {
+    with_throwable(e) {
+      if( log.isErrorEnabled ) {
+        // toString, like warn(e) and info(e): getMessage is null for many
+        // exceptions and does not include the exception type.
+        log.error(e.toString)
+      }
+    }
+  }
+
+  def warn(m: => String, args:Any*): Unit = {
+    if( log.isWarnEnabled ) {
+      log.warn(format(m, args.toSeq))
+    }
+  }
+
+  def warn(e: Throwable, m: => String, args:Any*): Unit = {
+    with_throwable(e) {
+      if( log.isWarnEnabled ) {
+        log.warn(format(m, args.toSeq))
+      }
+    }
+  }
+
+  def warn(e: Throwable): Unit = {
+    with_throwable(e) {
+      if( log.isWarnEnabled ) {
+        log.warn(e.toString)
+      }
+    }
+  }
+
+  def info(m: => String, args:Any*): Unit = {
+    if( log.isInfoEnabled ) {
+      log.info(format(m, args.toSeq))
+    }
+  }
+
+  def info(e: Throwable, m: => String, args:Any*): Unit = {
+    with_throwable(e) {
+      if( log.isInfoEnabled ) {
+        log.info(format(m, args.toSeq))
+      }
+    }
+  }
+
+  def info(e: Throwable): Unit = {
+    with_throwable(e) {
+      if( log.isInfoEnabled ) {
+        log.info(e.toString)
+      }
+    }
+  }
+
+
+  // debug and trace attach the throwable directly to the log call.
+  def debug(m: => String, args:Any*): Unit = {
+    if( log.isDebugEnabled ) {
+      log.debug(format(m, args.toSeq))
+    }
+  }
+
+  def debug(e: Throwable, m: => String, args:Any*): Unit = {
+    if( log.isDebugEnabled ) {
+      log.debug(format(m, args.toSeq), e)
+    }
+  }
+
+  def debug(e: Throwable): Unit = {
+    if( log.isDebugEnabled ) {
+      log.debug(e.toString, e)
+    }
+  }
+
+  def trace(m: => String, args:Any*): Unit = {
+    if( log.isTraceEnabled ) {
+      log.trace(format(m, args.toSeq))
+    }
+  }
+
+  def trace(e: Throwable, m: => String, args:Any*): Unit = {
+    if( log.isTraceEnabled ) {
+      log.trace(format(m, args.toSeq), e)
+    }
+  }
+
+  def trace(e: Throwable): Unit = {
+    if( log.isTraceEnabled ) {
+      log.trace(e.toString, e)
+    }
+  }
+
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala?p2=activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
 
+package org.apache.activemq.leveldb.util
 
 /**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
+ * <p>
+ * </p>
  *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+class LongCounter(private var value:Long = 0) extends Serializable {
+
+  // A plain mutable long counter whose method names mirror those of
+  // java.util.concurrent.atomic.AtomicLong. No synchronization is done here.
+
+  /** Resets the counter to zero. */
+  def clear() = value=0
+  /** Returns the current value. */
+  def get() = value
+  /** Replaces the current value. */
+  def set(value:Long) = this.value = value
+
+  /** Adds one and returns the updated value. */
+  def incrementAndGet() = addAndGet(1)
+  /** Subtracts one and returns the updated value. */
+  def decrementAndGet() = addAndGet(-1)
+  /** Adds `amount` and returns the updated value. */
+  def addAndGet(amount:Long) = {
+    value+=amount
+    value
+  }
+
+  /** Adds one and returns the value held before the update. */
+  def getAndIncrement() = getAndAdd(1)
+  /** Subtracts one and returns the value held before the update. */
+  def getAndDecrement() = getAndAdd(-1)
+  /** Adds `amount` and returns the value held before the update. */
+  def getAndAdd(amount:Long) = {
+    val rc = value
+    value+=amount
+    rc
+  }
+
+  override def toString() = get().toString
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.util
+
+/**
+ * <p>
+ * Helper for repeating an operation until it succeeds.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object RetrySupport {
+
+  /**
+   * Calls `func` until it returns normally and hands back its result.
+   *
+   * After a failed attempt the call sleeps for one second before trying
+   * again, unless `isStarted` reports false, in which case the most recent
+   * failure is rethrown. The first failure is logged as a warning on `log`
+   * and a later success as an info message; every failure also prints its
+   * stack trace.
+   */
+  def retry[T](log:Log, isStarted: ()=>Boolean, func: ()=>T): T = {
+    import log._
+    var lastFailure:Throwable = null
+    var result:Option[T] = None
+
+    // Keep going until the operation succeeds.  Perhaps it's
+    // failing due to a temporary condition like low disk space.
+    while( result.isEmpty ) {
+
+      try {
+        result = Some(func())
+      } catch {
+        case e:Throwable =>
+          e.printStackTrace()
+          // Only the first failure of a retry sequence is reported as a warning.
+          if( lastFailure==null ) {
+            warn(e, "DB operation failed. (entering recovery mode)")
+          }
+          lastFailure = e
+      }
+
+      if( result.isEmpty ) {
+        // We may need to give up if the store is being stopped.
+        if( !isStarted() ) {
+          throw lastFailure
+        }
+        Thread.sleep(1000)
+      }
+    }
+
+    if( lastFailure!=null ) {
+      info("DB recovered from failure.")
+    }
+    result.get
+  }
+
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala?p2=activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.leveldb;
 
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb.util
 
+case class TimeMetric() {
+  var max = 0L
+
+  // Records `duration`; only the largest value seen since the last reset is kept.
+  def add(duration:Long) = this.synchronized {
+    if( duration > max ) {
+      max = duration
+    }
+  }
+
+  // Returns the current maximum divided by 1,000,000 (durations recorded
+  // through apply are nanoseconds, which makes this milliseconds).
+  def get = {
+    val snapshot = this.synchronized { max }
+    snapshot / 1000000.0
+  }
+  // Sets the maximum back to zero and returns the previous maximum divided
+  // by 1,000,000.
+  def reset = {
+    val previous = this.synchronized {
+      val snapshot = max
+      max = 0
+      snapshot
+    }
+    previous / 1000000.0
+  }
+
+  // Evaluates `func`, records how long it took (System.nanoTime difference)
+  // even when it throws, and returns its result.
+  def apply[T](func: =>T):T = {
+    val startedAt = System.nanoTime()
+    try func
+    finally add(System.nanoTime() - startedAt)
+  }
 
-/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
- *
- */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
 }
+

Copied: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/store/leveldb/package.html (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/package.html)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/store/leveldb/package.html?p2=activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/store/leveldb/package.html&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/package.html&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
    (empty)

Copied: activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java (from r1389860, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java?p2=activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
    (empty)

Copied: activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java (from r1389860, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java?p2=activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java Tue Sep 25 14:32:28 2012
@@ -23,7 +23,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.BrokerTest;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.activemq.util.IOHelper;
-import org.fusesource.mq.leveldb.LevelDBStore;
+import org.apache.activemq.leveldb.LevelDBStore;
 
 /**
  * Once the wire format is completed we can test against real persistence storage.

Added: activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties (added)
+++ activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties Tue Sep 25 14:32:28 2012
@@ -0,0 +1,36 @@
+# 
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.fusesource=INFO
+
+# Console will only display warnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Propchange: activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Copied: activemq/trunk/activemq-leveldb/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml (from r1389860, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml?p2=activemq/trunk/activemq-leveldb/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
    (empty)

Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.spring.ActiveMQConnectionFactory
+import javax.jms.{Destination, ConnectionFactory}
+import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
+
+/**
+ * <p>
+ * ActiveMQ implementation of the JMS Scenario class.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ActiveMQScenario extends JMSClientScenario {
+
+  /** Creates a connection factory pointing at the scenario's broker `url`. */
+  override protected def factory:ConnectionFactory = {
+    val rc = new ActiveMQConnectionFactory
+    rc.setBrokerURL(url)
+    rc
+  }
+
+  /**
+   * Creates the i-th destination as a queue or a topic, depending on
+   * `destination_type`; any other type is reported as an error.
+   */
+  override protected def destination(i:Int):Destination = destination_type match {
+    case "queue" => new ActiveMQQueue(indexed_destination_name(i))
+    case "topic" => new ActiveMQTopic(indexed_destination_name(i))
+    case _ => error("Unsupported destination type: "+destination_type)
+  }
+
+}

Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import junit.framework.TestCase
+import org.apache.activemq.broker._
+import org.apache.activemq.store._
+import java.io.File
+import junit.framework.Assert._
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
+import region.policy.{PolicyEntry, PolicyMap}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class EnqueueRateScenariosTest extends TestCase {
+
+  var broker: BrokerService = null
+
+  /**
+   * Creates a broker that wipes its messages on startup, is backed by the store
+   * from `createStore` and listens on an ephemeral TCP port, then starts it and
+   * waits until it reports started.
+   */
+  override def setUp() {
+    import collection.JavaConversions._
+    val service = new BrokerService
+    broker = service
+    service.setDeleteAllMessagesOnStartup(true)
+    service.setPersistenceAdapter(createStore)
+    service.addConnector("tcp://0.0.0.0:0")
+//    val policies = new PolicyMap();
+//    val entry = new PolicyEntry
+//    entry.setQueue(">")
+//    policies.setPolicyEntries(List(entry))
+//    broker.setDestinationPolicy(policies)
+    service.start()
+    service.waitUntilStarted()
+  }
+
+  /** Stops the broker, if one was created, and waits until it has stopped. */
+  override def tearDown() = {
+    Option(broker).foreach { running =>
+      running.stop
+      running.waitUntilStopped
+    }
+  }
+
+  /** The broker's persistence adapter, viewed as the LevelDB store these tests install. */
+  private def leveldb_store = broker.getPersistenceAdapter.asInstanceOf[LevelDBStore]
+
+  /** Current value of the store db's `uowCanceledCounter`. */
+  protected def canceledEnqueues() = leveldb_store.db.uowCanceledCounter
+
+  /** Current value of the store db's `uowEnqueueDelayReqested`. */
+  protected def enqueueOptimized() = leveldb_store.db.uowEnqueueDelayReqested
+
+  /** Current value of the store db's `uowEnqueueNodelayReqested`. */
+  protected def enqueueNotOptimized() = leveldb_store.db.uowEnqueueNodelayReqested
+
+
+  /** Creates the LevelDB store used by the broker, rooted at target/activemq-data/leveldb. */
+  protected def createStore: PersistenceAdapter = {
+    val leveldb = new LevelDBStore
+    leveldb.setDirectory(new File("target/activemq-data/leveldb"))
+    leveldb
+  }
+
+  /**
+   * Runs the scenario under load: sleeps `warmup` seconds, then takes
+   * `samples_count` samples one second apart. Fails the test if any sampled
+   * error rate is above zero.
+   *
+   * @return (producer stats, consumer stats, canceled enqueues per sample,
+   *         optimized enqueues per sample, unoptimized enqueues per sample)
+   */
+  def collect_benchmark(scenario:ActiveMQScenario, warmup:Int, samples_count:Int) = {
+    val (cancels, optimized, unoptimized) = scenario.with_load {
+      println("Warming up for %d seconds...".format(warmup))
+      Thread.sleep(warmup*1000)
+      println("Sampling...")
+      scenario.collection_start
+      // Snapshot the store counters so only the activity during sampling is counted.
+      val cancels_before = canceledEnqueues
+      val optimized_before = enqueueOptimized
+      val unoptimized_before = enqueueNotOptimized
+      (0 until samples_count).foreach { _ =>
+        Thread.sleep(1000)
+        scenario.collection_sample
+      }
+      (canceledEnqueues-cancels_before, enqueueOptimized-optimized_before, enqueueNotOptimized-unoptimized_before)
+    }
+    println("Done.")
+
+    val samples = scenario.collection_end
+    val error_rates = samples.get("e_custom").get.map(_._2)
+    assertFalse("Errors occured during scenario run: "+error_rates, error_rates.exists(_ > 0))
+
+    // Feeds the second element of every sample in the named series (if present) into a new accumulator.
+    def stats_of(series:String) = {
+      val stats = new DescriptiveStatistics()
+      samples.get(series).foreach { rates =>
+        rates.foreach { sample => stats.addValue(sample._2) }
+      }
+      stats
+    }
+
+    val producer_stats = stats_of("p_custom")
+    val consumer_stats = stats_of("c_custom")
+
+    (producer_stats, consumer_stats, cancels*1.0/samples_count, optimized*1.0/samples_count, unoptimized*1.0/samples_count)
+  }
+
+  /**
+   * Configures a persistent, 3 KiB-message scenario against the running broker,
+   * lets `setup` customise it, runs `collect_benchmark` and prints the results.
+   *
+   * @return the tuple produced by `collect_benchmark`
+   */
+  def benchmark(name:String, warmup:Int=3, samples_count:Int=15, async_send:Boolean=true)(setup:(ActiveMQScenario)=>Unit) = {
+    println("Benchmarking: "+name)
+    val options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend="+async_send
+    val connect_uri = broker.getTransportConnectors.get(0).getConnectUri
+
+    val scenario = new ActiveMQScenario
+    scenario.url = connect_uri + options
+    scenario.display_errors = true
+    scenario.persistent = true
+    scenario.message_size = 1024 * 3
+
+    setup(scenario)
+    val results = collect_benchmark(scenario, warmup, samples_count)
+    val (producer_stats, consumer_stats, cancels, optimized, unoptimized) = results
+
+    println("%s: producer avg msg/sec: %,.2f, stddev: %,.2f".format(name, producer_stats.getMean, producer_stats.getStandardDeviation))
+    println("%s: consumer avg msg/sec: %,.2f, stddev: %,.2f".format(name, consumer_stats.getMean, consumer_stats.getStandardDeviation))
+    println("%s: canceled enqueues/sec: %,.2f".format(name,cancels))
+    println("%s: optimized enqueues/sec: %,.2f".format(name,optimized))
+    println("%s: unoptimized enqueues/sec: %,.2f".format(name,unoptimized))
+
+    results
+  }
+
+  /**
+   * With one producer and one consumer connected, asserts that the canceled
+   * enqueue rate is more than 80% of the mean producer rate.
+   */
+  def testHighCancelRatio = {
+    val results = benchmark("both_connected_baseline") { scenario=>
+      scenario.producers = 1
+      scenario.consumers = 1
+    }
+    val producer_stats = results._1
+    val cancels = results._3
+    val cancel_ratio = cancels / producer_stats.getMean
+    assertTrue("Expecting more than 80%% of the enqueues get canceled. But only %.2f%% was canceled".format(cancel_ratio*100), cancel_ratio > .80)
+  }
+
+  /**
+   * Compares the mean producer rate with 0, 1 and 10 consumers and asserts the
+   * rates stay within 60% (0 vs 1) and 20% (1 vs 10) of each other.
+   */
+  def testDecoupledProducerRate = {
+
+    // How far, in percent, the ratio of the two rates is from 1.0.
+    def percent_off(rate:Double, reference:Double) = (1.0 - (rate / reference)).abs * 100
+
+    // Fill up the queue with messages.. for the benefit of the next benchmark..
+    val no_consumers = benchmark("from_1_to_0", 60) { scenario=>
+      scenario.producers = 1
+      scenario.consumers = 0
+    }
+    val ten_consumers = benchmark("from_1_to_10") { scenario=>
+      scenario.producers = 1
+      scenario.consumers = 10
+    }
+    val one_consumer = benchmark("from_1_to_1") { scenario=>
+      scenario.producers = 1
+      scenario.consumers = 1
+    }
+
+    val percent_diff0 = percent_off(no_consumers._1.getMean, one_consumer._1.getMean)
+    val percent_diff1 = percent_off(one_consumer._1.getMean, ten_consumers._1.getMean)
+
+    val msg0 = "The 0 vs 1 consumer scenario producer rate was within %.2f%%".format(percent_diff0)
+    val msg1 = "The 1 vs 10 consumer scenario producer rate was within %.2f%%".format(percent_diff1)
+
+    println(msg0)
+    println(msg1)
+
+    assertTrue(msg0, percent_diff0 <= 60)
+    assertTrue(msg1, percent_diff1 <= 20)
+  }
+
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import org.apache.hadoop.fs.FileUtil
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HALevelDBFastEnqueueTest extends LevelDBFastEnqueueTest {
+
+  /** Starts the testing HDFS server before the inherited set-up runs. */
+  override def setUp: Unit = {
+    TestingHDFSServer.start
+    super.setUp
+  }
+
+  /** Runs the inherited tear-down first, then stops the testing HDFS server. */
+  override def tearDown: Unit = {
+    super.tearDown
+    TestingHDFSServer.stop
+  }
+
+  /** Creates an HA store that keeps local data in `dataDirectory` and uses a DFS directory under target/. */
+  override protected def createStore: LevelDBStore = {
+    val haStore = new HALevelDBStore
+    haStore.setDirectory(dataDirectory)
+    haStore.setDfsDirectory("target/activemq-data/hdfs-leveldb")
+    haStore
+  }
+
+  /** Local file system directory of the store. */
+  private def dataDirectory: File = new File("target/activemq-data/leveldb")
+
+  /**
+   * On restart we will also delete the local file system store, so that we test restoring from
+   * HDFS.
+   */
+  override  protected def restartBroker(restartDelay: Int, checkpoint: Int): Unit = {
+    stopBroker
+    FileUtil.fullyDelete(dataDirectory)
+    TimeUnit.MILLISECONDS.sleep(restartDelay)
+    startBroker(false, checkpoint)
+  }
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
 
+import org.apache.activemq.store.PersistenceAdapter
+import java.io.File
 
 /**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
+ * <p>
+ * </p>
  *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+class HALevelDBStoreTest extends LevelDBStoreTest {
+
+  /** Starts the testing HDFS server before the inherited set-up runs. */
+  override protected def setUp: Unit = {
+    TestingHDFSServer.start
+    super.setUp
+  }
+
+  /** Runs the inherited tear-down first, then stops the testing HDFS server. */
+  override protected def tearDown: Unit = {
+    super.tearDown
+    TestingHDFSServer.stop
+  }
+
+  /**
+   * Creates the HA store under test.
+   *
+   * @param delete when true, all messages are deleted from the store before it is returned
+   */
+  override protected def createPersistenceAdapter(delete: Boolean): PersistenceAdapter = {
+    val haStore = new HALevelDBStore
+    haStore.setDirectory(new File("target/activemq-data/haleveldb"))
+    // NOTE(review): HALevelDBFastEnqueueTest passes a path under target/ here; confirm "localhost" is intended.
+    haStore.setDfsDirectory("localhost")
+    if (delete) haStore.deleteAllMessages
+    haStore
+  }
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.leveldb;
 
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
 
+import org.apache.activemq.console.Main
 
-/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
- *
- */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+/** Entry point that forwards its arguments to the ActiveMQ console `Main`. */
+object IDERunner {
+  def main(args:Array[String]): Unit = Main.main(args)
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import java.lang.Thread
+import javax.jms._
+
+/**
+ * <p>
+ * Simulates load on a JMS server using the JMS messaging API.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class JMSClientScenario extends Scenario {
+
+  /** Creates the producer client with index `i`. */
+  def createProducer(i:Int) = new ProducerClient(i)
+
+  /** Creates the consumer client with index `i`. */
+  def createConsumer(i:Int) = new ConsumerClient(i)
+
+  protected def destination(i:Int):Destination
+
+  /**
+   * Name of the destination used by client `i`: the queue or topic prefix,
+   * followed by `destination_name`, a dash and `i % destination_count`.
+   * Any other `destination_type` raises an error.
+   */
+  def indexed_destination_name(i:Int) = destination_type match {
+    case "queue" => queue_prefix+destination_name+"-"+(i%destination_count)
+    case "topic" => topic_prefix+destination_name+"-"+(i%destination_count)
+    // sys.error replaces the deprecated Predef.error; both raise a RuntimeException.
+    case _ => sys.error("Unsuported destination type: "+destination_type)
+  }
+
+
+  protected def factory:ConnectionFactory
+
+  /**
+   * Translates the textual `ack_mode` setting into the matching
+   * `javax.jms.Session` constant; an unknown setting throws an Exception.
+   */
+  def jms_ack_mode = ack_mode match {
+    case "auto" => Session.AUTO_ACKNOWLEDGE
+    case "client" => Session.CLIENT_ACKNOWLEDGE
+    case "dups_ok" => Session.DUPS_OK_ACKNOWLEDGE
+    case "transacted" => Session.SESSION_TRANSACTED
+    case _ => throw new Exception("Invalid ack mode: "+ack_mode)
+  }
+
+  trait JMSClient extends Client {
+
+    @volatile
+    var connection:Connection = _
+    var message_counter=0L
+
+    // Background thread that keeps the client connected: it (re)creates the
+    // connection, runs `execute`, and starts over after a failure until `done` is set.
+    var worker = new Thread() {
+      override def run() {
+        // Milliseconds to wait before the next connection attempt; 0 means connect immediately.
+        var reconnect_delay = 0
+        while( !done.get ) {
+          try {
+
+            if( reconnect_delay!=0 ) {
+              Thread.sleep(reconnect_delay)
+              reconnect_delay=0
+            }
+            connection = factory.createConnection(user_name, password)
+//            connection.setClientID(name)
+            // The listener intentionally does nothing when the connection reports an exception.
+            connection.setExceptionListener(new ExceptionListener {
+              def onException(exception: JMSException) {
+              }
+            })
+            connection.start()
+
+            execute
+
+          } catch {
+            case e:Throwable =>
+              // A failure after `done` is set is not counted as an error and does not schedule a retry delay.
+              if( !done.get ) {
+                if( display_errors ) {
+                  e.printStackTrace
+                }
+                error_counter.incrementAndGet
+                // Back off for one second before the next connection attempt.
+                reconnect_delay = 1000
+              }
+          } finally {
+            // Always close the connection used by this iteration.
+            dispose
+          }
+        }
+      }
+    }
+
+    /**
+     * Closes the current connection, if one was ever created. This is best
+     * effort: any failure raised by `close()` is deliberately ignored.
+     */
+    def dispose {
+      // Read the volatile field once; it is still null when no connection was established.
+      val current = connection
+      if( current!=null ) {
+        try {
+          current.close()
+        } catch {
+          case _:Throwable => // ignored on purpose, nothing useful can be done here
+        }
+      }
+    }
+
+    def execute:Unit
+
+    /** Starts the worker thread. */
+    def start = worker.start()
+
+    /**
+     * Stops the client; `done` must already be set. Closes the connection,
+     * then joins the worker, interrupting it once per second for as long as
+     * it is still alive.
+     */
+    def shutdown = {
+      assert(done.get)
+      val thread = worker
+      if ( thread!=null ) {
+        dispose
+        thread.join(1000)
+        while( thread.isAlive ) {
+          println("Worker did not shutdown quickly.. interrupting thread.")
+          thread.interrupt()
+          thread.join(1000)
+        }
+        worker = null
+      }
+    }
+
+    def name:String
+  }
+
+  /** Client that receives messages from the destination selected by its `id`. */
+  class ConsumerClient(val id: Int) extends JMSClient {
+    val name: String = "consumer " + id
+
+    /**
+     * Receives messages until `done` is set, counting each one, optionally
+     * sleeping `consumer_sleep` ms per message, and acknowledging explicitly
+     * when the session uses client acknowledgement.
+     */
+    def execute {
+      // NOTE(review): jms_ack_mode may be Session.SESSION_TRANSACTED, but the session is always
+      // created non-transacted and never committed - confirm the "transacted" mode is supported.
+      val session = connection.createSession(false, jms_ack_mode)
+      val consumer:MessageConsumer =
+        if( durable ) session.createDurableSubscriber(destination(id).asInstanceOf[Topic], name, selector, no_local)
+        else session.createConsumer(destination(id), selector, no_local)
+
+      while( !done.get() ) {
+        // receive() yields null when nothing arrived within 500 ms.
+        Option(consumer.receive(500)).foreach { msg =>
+          consumer_counter.incrementAndGet()
+          if (consumer_sleep != 0) {
+            Thread.sleep(consumer_sleep)
+          }
+          if(session.getAcknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
+            msg.acknowledge()
+          }
+        }
+      }
+    }
+
+  }
+
+  /** Client that repeatedly sends one pre-built text message to the destination selected by its `id`. */
+  class ProducerClient(val id: Int) extends JMSClient {
+
+    val name: String = "producer " + id
+
+    /**
+     * Builds the message once, then sends it in a loop until `done` is set,
+     * counting each send and optionally sleeping `producer_sleep` ms in between.
+     */
+    def execute {
+      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val producer:MessageProducer = session.createProducer(destination(id))
+      val delivery_mode = if( persistent ) DeliveryMode.PERSISTENT else DeliveryMode.NON_PERSISTENT
+      producer.setDeliveryMode(delivery_mode)
+
+      val msg = session.createTextMessage(body(name))
+      headers_for(id).foreach { case (key, value) =>
+        msg.setStringProperty(key, value)
+      }
+
+      while( !done.get() ) {
+        producer.send(msg)
+        producer_counter.incrementAndGet()
+        if (producer_sleep != 0) {
+          Thread.sleep(producer_sleep)
+        }
+      }
+
+    }
+  }
+
+  /**
+   * Builds a message body of exactly `message_size` characters: a
+   * "Message from <name>" header line padded with a repeating a-z pattern,
+   * or the header cut short when it is longer than `message_size`.
+   */
+  def body(name:String) = {
+    val text = new StringBuilder(message_size)
+    text.append("Message from " + name+"\n")
+    // Pad up to the target size; the letter depends on the absolute character position.
+    var i = text.length
+    while( i < message_size ) {
+      text.append(('a'+(i%26)).toChar)
+      i += 1
+    }
+    if( text.length > message_size ) text.substring(0, message_size) else text.toString
+  }
+
+
+
+}

Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.ActiveMQConnection
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.activemq.broker.BrokerService
+import org.apache.activemq.command.ActiveMQQueue
+import org.apache.activemq.command.ConnectionControl
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import javax.jms._
+import java.io.File
+import java.util.Vector
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+import junit.framework.Assert._
+import org.apache.activemq.leveldb.util.Log
+import junit.framework.TestCase
+
+object LevelDBFastEnqueueTest extends Log
+class LevelDBFastEnqueueTest extends TestCase {
+
+  import LevelDBFastEnqueueTest._
+
+  /**
+   * Publishes `toSend` messages with no consumer attached, restarts the broker
+   * and then consumes them all.
+   */
+  @Test def testPublishNoConsumer: Unit = {
+    publishThenConsumeAfterRestart(startCheckpoint = 10, restartCheckpoint = 1200000, gcBeforeReport = false)
+  }
+
+  /**
+   * Same flow as `testPublishNoConsumer`, but with only 100 messages, checkpoint
+   * arguments of 0 and a broker gc before the figures are reported.
+   */
+  @Test def testPublishNoConsumerNoCheckpoint: Unit = {
+    toSend = 100
+    publishThenConsumeAfterRestart(startCheckpoint = 0, restartCheckpoint = 0, gcBeforeReport = true)
+  }
+
+  /**
+   * Shared body of the two publish tests: starts a clean broker, publishes
+   * `toSend` messages from `parallelProducer` concurrent producers, logs the
+   * throughput figures, restarts the broker and consumes everything back.
+   *
+   * @param startCheckpoint   checkpoint argument passed to the initial `startBroker`
+   * @param restartCheckpoint checkpoint argument passed to `restartBroker`
+   * @param gcBeforeReport    when true, `broker.getAdminView.gc` is invoked before the duration is taken
+   */
+  private def publishThenConsumeAfterRestart(startCheckpoint: Int, restartCheckpoint: Int, gcBeforeReport: Boolean): Unit = {
+    startBroker(true, startCheckpoint)
+    val sharedCount: AtomicLong = new AtomicLong(toSend)
+    val start: Long = System.currentTimeMillis
+    val executorService: ExecutorService = Executors.newCachedThreadPool
+    for (_ <- 0 until parallelProducer) {
+      executorService.execute(new Runnable {
+        def run: Unit = {
+          try {
+            publishMessages(sharedCount, 0)
+          }
+          catch {
+            // Collected here and asserted on by the test thread below.
+            case e: Exception => {
+              exceptions.add(e)
+            }
+          }
+        }
+      })
+    }
+    executorService.shutdown
+    executorService.awaitTermination(30, TimeUnit.MINUTES)
+    assertTrue("Producers done in time", executorService.isTerminated)
+    assertTrue("No exceptions: " + exceptions, exceptions.isEmpty)
+    val totalSent: Long = toSend * payloadString.length
+    if (gcBeforeReport) {
+      broker.getAdminView.gc
+    }
+    val duration: Double = System.currentTimeMillis - start
+    info("Duration:                " + duration + "ms")
+    info("Rate:                       " + (toSend * 1000 / duration) + "m/s")
+    info("Total send:             " + totalSent)
+    info("Total journal write: " + store.getLogAppendPosition)
+    info("Journal writes %:    " + store.getLogAppendPosition / totalSent.asInstanceOf[Double] * 100 + "%")
+    stopBroker
+    restartBroker(0, restartCheckpoint)
+    consumeMessages(toSend)
+  }
+
+  /**
+   * Consumes exactly `count` messages from `destination` and asserts that
+   * nothing is left over afterwards. The connection is closed even when an
+   * assertion fails.
+   */
+  private def consumeMessages(count: Long): Unit = {
+    val connection: ActiveMQConnection = connectionFactory.createConnection.asInstanceOf[ActiveMQConnection]
+    try {
+      connection.setWatchTopicAdvisories(false)
+      connection.start
+      val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val consumer: MessageConsumer = session.createConsumer(destination)
+      var i: Int = 0
+      while (i < count) {
+        assertNotNull("got message " + i, consumer.receive(10000))
+        i += 1
+      }
+      assertNull("none left over", consumer.receive(2000))
+    } finally {
+      // Previously the connection was never closed and leaked until the broker stopped.
+      connection.close
+    }
+  }
+
+  /** Stops the broker, sleeps `restartDelay` ms, then starts it again without deleting messages. */
+  protected def restartBroker(restartDelay: Int, checkpoint: Int): Unit = {
+    stopBroker
+    TimeUnit.MILLISECONDS.sleep(restartDelay)
+    startBroker(false, checkpoint)
+  }
+
+  /** Stops the broker after each test. */
+  override def tearDown() {
+    stopBroker
+  }
+
+  /** Stops the broker, if one was created, and waits until it has stopped. */
+  def stopBroker: Unit = {
+    Option(broker).foreach { running =>
+      running.stop
+      running.waitUntilStopped
+    }
+  }
+
+  /**
+   * Sends persistent messages until the shared `count` is used up. Several
+   * producers may run this concurrently against the same counter.
+   *
+   * @param count  shared countdown of messages still to be sent
+   * @param expiry value passed as the time-to-live argument of each send
+   */
+  private def publishMessages(count: AtomicLong, expiry: Int): Unit = {
+    val connection: ActiveMQConnection = connectionFactory.createConnection.asInstanceOf[ActiveMQConnection]
+    try {
+      connection.setWatchTopicAdvisories(false)
+      connection.start
+      val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val producer: MessageProducer = session.createProducer(destination)
+      val bytes: Array[Byte] = payloadString.getBytes
+      var start: Long = System.currentTimeMillis
+      // Claim one message from the shared counter per iteration.
+      var i: Long = count.getAndDecrement
+      while (i > 0) {
+        val message: Message =
+          if (useBytesMessage) {
+            val bytesMessage = session.createBytesMessage
+            bytesMessage.writeBytes(bytes)
+            bytesMessage
+          } else {
+            session.createTextMessage(payloadString)
+          }
+        producer.send(message, DeliveryMode.PERSISTENT, 5, expiry)
+        if (i != toSend && i % sampleRate == 0) {
+          val now: Long = System.currentTimeMillis
+          info("Remainder: " + i + ", rate: " + sampleRate * 1000 / (now - start) + "m/s")
+          start = now
+        }
+        i = count.getAndDecrement
+      }
+      // Synchronous round trip to the broker before the connection is closed.
+      connection.syncSendPacket(new ConnectionControl)
+    } finally {
+      // Close even when a send fails; previously the connection leaked on error.
+      connection.close
+    }
+  }
+
+  /**
+   * Creates and starts a broker backed by a fresh store from `createStore`, and
+   * points `connectionFactory` at its TCP connector.
+   *
+   * @param deleteAllMessages whether the broker wipes its messages on startup
+   * @param checkPointPeriod  NOTE(review): not used by this method - confirm whether it should configure the store
+   */
+  def startBroker(deleteAllMessages: Boolean, checkPointPeriod: Int): Unit = {
+    broker = new BrokerService
+    broker.setDeleteAllMessagesOnStartup(deleteAllMessages)
+    store = createStore
+    broker.setPersistenceAdapter(store)
+    broker.addConnector("tcp://0.0.0.0:0")
+    broker.start
+    val options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192"
+    val connectUri = broker.getTransportConnectors.get(0).getConnectUri
+    connectionFactory = new ActiveMQConnectionFactory(connectUri + options)
+  }
+
+  /** Creates the LevelDB store under test, rooted at target/activemq-data/leveldb. */
+  protected def createStore: LevelDBStore = {
+    val leveldb = new LevelDBStore
+    leveldb.setDirectory(new File("target/activemq-data/leveldb"))
+    leveldb
+  }
+
+  private[leveldb] var broker: BrokerService = null
+  private[leveldb] var connectionFactory: ActiveMQConnectionFactory = null
+  private[leveldb] var store: LevelDBStore = null
+  private[leveldb] var destination: Destination = new ActiveMQQueue("Test")
+  private[leveldb] var payloadString: String = new String(new Array[Byte](6 * 1024))
+  private[leveldb] var useBytesMessage: Boolean = true
+  private[leveldb] final val parallelProducer: Int = 20
+  private[leveldb] var exceptions: Vector[Exception] = new Vector[Exception]
+  private[leveldb] var toSend: Long = 100000
+  private[leveldb] final val sampleRate: Double = 100000
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.broker.BrokerService
+import org.apache.activemq.broker.BrokerTest
+import org.apache.activemq.store.PersistenceAdapter
+import java.io.File
+import junit.framework.{TestSuite, Test}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBStoreBrokerTest {
+  /** JUnit 3 suite containing every test of [[LevelDBStoreBrokerTest]]. */
+  def suite: Test = new TestSuite(classOf[LevelDBStoreBrokerTest])
+
+  /** Runs the suite with the JUnit text runner. */
+  def main(args: Array[String]): Unit = {
+    junit.textui.TestRunner.run(suite)
+  }
+}
+
+class LevelDBStoreBrokerTest extends BrokerTest {
+
+  /**
+   * Creates a LevelDB store rooted at target/activemq-data/leveldb.
+   *
+   * @param delete when true, all messages are deleted from the store before it is returned
+   */
+  protected def createPersistenceAdapter(delete: Boolean): PersistenceAdapter = {
+    val adapter = new LevelDBStore
+    adapter.setDirectory(new File("target/activemq-data/leveldb"))
+    if (delete) adapter.deleteAllMessages
+    adapter
+  }
+
+  /** Broker backed by a store whose messages were deleted. */
+  protected override def createBroker: BrokerService = brokerWithStore(true)
+
+  /** Broker backed by a store that keeps its existing messages. */
+  protected def createRestartedBroker: BrokerService = brokerWithStore(false)
+
+  /** Creates a broker and installs a persistence adapter built with the given `delete` flag. */
+  private def brokerWithStore(delete: Boolean): BrokerService = {
+    val service = new BrokerService
+    service.setPersistenceAdapter(createPersistenceAdapter(delete))
+    service
+  }
+}
\ No newline at end of file

Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
 
+import org.apache.activemq.store.PersistenceAdapter
+import org.apache.activemq.store.PersistenceAdapterTestSupport
+import java.io.File
 
 /**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
+ * <p>
+ * </p>
  *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+class LevelDBStoreTest extends PersistenceAdapterTestSupport {
+  /** Overrides the inherited test with an empty body, so it performs no checks here. */
+  override def testStoreCanHandleDupMessages: Unit = {}
+
+  /** Store under target/activemq-data/haleveldb; `delete` requests deleteAllMessages first. */
+  protected def createPersistenceAdapter(delete: Boolean): PersistenceAdapter = {
+    val adapter = new LevelDBStore
+    adapter.setDirectory(new File("target/activemq-data/haleveldb"))
+    // Only call deleteAllMessages when the caller asks for it.
+    if (delete) adapter.deleteAllMessages
+    adapter
+  }
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import java.util.concurrent.atomic._
+import java.util.concurrent.TimeUnit._
+import scala.collection.mutable.ListBuffer
+
+object Scenario {
+  val MESSAGE_ID:Array[Byte] = toBytes("message-id")
+  val NEWLINE = '\n'.toByte
+  /** Nanoseconds in one second; run() uses it to scale counts to a per-second rate. */
+  val NANOS_PER_SECOND = SECONDS.toNanos(1)
+
+  /** Implicit view of a String as its UTF-8 encoded bytes. */
+  implicit def toBytes(value: String):Array[Byte] = value.getBytes("UTF-8")
+
+  /** Null-safe wrapper: None for null, Some(value) otherwise. */
+  def o[T](value:T):Option[T] = Option(value)
+}
+
+trait Scenario {
+  import Scenario._
+
+  var url:String = "tcp://localhost:61616"
+  var user_name:String = _
+  var password:String = _
+  // Sleep settings: the getter calls apply() on the stored object; the setters accept a fixed Int or such an object.
+  private var _producer_sleep: { def apply(): Int; def init(time: Long) } = new { def apply() = 0; def init(time: Long) {}  }
+  def producer_sleep = _producer_sleep()
+  def producer_sleep_= (new_value: Int) = _producer_sleep = new { def apply() = new_value; def init(time: Long) {}  }
+  def producer_sleep_= (new_func: { def apply(): Int; def init(time: Long) }) = _producer_sleep = new_func
+
+  private var _consumer_sleep: { def apply(): Int; def init(time: Long) } = new { def apply() = 0; def init(time: Long) {}  }
+  def consumer_sleep = _consumer_sleep()
+  def consumer_sleep_= (new_value: Int) = _consumer_sleep = new { def apply() = new_value; def init(time: Long) {}  }
+  def consumer_sleep_= (new_func: { def apply(): Int; def init(time: Long) }) = _consumer_sleep = new_func
+  // Clients started by with_load; the *_per_sample counts are started additionally on every collection_sample call.
+  var producers = 1
+  var producers_per_sample = 0
+
+  var consumers = 1
+  var consumers_per_sample = 0
+  var sample_interval = 1000 // ms slept between samples in run()
+
+  var message_size = 1024
+  var persistent = false
+
+  var headers = Array[Array[(String,String)]]()
+  var selector:String = null
+  var no_local = false
+  var durable = false
+  var ack_mode = "auto"
+  var messages_per_connection = -1L
+  var display_errors = false
+  // destination_name is re-evaluated on every read; the setters take a fixed name or a generator function.
+  var destination_type = "queue"
+  private var _destination_name: () => String = () => "load"
+  def destination_name = _destination_name()
+  def destination_name_=(new_name: String) = _destination_name = () => new_name
+  def destination_name_=(new_func: () => String) = _destination_name = new_func
+  var destination_count = 1
+  // Counters are read and zeroed by collection_sample; done is the stop flag polled by the sampling loop in run().
+  val producer_counter = new AtomicLong()
+  val consumer_counter = new AtomicLong()
+  val error_counter = new AtomicLong()
+  val done = new AtomicBoolean()
+
+  var queue_prefix = ""
+  var topic_prefix = ""
+  var name = "custom" // suffix of the "p_"/"c_"/"e_" keys built by collection_end
+
+  var drain_timeout = 2000L // ms slept between checks in drain
+
+  /**
+   * Prints the settings, applies the load and, every `sample_interval` ms, prints
+   * producer, consumer and error rates until System.in.read() returns.
+   */
+  def run() = {
+    print(toString)
+    println("--------------------------------------")
+    println("     Running: Press ENTER to stop")
+    println("--------------------------------------")
+    println("")
+
+    with_load {
+      // start a sampling client...
+      val sample_thread = new Thread() {
+        override def run() = {
+          def print_rate(label: String, periodCount:Long, totalCount:Long, nanos: Long) = {
+            val rate_per_second: java.lang.Float = ((1.0f * periodCount / nanos) * NANOS_PER_SECOND)
+            println("%s total: %,d, rate: %,.3f per second".format(label, totalCount, rate_per_second))
+          }
+          try {
+            var start = System.nanoTime
+            var total_producer_count = 0L
+            var total_consumer_count = 0L
+            var total_error_count = 0L
+            collection_start
+            while( !done.get ) {
+              Thread.sleep(sample_interval)
+              val end = System.nanoTime
+              collection_sample
+              val samples = collection_end
+              // collection_end keys its map with the scenario's current `name`, so the
+              // lookups must use it too; a hard-coded "custom" goes silent after a rename.
+              samples.get("p_"+Scenario.this.name).foreach { case (_, count)::Nil =>
+                total_producer_count += count
+                print_rate("Producer", count, total_producer_count, end - start)
+              case _ =>
+              }
+              samples.get("c_"+Scenario.this.name).foreach { case (_, count)::Nil =>
+                total_consumer_count += count
+                print_rate("Consumer", count, total_consumer_count, end - start)
+              case _ =>
+              }
+              samples.get("e_"+Scenario.this.name).foreach { case (_, count)::Nil =>
+                if( count!= 0 ) {
+                  total_error_count += count
+                  print_rate("Error", count, total_error_count, end - start)
+                }
+              case _ =>
+              }
+              start = end
+            }
+          } catch {
+            case e:InterruptedException =>
+          }
+        }
+      }
+      sample_thread.start()
+
+      System.in.read()
+      done.set(true)
+      sample_thread.interrupt
+      sample_thread.join
+    }
+  }
+
+  /** Renders the current settings as the multi-line report that `run` prints. */
+  override def toString() = {
+    val rule = "--------------------------------------\n"
+    rule +
+    "Scenario Settings\n" +
+    rule +
+    "  destination_type      = " + destination_type + "\n" +
+    "  queue_prefix          = " + queue_prefix + "\n" +
+    "  topic_prefix          = " + topic_prefix + "\n" +
+    "  destination_count     = " + destination_count + "\n" +
+    "  destination_name      = " + destination_name + "\n" +
+    "  sample_interval (ms)  = " + sample_interval + "\n" +
+    "  \n" +
+    "  --- Producer Properties ---\n" +
+    "  producers             = " + producers + "\n" +
+    "  message_size          = " + message_size + "\n" +
+    "  persistent            = " + persistent + "\n" +
+    "  producer_sleep (ms)   = " + producer_sleep + "\n" +
+    "  headers               = " + headers.mkString(", ") + "\n" +
+    "  \n" +
+    "  --- Consumer Properties ---\n" +
+    "  consumers             = " + consumers + "\n" +
+    "  consumer_sleep (ms)   = " + consumer_sleep + "\n" +
+    "  selector              = " + selector + "\n" +
+    "  durable               = " + durable + "\n"
+  }
+
+  /** Header set for client `i`: cycles through `headers`, or an empty array when none exist. */
+  protected def headers_for(i:Int) = {
+    headers.size match {
+      case 0 => Array[(String, String)]()
+      case count => headers(i % count)
+    }
+  }
+
+  var producer_samples:Option[ListBuffer[(Long,Long)]] = None
+  var consumer_samples:Option[ListBuffer[(Long,Long)]] = None
+  var error_samples = ListBuffer[(Long,Long)]()
+
+  /**
+   * Resets the producer, consumer and error counters and installs a fresh sample
+   * buffer for each side whose client count or per-sample increment is positive.
+   */
+  def collection_start: Unit = {
+    // An inactive side gets None, so collection_sample records nothing for it.
+    def fresh_buffer(active: Boolean): Option[ListBuffer[(Long,Long)]] =
+      if (active) Some(ListBuffer[(Long,Long)]()) else None
+
+    producer_counter.set(0)
+    consumer_counter.set(0)
+    error_counter.set(0)
+
+    producer_samples = fresh_buffer(producers > 0 || producers_per_sample>0)
+    consumer_samples = fresh_buffer(consumers > 0 || consumers_per_sample>0)
+  }
+
+  /** Returns the buffered samples keyed "p_"/"c_"/"e_" + name and empties every buffer. */
+  def collection_end: Map[String, scala.List[(Long,Long)]] = {
+    // Snapshot a buffer into an immutable list, then empty it for the next round.
+    def take(buffer: ListBuffer[(Long,Long)]) = {
+      val snapshot = buffer.toList
+      buffer.clear
+      snapshot
+    }
+
+    val produced = producer_samples.map( samples => ("p_"+name) -> take(samples) )
+    val consumed = consumer_samples.map( samples => ("c_"+name) -> take(samples) )
+    val errors = ("e_"+name) -> take(error_samples)
+    Map[String, List[(Long,Long)]]() ++ produced.toList ++ consumed.toList + errors
+  }
+
+  trait Client {
+    def start():Unit
+    def shutdown():Unit
+  }
+
+  var producer_clients = List[Client]()
+  var consumer_clients = List[Client]()
+
+  /**
+   * Runs `func` while the configured producers and consumers are active, then
+   * sets `done` and shuts every client down (consumers first). Returns func's result.
+   */
+  def with_load[T](func: =>T ):T = {
+    done.set(false)
+
+    _producer_sleep.init(System.currentTimeMillis())
+    _consumer_sleep.init(System.currentTimeMillis())
+
+    // Start-up sits inside the try so that clients already running are still
+    // shut down (and `done` set) if creating or starting a later one throws.
+    try {
+      for (i <- 0 until producers) {
+        val client = createProducer(i)
+        producer_clients ::= client
+        client.start()
+      }
+      for (i <- 0 until consumers) {
+        val client = createConsumer(i)
+        consumer_clients ::= client
+        client.start()
+      }
+      func
+    } finally {
+      done.set(true)
+      // wait for the threads to finish..
+      consumer_clients.foreach(_.shutdown())
+      consumer_clients = List()
+      producer_clients.foreach(_.shutdown())
+      producer_clients = List()
+    }
+  }
+
+  /** For queue/raw_queue or durable setups: runs consumers until consumer_counter stays 0 for a drain_timeout. */
+  def drain = {
+    done.set(false)
+    if( destination_type=="queue" || destination_type=="raw_queue" || durable ) {
+      print("draining")
+      consumer_counter.set(0)
+      var drain_clients = List[Client]()
+      for (i <- 0 until destination_count) {
+        val client = createConsumer(i)
+        drain_clients ::= client
+        client.start()
+      }
+
+      // Keep sleeping until we stop draining messages.
+      var drained = 0L
+      // Adds the latest interval's count to the total; true when that count was zero.
+      def quiet_interval() = {
+        val received = consumer_counter.getAndSet(0)
+        drained += received
+        received == 0
+      }
+      try {
+        Thread.sleep(drain_timeout)
+        while( !quiet_interval() ) {
+          print(".")
+          Thread.sleep(drain_timeout)
+        }
+      } finally {
+        done.set(true)
+        drain_clients.foreach(_.shutdown())
+        println(". (drained %d)".format(drained))
+      }
+    }
+  }
+
+
+  /**
+   * Records one (timestamp, count) sample per counter, resetting each counter
+   * it reads, then starts the extra clients configured to be added per sample.
+   */
+  def collection_sample: Unit = {
+    val timestamp = System.currentTimeMillis()
+    producer_samples.foreach { buffer => buffer += ((timestamp, producer_counter.getAndSet(0))) }
+    consumer_samples.foreach { buffer => buffer += ((timestamp, consumer_counter.getAndSet(0))) }
+    error_samples += ((timestamp, error_counter.getAndSet(0)))
+
+    // Ramp up: each new client is numbered with the current client count.
+    (0 until producers_per_sample).foreach { _ =>
+      val client = createProducer(producer_clients.length)
+      producer_clients ::= client
+      client.start()
+    }
+    (0 until consumers_per_sample).foreach { _ =>
+      val client = createConsumer(consumer_clients.length)
+      consumer_clients ::= client
+      client.start()
+    }
+  }
+  
+  def createProducer(i:Int):Client
+  def createConsumer(i:Int):Client
+
+}
+
+

Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import java.io.IOException
 
 /**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
+ * <p>
+ * </p>
  *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+object TestingHDFSServer {
+  /** Both are assigned by `start` and stay null until it has run. */
+  private[leveldb] var cluster: MiniDFSCluster = null
+  private[leveldb] var fs: FileSystem = null
+
+  /** Boots a MiniDFSCluster, waits until it is active and publishes its file system in `fs`. */
+  private[leveldb] def start: Unit = {
+    // NOTE(review): arguments presumably mean 1 datanode, format = true, default racks; confirm against Hadoop 1.0.
+    cluster = new MiniDFSCluster(new Configuration, 1, true, null)
+    cluster.waitActive
+    fs = cluster.getFileSystem
+  }
+
+  /** Shuts the cluster down; any failure is only printed, never rethrown. */
+  private[leveldb] def stop: Unit = {
+    try {
+      cluster.shutdown
+    } catch {
+      case failure: Throwable => failure.printStackTrace()
+    }
+  }
+}
\ No newline at end of file

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1389882&r1=1389881&r2=1389882&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Tue Sep 25 14:32:28 2012
@@ -64,11 +64,13 @@
     <fusemq-leveldb-version>1.3</fusemq-leveldb-version>
     <ftpserver-version>1.0.6</ftpserver-version>
     <geronimo-version>1.0</geronimo-version>
+    <hadoop-version>1.0.0</hadoop-version>
     <hawtbuf-version>1.9</hawtbuf-version>
     <hawtdispatch-version>1.11</hawtdispatch-version>
     <howl-version>0.1.8</howl-version>
     <hsqldb-version>1.8.0.10</hsqldb-version>
     <httpclient-version>4.2.1</httpclient-version>
+    <jackson-version>1.9.2</jackson-version>
     <jasypt-version>1.9.0</jasypt-version>
     <jdom-version>1.0</jdom-version>
     <jetty-version>7.6.7.v20120910</jetty-version>
@@ -93,6 +95,9 @@
     <rome-version>1.0</rome-version>
     <saxon-version>9.4</saxon-version>
     <saxon-bundle-version>9.4.0.1_2</saxon-bundle-version>
+    <scala-plugin-version>2.15.1</scala-plugin-version>
+    <scala-version>2.9.1</scala-version>
+    <scalatest-version>1.8</scalatest-version>
     <slf4j-version>1.6.6</slf4j-version>
     <spring-version>3.0.7.RELEASE</spring-version>
     <spring-osgi-version>1.2.1</spring-osgi-version>
@@ -198,6 +203,7 @@
     <module>activemq-jaas</module>
     <module>activemq-blueprint</module>
     <module>activemq-karaf</module>
+    <module>activemq-leveldb</module>
     <module>activemq-openwire-generator</module>
     <module>activemq-optional</module>
     <module>activemq-pool</module>