You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/25 16:32:31 UTC
svn commit: r1389882 [7/7] - in /activemq/trunk: ./ activemq-core/
activemq-core/src/main/java/org/apache/activemq/store/leveldb/
activemq-core/src/main/resources/ activemq-core/src/main/resources/META-INF/
activemq-core/src/main/resources/META-INF/ser...
Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/FileSupport.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.util
+
+import java.io._
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.leveldb.LevelDBClient
+import org.fusesource.leveldbjni.internal.Util
+import org.apache.activemq.leveldb.util.ProcessSupport._
+
+/**
+ * File and stream helpers: hard linking with automatic fallback, recursive
+ * directory operations and small read/write/copy utilities.
+ */
+object FileSupport {
+
+  implicit def toRichFile(file:File):RichFile = new RichFile(file)
+
+  val onWindows = System.getProperty("os.name").toLowerCase().startsWith("windows")
+
+  // Link strategies, ordered from fastest to slowest. Once a strategy turns
+  // out to be unavailable we permanently move on to the next one.
+  private val NATIVE_LINK = 0
+  private val SHELL_LINK = 5
+  private val COPY_FILE = 10
+
+  private var linkStrategy = NATIVE_LINK
+  private val LOG = Log(getClass)
+
+  /**
+   * Makes `target` refer to the same content as `source`.  A native hard
+   * link is tried first, then an operating system shell command, and
+   * finally a plain file copy.
+   *
+   * @param source the existing file
+   * @param target the file to create
+   * @throws IOException if the native link call reports an I/O failure or
+   *                     the final copy fails
+   */
+  def link(source:File, target:File):Unit = {
+    linkStrategy match {
+      case NATIVE_LINK =>
+        // We first try to link via a native system call. Fails if
+        // we cannot load the JNI module.
+        try {
+          Util.link(source, target)
+        } catch {
+          case e:IOException => throw e
+          case e:Throwable =>
+            // Fallback.. to a slower impl..
+            LOG.debug("Native link system call not available")
+            linkStrategy = SHELL_LINK
+            link(source, target)
+        }
+
+      // TODO: consider implementing a case which does the native system call using JNA
+
+      case SHELL_LINK =>
+        // Next we try to do the link by executing an
+        // operating system shell command
+        if( !shellLink(source, target) ) {
+          // The fallback runs outside of any try block so that a failing
+          // copy is reported to the caller instead of being swallowed.
+          linkStrategy = COPY_FILE
+          link(source, target)
+        }
+
+      case _ =>
+        // this final strategy is slow but sure to work.
+        source.copyTo(target)
+    }
+  }
+
+  /**
+   * Tries to create the hard link with `fsutil` (Windows) or `ln`.
+   *
+   * @return true if the command ran and exited with status 0; false if it
+   *         exited with another status or could not be executed at all
+   */
+  private def shellLink(source:File, target:File):Boolean = {
+    val command = if( onWindows ) "fsutil" else "ln"
+    try {
+      val (code, _, _) = if( onWindows ) {
+        system("fsutil", "hardlink", "create", target.getCanonicalPath, source.getCanonicalPath)
+      } else {
+        system("ln", source.getCanonicalPath, target.getCanonicalPath)
+      }
+      // TODO: we might want to look at the out/err to see why it failed
+      // to avoid falling back to the slower strategy.
+      if( code != 0 ) {
+        LOG.debug("%s OS command not available either", command)
+      }
+      code == 0
+    } catch {
+      case e:Exception =>
+        // Typically the command could not be started (e.g. it is not installed).
+        LOG.debug(e, "Could not execute the %s OS command", command)
+        false
+    }
+  }
+
+  /**
+   * Resolves the directory named by a system property.
+   *
+   * @param name the system property to read
+   * @return the directory the property points at
+   * @throws RuntimeException if the property is not set or does not name
+   *                          an existing directory
+   */
+  def systemDir(name:String) = {
+    val baseValue = System.getProperty(name)
+    if( baseValue==null ) {
+      sys.error("The the %s system property is not set.".format(name))
+    }
+    val file = new File(baseValue)
+    if( !file.isDirectory ) {
+      sys.error("The the %s system property is not set to valid directory path %s".format(name, baseValue))
+    }
+    file
+  }
+
+  /**
+   * Convenience operations added to java.io.File through the
+   * toRichFile implicit conversion.
+   */
+  case class RichFile(self:File) {
+
+    /** Child of this file with the given relative path. */
+    def / (path:String) = new File(self, path)
+
+    /** Links this file to `target`, see FileSupport.link. */
+    def linkTo(target:File) = link(self, target)
+
+    /**
+     * Copies the content of this file to `target` and returns the number of
+     * bytes copied.  The source is opened first so that an unreadable
+     * source does not create or truncate the target.
+     */
+    def copyTo(target:File) = {
+      using(new FileInputStream(self)){ is=>
+        using(new FileOutputStream(target)){ os=>
+          FileSupport.copy(is, os)
+        }
+      }
+    }
+
+    /** Like File.listFiles, but yields an empty array instead of null. */
+    def listFiles:Array[File] = {
+      Option(self.listFiles()).getOrElse(Array.empty[File])
+    }
+
+    /** This file followed by, for directories, everything below it. */
+    def recursiveList:List[File] = {
+      if( self.isDirectory ) {
+        // Use the null safe listFiles defined above.
+        self :: listFiles.toList.flatMap( _.recursiveList )
+      } else {
+        self :: Nil
+      }
+    }
+
+    /** Deletes this file; directories are emptied first. */
+    def recursiveDelete: Unit = {
+      if( self.exists ) {
+        if( self.isDirectory ) {
+          listFiles.foreach(_.recursiveDelete)
+        }
+        self.delete
+      }
+    }
+
+    /** Copies this file, or this whole directory tree, to `target`. */
+    def recursiveCopyTo(target: File) : Unit = {
+      if (self.isDirectory) {
+        target.mkdirs
+        listFiles.foreach( file=> file.recursiveCopyTo( target / file.getName) )
+      } else {
+        self.copyTo(target)
+      }
+    }
+
+    /** Reads the whole file and decodes it with `charset`. */
+    def readText(charset:String="UTF-8"): String = {
+      using(new FileInputStream(self)) { in =>
+        FileSupport.readText(in, charset)
+      }
+    }
+
+    /** Reads the whole file into memory. */
+    def readBytes: Array[Byte] = {
+      using(new FileInputStream(self)) { in =>
+        FileSupport.readBytes(in)
+      }
+    }
+
+    /** Replaces the content of the file with `data`. */
+    def writeBytes(data:Array[Byte]):Unit = {
+      using(new FileOutputStream(self)) { out =>
+        FileSupport.writeBytes(out, data)
+      }
+    }
+
+    /** Replaces the content of the file with `data` encoded using `charset`. */
+    def writeText(data:String, charset:String="UTF-8"):Unit = {
+      using(new FileOutputStream(self)) { out =>
+        FileSupport.writeText(out, data, charset)
+      }
+    }
+
+  }
+
+  /**
+   * Copies everything readable from `in` to `out`.  Neither stream is closed.
+   *
+   * Returns the number of bytes copied.
+   */
+  def copy(in: InputStream, out: OutputStream): Long = {
+    var bytesCopied: Long = 0
+    val buffer = new Array[Byte](8192)
+    var bytes = in.read(buffer)
+    // read returns -1 at the end of the stream
+    while (bytes >= 0) {
+      out.write(buffer, 0, bytes)
+      bytesCopied += bytes
+      bytes = in.read(buffer)
+    }
+    bytesCopied
+  }
+
+  /**
+   * Runs `proc` with `closable` and always closes it afterwards.  Failures
+   * raised while closing are ignored so they cannot mask the result or the
+   * exception of `proc`.
+   */
+  def using[R,C <: Closeable](closable: C)(proc: C=>R): R = {
+    try {
+      proc(closable)
+    } finally {
+      try { closable.close } catch { case ignore:Throwable => }
+    }
+  }
+
+  /** Reads `in` to its end and decodes the bytes with `charset`. */
+  def readText(in: InputStream, charset:String="UTF-8"): String = {
+    new String(readBytes(in), charset)
+  }
+
+  /** Reads `in` to its end and returns the bytes. */
+  def readBytes(in: InputStream): Array[Byte] = {
+    val out = new ByteArrayOutputStream()
+    copy(in, out)
+    out.toByteArray
+  }
+
+  /** Writes `value` to `out` encoded with `charset`. */
+  def writeText(out: OutputStream, value: String, charset:String="UTF-8"): Unit = {
+    writeBytes(out, value.getBytes(charset))
+  }
+
+  /** Writes all of `data` to `out`. */
+  def writeBytes(out: OutputStream, data: Array[Byte]): Unit = {
+    copy(new ByteArrayInputStream(data), out)
+  }
+
+}
+
+/**
+ * Helpers for starting operating system processes and collecting their
+ * exit status and output.
+ */
+object ProcessSupport {
+  import FileSupport._
+
+  implicit def toRichProcessBuilder(self:ProcessBuilder):RichProcessBuilder = new RichProcessBuilder(self)
+
+  /**
+   * Runs `task` on the LevelDBClient thread pool, ignoring anything it throws.
+   */
+  private def inBackground(task: =>Unit):Unit = LevelDBClient.THREAD_POOL {
+    try {
+      task
+    } catch {
+      case _:Throwable =>
+    }
+  }
+
+  case class RichProcessBuilder(self:ProcessBuilder) {
+
+    /**
+     * Starts the process and wires up its standard streams.
+     *
+     * @param out receives the process' standard output, or null to close that stream
+     * @param err receives the process' standard error, or null to close that stream;
+     *            when it equals `out` the error stream is merged into standard output
+     * @param in  fed to the process' standard input, or null to close that stream
+     * @return the started process
+     */
+    def start(out:OutputStream=null, err:OutputStream=null, in:InputStream=null) = {
+      self.redirectErrorStream(out == err)
+      val process = self.start
+
+      // standard input of the child
+      if( in==null ) {
+        process.getOutputStream.close
+      } else {
+        inBackground {
+          using(process.getOutputStream) { stdin =>
+            FileSupport.copy(in, stdin)
+          }
+        }
+      }
+
+      // standard output of the child
+      if( out==null ) {
+        process.getInputStream.close
+      } else {
+        inBackground {
+          using(process.getInputStream) { stdout =>
+            FileSupport.copy(stdout, out)
+          }
+        }
+      }
+
+      // standard error of the child, unless it was merged into standard output
+      if( err==null || err==out ) {
+        process.getErrorStream.close
+      } else {
+        inBackground {
+          using(process.getErrorStream) { stderr =>
+            FileSupport.copy(stderr, err)
+          }
+        }
+      }
+
+      process
+    }
+
+  }
+
+  implicit def toRichProcess(self:Process):RichProcess = new RichProcess(self)
+
+  case class RichProcess(self:Process) {
+    /** Calls `func` with the exit code, from a pool thread, once the process ends. */
+    def onExit(func: (Int)=>Unit) = LevelDBClient.THREAD_POOL {
+      self.waitFor
+      func(self.exitValue)
+    }
+  }
+
+  implicit def toProcessBuilder(args:Seq[String]):ProcessBuilder = new ProcessBuilder().command(args : _*)
+
+  /**
+   * Starts a process and, once it exits, passes the exit code and the
+   * captured standard output and standard error to `func`.
+   */
+  def launch(command:String*)(func: (Int, Array[Byte], Array[Byte])=>Unit ):Unit = launch(command)(func)
+  def launch(p:ProcessBuilder, in:InputStream=null)(func: (Int, Array[Byte], Array[Byte]) => Unit):Unit = {
+    val stdout = new ByteArrayOutputStream
+    val stderr = new ByteArrayOutputStream
+    p.start(stdout, stderr, in).onExit { exitCode=>
+      func(exitCode, stdout.toByteArray, stderr.toByteArray)
+    }
+  }
+
+  /**
+   * Starts a process, waits for it to exit and returns the exit code along
+   * with the captured standard output and standard error.
+   */
+  // NOTE(review): the output is pumped by background tasks which are not
+  // awaited here, so the returned arrays may be incomplete - confirm.
+  def system(command:String*):(Int, Array[Byte], Array[Byte]) = system(command)
+  def system(p:ProcessBuilder, in:InputStream=null):(Int, Array[Byte], Array[Byte]) = {
+    val stdout = new ByteArrayOutputStream
+    val stderr = new ByteArrayOutputStream
+    val process = p.start(stdout, stderr, in)
+    process.waitFor
+    (process.exitValue, stdout.toByteArray, stderr.toByteArray)
+  }
+
+}
\ No newline at end of file
Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/Log.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.util
+
+import java.util.concurrent.atomic.AtomicLong
+import org.slf4j.{MDC, Logger, LoggerFactory}
+import java.lang.{Throwable, String}
+
+/**
+ * Factory methods for [[Log]] instances plus the generator of the ids used
+ * to correlate a log message with its stack trace.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object Log {
+
+  /** Log named after `clazz`, without the `$` suffix of Scala objects. */
+  def apply(clazz:Class[_]):Log = {
+    val name = clazz.getName.stripSuffix("$")
+    apply(name)
+  }
+
+  /** Log backed by the slf4j logger with the given name. */
+  def apply(name:String):Log = {
+    val logger = LoggerFactory.getLogger(name)
+    new Log {
+      override val log = logger
+    }
+  }
+
+  /** Log backed by an existing slf4j logger. */
+  def apply(value:Logger):Log = new Log {
+    override val log = value
+  }
+
+  // Seeded with the current time in milliseconds.
+  val exception_id_generator = new AtomicLong(System.currentTimeMillis)
+
+  /** Next id of the generator, rendered as a hexadecimal string. */
+  def next_exception_id = java.lang.Long.toHexString(exception_id_generator.incrementAndGet)
+}
+
+/**
+ * Thin wrapper around an slf4j logger.  Messages are passed by name so they
+ * are only built when the level is enabled, and may contain
+ * String.format style placeholders filled in from `args`.
+ *
+ * For error, warn and info the stack trace of the throwable is logged
+ * separately at debug level; both entries share a "stackref" MDC value.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Log {
+  import Log._
+  val log = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
+
+  /**
+   * Runs `func` and then, when debug logging is enabled and `e` is not null,
+   * logs the stack trace of `e` at debug level.  While doing so the
+   * "stackref" MDC key holds a fresh id; the key is always removed again,
+   * even if `func` throws.
+   */
+  private def with_throwable(e:Throwable)(func: =>Unit) = {
+    if( e!=null && log.isDebugEnabled ) {
+      MDC.put("stackref", next_exception_id)
+      try {
+        func
+        log.debug(e.toString, e)
+      } finally {
+        MDC.remove("stackref")
+      }
+    } else {
+      func
+    }
+  }
+
+  /** Applies String.format only when there are arguments to substitute. */
+  private def format(message:String, args:Seq[Any]) = {
+    if( args.isEmpty ) {
+      message
+    } else {
+      message.format(args.map(_.asInstanceOf[AnyRef]) : _*)
+    }
+  }
+
+  def error(m: => String, args:Any*): Unit = {
+    if( log.isErrorEnabled ) {
+      log.error(format(m, args))
+    }
+  }
+
+  def error(e: Throwable, m: => String, args:Any*): Unit = {
+    with_throwable(e) {
+      if( log.isErrorEnabled ) {
+        log.error(format(m, args))
+      }
+    }
+  }
+
+  def error(e: Throwable): Unit = {
+    with_throwable(e) {
+      if( log.isErrorEnabled ) {
+        // toString (like warn and info) so the exception type is shown even
+        // when the exception carries no message.
+        log.error(e.toString)
+      }
+    }
+  }
+
+  def warn(m: => String, args:Any*): Unit = {
+    if( log.isWarnEnabled ) {
+      log.warn(format(m, args))
+    }
+  }
+
+  def warn(e: Throwable, m: => String, args:Any*): Unit = {
+    with_throwable(e) {
+      if( log.isWarnEnabled ) {
+        log.warn(format(m, args))
+      }
+    }
+  }
+
+  def warn(e: Throwable): Unit = {
+    with_throwable(e) {
+      if( log.isWarnEnabled ) {
+        log.warn(e.toString)
+      }
+    }
+  }
+
+  def info(m: => String, args:Any*): Unit = {
+    if( log.isInfoEnabled ) {
+      log.info(format(m, args))
+    }
+  }
+
+  def info(e: Throwable, m: => String, args:Any*): Unit = {
+    with_throwable(e) {
+      if( log.isInfoEnabled ) {
+        log.info(format(m, args))
+      }
+    }
+  }
+
+  def info(e: Throwable): Unit = {
+    with_throwable(e) {
+      if( log.isInfoEnabled ) {
+        log.info(e.toString)
+      }
+    }
+  }
+
+  // debug and trace attach the throwable directly to the log entry.
+
+  def debug(m: => String, args:Any*): Unit = {
+    if( log.isDebugEnabled ) {
+      log.debug(format(m, args))
+    }
+  }
+
+  def debug(e: Throwable, m: => String, args:Any*): Unit = {
+    if( log.isDebugEnabled ) {
+      log.debug(format(m, args), e)
+    }
+  }
+
+  def debug(e: Throwable): Unit = {
+    if( log.isDebugEnabled ) {
+      log.debug(e.toString, e)
+    }
+  }
+
+  def trace(m: => String, args:Any*): Unit = {
+    if( log.isTraceEnabled ) {
+      log.trace(format(m, args))
+    }
+  }
+
+  def trace(e: Throwable, m: => String, args:Any*): Unit = {
+    if( log.isTraceEnabled ) {
+      log.trace(format(m, args), e)
+    }
+  }
+
+  def trace(e: Throwable): Unit = {
+    if( log.isTraceEnabled ) {
+      log.trace(e.toString, e)
+    }
+  }
+
+}
\ No newline at end of file
Copied: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala?p2=activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/LongCounter.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb.util
/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
+ * <p>
+ * </p>
*
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+/**
+ * A mutable long counter with an AtomicLong like API.  Updates are plain
+ * read-modify-write operations on a var, without any synchronization.
+ *
+ * @param value the initial value of the counter
+ */
+class LongCounter(private var value:Long = 0) extends Serializable {
+
+  /** Resets the counter to zero. */
+  def clear(): Unit = value=0
+  /** Current value. */
+  def get(): Long = value
+  /** Replaces the current value. */
+  def set(value:Long): Unit = this.value = value
+
+  def incrementAndGet(): Long = addAndGet(1)
+  def decrementAndGet(): Long = addAndGet(-1)
+  /** Adds `amount` and returns the updated value. */
+  def addAndGet(amount:Long): Long = {
+    value+=amount
+    value
+  }
+
+  def getAndIncrement(): Long = getAndAdd(1)
+  // Decrements by exactly one, mirroring getAndIncrement.
+  def getAndDecrement(): Long = getAndAdd(-1)
+  /** Adds `amount` and returns the value held before the addition. */
+  def getAndAdd(amount:Long): Long = {
+    val rc = value
+    value+=amount
+    rc
+  }
+
+  override def toString() = get().toString
+}
\ No newline at end of file
Added: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala (added)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.util
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object RetrySupport {
+
+  /**
+   * Calls `func` until it completes without throwing, pausing one second
+   * between attempts.
+   *
+   * @param log       receives the failure and recovery messages
+   * @param isStarted checked after every failed attempt; once it returns
+   *                  false the most recent failure is rethrown
+   * @param func      the operation to perform
+   * @return the result of the first successful call to `func`
+   */
+  def retry[T](log:Log, isStarted: ()=>Boolean, func: ()=>T): T = {
+    var error:Throwable = null
+
+    // We will loop until the tx succeeds.  Perhaps it's
+    // failing due to a temporary condition like low disk space.
+    while(true) {
+
+      try {
+        val rc = func()
+        if( error!=null ) {
+          log.info("DB recovered from failure.")
+        }
+        return rc
+      } catch {
+        case e:Throwable =>
+          // Report through the logger rather than printing to stderr: the
+          // first failure is a warning, repeated ones are debug noise.
+          if( error==null ) {
+            log.warn(e, "DB operation failed. (entering recovery mode)")
+          } else {
+            log.debug(e, "DB operation failed again. (still in recovery mode)")
+          }
+          error = e
+      }
+
+      // We may need to give up if the store is being stopped.
+      if ( !isStarted() ) {
+        throw error
+      }
+      Thread.sleep(1000)
+    }
+    // Unreachable: the loop only ends through return or throw.
+    throw error
+  }
+
+}
\ No newline at end of file
Copied: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala?p2=activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/util/TimeMetric.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.leveldb;
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb.util
+// Tracks the largest duration recorded since construction or the last reset.
+// Access to the stored maximum is guarded by this.synchronized.
+case class TimeMetric() {
+ // Largest value passed to add so far.
+ var max = 0L
+
+ // Records a duration; it is kept only if it exceeds the current maximum.
+ def add(duration:Long) = this.synchronized {
+ max = max.max(duration)
+ }
+
+ // Current maximum divided by 1,000,000, i.e. milliseconds when the
+ // durations are nanoseconds (as they are when recorded through apply).
+ def get = {
+ this.synchronized {
+ max
+ } / 1000000.0
+ }
+ // Same value as get, but also resets the stored maximum to zero.
+ def reset = {
+ this.synchronized {
+ val rc = max
+ max = 0
+ rc
+ } / 1000000.0
+ }
+
+ // Evaluates func and records how long it took, measured with
+ // System.nanoTime, even when func throws.
+ def apply[T](func: =>T):T = {
+ val start = System.nanoTime()
+ try {
+ func
+ } finally {
+ add(System.nanoTime() - start)
+ }
+ }
-/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
- *
- */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
}
+
Copied: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/store/leveldb/package.html (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/package.html)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/store/leveldb/package.html?p2=activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/store/leveldb/package.html&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/package.html&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
(empty)
Copied: activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java (from r1389860, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java?p2=activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBConfigTest.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
(empty)
Copied: activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java (from r1389860, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java?p2=activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java Tue Sep 25 14:32:28 2012
@@ -23,7 +23,7 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.BrokerTest;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
-import org.fusesource.mq.leveldb.LevelDBStore;
+import org.apache.activemq.leveldb.LevelDBStore;
/**
* Once the wire format is completed we can test against real persistence storage.
Added: activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties (added)
+++ activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties Tue Sep 25 14:32:28 2012
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.fusesource=INFO
+
+# Console will only display warnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true
Propchange: activemq/trunk/activemq-leveldb/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:executable = *
Copied: activemq/trunk/activemq-leveldb/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml (from r1389860, activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml?p2=activemq/trunk/activemq-leveldb/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml&p1=activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/leveldb/leveldb.xml&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
(empty)
Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/ActiveMQScenario.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.spring.ActiveMQConnectionFactory
+import javax.jms.{Destination, ConnectionFactory}
+import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
+
+/**
+ * <p>
+ * JMS client scenario which connects to an ActiveMQ broker: it supplies the
+ * ActiveMQ connection factory and the ActiveMQ queue / topic destinations.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ActiveMQScenario extends JMSClientScenario {
+
+  /** Connection factory pointing at the broker URL of this scenario. */
+  override protected def factory:ConnectionFactory = {
+    val connectionFactory = new ActiveMQConnectionFactory
+    connectionFactory.setBrokerURL(url)
+    connectionFactory
+  }
+
+  /** The i-th destination, a queue or a topic depending on destination_type. */
+  override protected def destination(i:Int):Destination = {
+    val kind = destination_type
+    if( kind == "queue" ) {
+      new ActiveMQQueue(indexed_destination_name(i))
+    } else if( kind == "topic" ) {
+      new ActiveMQTopic(indexed_destination_name(i))
+    } else {
+      error("Unsuported destination type: "+destination_type)
+    }
+  }
+
+}
Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/EnqueueRateScenariosTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import junit.framework.TestCase
+import org.apache.activemq.broker._
+import org.apache.activemq.store._
+import java.io.File
+import junit.framework.Assert._
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
+import region.policy.{PolicyEntry, PolicyMap}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+// Benchmark style tests which drive producers and consumers against an
+// embedded broker backed by a LevelDBStore and assert on the observed rates.
+class EnqueueRateScenariosTest extends TestCase {
+
+ var broker: BrokerService = null
+
+ // Starts a fresh broker: existing messages are deleted on startup and the
+ // connector listens on tcp://0.0.0.0:0.
+ override def setUp() {
+ import collection.JavaConversions._
+ broker = new BrokerService
+ broker.setDeleteAllMessagesOnStartup(true)
+ broker.setPersistenceAdapter(createStore)
+ broker.addConnector("tcp://0.0.0.0:0")
+// val policies = new PolicyMap();
+// val entry = new PolicyEntry
+// entry.setQueue(">")
+// policies.setPolicyEntries(List(entry))
+// broker.setDestinationPolicy(policies)
+ broker.start
+ broker.waitUntilStarted()
+ }
+
+ // Stops the broker, if one was created, and waits until it is down.
+ override def tearDown() = {
+ if (broker != null) {
+ broker.stop
+ broker.waitUntilStopped
+ }
+ }
+
+ // The three accessors below read counters from the db of the broker's
+ // LevelDBStore persistence adapter.
+ protected def canceledEnqueues() =
+ broker.getPersistenceAdapter.asInstanceOf[LevelDBStore].db.uowCanceledCounter
+
+ protected def enqueueOptimized() =
+ broker.getPersistenceAdapter.asInstanceOf[LevelDBStore].db.uowEnqueueDelayReqested
+
+ protected def enqueueNotOptimized() =
+ broker.getPersistenceAdapter.asInstanceOf[LevelDBStore].db.uowEnqueueNodelayReqested
+
+
+ // Store under test, kept in target/activemq-data/leveldb.
+ protected def createStore: PersistenceAdapter = {
+ var store: LevelDBStore = new LevelDBStore
+ store.setDirectory(new File("target/activemq-data/leveldb"))
+ return store
+ }
+
+ // Runs the scenario under load: waits `warmup` seconds, then takes
+ // `samples_count` samples one second apart.  Returns the producer and
+ // consumer rate statistics followed by the canceled, optimized and
+ // unoptimized enqueue counts divided by the number of samples.
+ def collect_benchmark(scenario:ActiveMQScenario, warmup:Int, samples_count:Int) = {
+ val (cancels, optimized, unoptimized) = scenario.with_load {
+ println("Warming up for %d seconds...".format(warmup))
+ Thread.sleep(warmup*1000)
+ println("Sampling...")
+ scenario.collection_start
+ // Remember the counter values at the start of the sampling window.
+ val cancelStart = canceledEnqueues
+ val enqueueOptimizedStart = enqueueOptimized
+ val enqueueNotOptimizedStart = enqueueNotOptimized
+ for (i <- 0 until samples_count) {
+ Thread.sleep(1000);
+ scenario.collection_sample
+ }
+ // Counter increases observed during the sampling window.
+ (canceledEnqueues-cancelStart, enqueueOptimized-enqueueOptimizedStart, enqueueNotOptimized-enqueueNotOptimizedStart)
+ }
+ println("Done.")
+
+ var samples = scenario.collection_end
+ // Fail if any sample of the "e_custom" series is greater than zero.
+ val error_rates = samples.get("e_custom").get.map(_._2)
+ assertFalse("Errors occured during scenario run: "+error_rates, error_rates.find(_ > 0 ).isDefined )
+
+ val producer_stats = new DescriptiveStatistics();
+ for( producer_rates <- samples.get("p_custom") ) {
+ for( i <- producer_rates ) {
+ producer_stats.addValue(i._2)
+ }
+ }
+
+ val consumer_stats = new DescriptiveStatistics();
+ for( consumer_rates <- samples.get("c_custom") ) {
+ for( i <- consumer_rates ) {
+ consumer_stats.addValue(i._2)
+ }
+ }
+
+ (producer_stats, consumer_stats, cancels*1.0/samples_count, optimized*1.0/samples_count, unoptimized*1.0/samples_count)
+ }
+
+ // Configures a scenario against the running broker (persistent messages
+ // of 1024 * 3 bytes), lets `setup` customize it, runs collect_benchmark,
+ // prints the results and returns them.
+ def benchmark(name:String, warmup:Int=3, samples_count:Int=15, async_send:Boolean=true)(setup:(ActiveMQScenario)=>Unit) = {
+ println("Benchmarking: "+name)
+ var options: String = "?jms.watchTopicAdvisories=false&jms.useAsyncSend="+async_send
+ val url = broker.getTransportConnectors.get(0).getConnectUri + options
+
+ val scenario = new ActiveMQScenario
+ scenario.url = url
+ scenario.display_errors = true
+ scenario.persistent = true
+ scenario.message_size = 1024 * 3
+
+ setup(scenario)
+ val (producer_stats, consumer_stats, cancels, optimized, unoptimized) = collect_benchmark(scenario, warmup, samples_count)
+
+ println("%s: producer avg msg/sec: %,.2f, stddev: %,.2f".format(name, producer_stats.getMean, producer_stats.getStandardDeviation))
+ println("%s: consumer avg msg/sec: %,.2f, stddev: %,.2f".format(name, consumer_stats.getMean, consumer_stats.getStandardDeviation))
+ println("%s: canceled enqueues/sec: %,.2f".format(name,cancels))
+ println("%s: optimized enqueues/sec: %,.2f".format(name,optimized))
+ println("%s: unoptimized enqueues/sec: %,.2f".format(name,unoptimized))
+
+ (producer_stats, consumer_stats, cancels, optimized, unoptimized)
+ }
+
+ // With one producer and one consumer connected, the ratio of canceled
+ // enqueues to the mean producer rate must exceed 0.80.
+ def testHighCancelRatio = {
+ val (producer_stats, consumer_stats, cancels, optimized, unoptimized) = benchmark("both_connected_baseline") { scenario=>
+ scenario.producers = 1
+ scenario.consumers = 1
+ }
+ val cancel_ratio = cancels / producer_stats.getMean
+ assertTrue("Expecting more than 80%% of the enqueues get canceled. But only %.2f%% was canceled".format(cancel_ratio*100), cancel_ratio > .80)
+ }
+
+ // Compares the mean producer rate across runs with 0, 10 and 1 consumers.
+ def testDecoupledProducerRate = {
+
+ // Fill up the queue with messages.. for the benefit of the next benchmark..
+ val from_1_to_0 = benchmark("from_1_to_0", 60) { scenario=>
+ scenario.producers = 1
+ scenario.consumers = 0
+ }
+ val from_1_to_10 = benchmark("from_1_to_10") { scenario=>
+ scenario.producers = 1
+ scenario.consumers = 10
+ }
+ val from_1_to_1 = benchmark("from_1_to_1") { scenario=>
+ scenario.producers = 1
+ scenario.consumers = 1
+ }
+
+ // Relative difference, in percent, between the mean producer rates.
+ var percent_diff0 = (1.0 - (from_1_to_0._1.getMean / from_1_to_1._1.getMean)).abs * 100
+ var percent_diff1 = (1.0 - (from_1_to_1._1.getMean / from_1_to_10._1.getMean)).abs * 100
+
+ var msg0 = "The 0 vs 1 consumer scenario producer rate was within %.2f%%".format(percent_diff0)
+ var msg1 = "The 1 vs 10 consumer scenario producer rate was within %.2f%%".format(percent_diff1)
+
+ println(msg0)
+ println(msg1)
+
+ // Allowed deviation: 60% for 0 vs 1 consumers, 20% for 1 vs 10 consumers.
+ assertTrue(msg0, percent_diff0 <= 60)
+ assertTrue(msg1, percent_diff1 <= 20)
+ }
+
+}
\ No newline at end of file
Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBFastEnqueueTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import org.apache.hadoop.fs.FileUtil
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+/**
+ * Runs the LevelDBFastEnqueueTest scenarios against an HALevelDBStore, with
+ * the testing HDFS server started around every test.
+ */
+class HALevelDBFastEnqueueTest extends LevelDBFastEnqueueTest {
+
+  /** Starts the testing HDFS server, then runs the inherited set-up. */
+  override def setUp: Unit = {
+    TestingHDFSServer.start
+    super.setUp
+  }
+
+  /**
+   * Runs the inherited tear-down and then stops the testing HDFS server. The
+   * server is stopped even when the inherited tear-down throws, so a failing
+   * broker shutdown does not leave the server running for later tests.
+   */
+  override def tearDown: Unit = {
+    try {
+      super.tearDown
+    } finally {
+      TestingHDFSServer.stop
+    }
+  }
+
+  /** Creates an HALevelDBStore using the local data directory and a DFS directory. */
+  override protected def createStore: LevelDBStore = {
+    val store = new HALevelDBStore
+    store.setDirectory(dataDirectory)
+    store.setDfsDirectory("target/activemq-data/hdfs-leveldb")
+    store
+  }
+
+  /** Local file system directory handed to the store. */
+  private def dataDirectory: File = new File("target/activemq-data/leveldb")
+
+  /**
+   * On restart we will also delete the local file system store, so that we test restoring from
+   * HDFS.
+   *
+   * @param restartDelay milliseconds to sleep between stopping and starting the broker
+   * @param checkpoint value passed through to startBroker
+   */
+  override protected def restartBroker(restartDelay: Int, checkpoint: Int): Unit = {
+    stopBroker
+    FileUtil.fullyDelete(dataDirectory)
+    TimeUnit.MILLISECONDS.sleep(restartDelay)
+    startBroker(false, checkpoint)
+  }
+}
\ No newline at end of file
Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/HALevelDBStoreTest.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
+import org.apache.activemq.store.PersistenceAdapter
+import java.io.File
/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
+ * <p>
+ * </p>
*
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+class HALevelDBStoreTest extends LevelDBStoreTest {
+
+  /** Starts the testing HDFS server, then runs the inherited set-up. */
+  override protected def setUp: Unit = {
+    TestingHDFSServer.start
+    super.setUp
+  }
+
+  /**
+   * Runs the inherited tear-down and then stops the testing HDFS server. The
+   * server is stopped even when the inherited tear-down throws.
+   */
+  override protected def tearDown: Unit = {
+    try {
+      super.tearDown
+    } finally {
+      TestingHDFSServer.stop
+    }
+  }
+
+  /**
+   * Creates the HALevelDBStore under test.
+   *
+   * @param delete when true, all messages in the store are deleted before it is returned
+   */
+  override protected def createPersistenceAdapter(delete: Boolean): PersistenceAdapter = {
+    val store = new HALevelDBStore
+    store.setDirectory(new File("target/activemq-data/haleveldb"))
+    store.setDfsDirectory("localhost")
+    if (delete) {
+      store.deleteAllMessages
+    }
+    store
+  }
+}
\ No newline at end of file
Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/IDERunner.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.leveldb;
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
+import org.apache.activemq.console.Main
-/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
- *
- */
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+/** Entry point that forwards its arguments to the ActiveMQ console Main. */
+object IDERunner {
+  def main(args: Array[String]): Unit = Main.main(args)
+}
\ No newline at end of file
Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/JMSClientScenario.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import java.lang.Thread
+import javax.jms._
+
+/**
+ * <p>
+ * Simulates load on a JMS server using the JMS messaging API.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class JMSClientScenario extends Scenario {
+
+  /** Creates the producer client with index `i`. */
+  def createProducer(i:Int) = {
+    new ProducerClient(i)
+  }
+
+  /** Creates the consumer client with index `i`. */
+  def createConsumer(i:Int) = {
+    new ConsumerClient(i)
+  }
+
+  /** Supplies the JMS destination used by the client with index `i`. */
+  protected def destination(i:Int):Destination
+
+  /**
+   * Builds the destination name for client `i`: the queue or topic prefix, the
+   * configured destination name and the index `i % destination_count`.
+   * Raises a runtime error for any destination type other than "queue" or "topic".
+   */
+  def indexed_destination_name(i:Int) = destination_type match {
+    case "queue" => queue_prefix+destination_name+"-"+(i%destination_count)
+    case "topic" => topic_prefix+destination_name+"-"+(i%destination_count)
+    case _ => sys.error("Unsupported destination type: "+destination_type)
+  }
+
+
+  /** Supplies the connection factory used to open every client connection. */
+  protected def factory:ConnectionFactory
+
+  /** Maps the configured `ack_mode` string onto the matching JMS Session constant. */
+  def jms_ack_mode = {
+    ack_mode match {
+      case "auto" => Session.AUTO_ACKNOWLEDGE
+      case "client" => Session.CLIENT_ACKNOWLEDGE
+      case "dups_ok" => Session.DUPS_OK_ACKNOWLEDGE
+      case "transacted" => Session.SESSION_TRANSACTED
+      case _ => throw new Exception("Invalid ack mode: "+ack_mode)
+    }
+  }
+
+  /**
+   * Base for the JMS clients: owns one connection and a worker thread that
+   * (re)connects and calls `execute` until the scenario is flagged done.
+   */
+  trait JMSClient extends Client {
+
+    @volatile
+    var connection:Connection = _
+    var message_counter=0L
+
+    var worker = new Thread() {
+      override def run(): Unit = {
+        var reconnect_delay = 0
+        while( !done.get ) {
+          try {
+
+            // Back off after a failed attempt before reconnecting.
+            if( reconnect_delay!=0 ) {
+              Thread.sleep(reconnect_delay)
+              reconnect_delay=0
+            }
+            connection = factory.createConnection(user_name, password)
+//          connection.setClientID(name)
+            connection.setExceptionListener(new ExceptionListener {
+              // Asynchronous connection errors are ignored here.
+              def onException(exception: JMSException): Unit = {
+              }
+            })
+            connection.start()
+
+            execute
+
+          } catch {
+            // Everything is caught so the loop can reconnect; failures seen
+            // after the scenario is done (e.g. the shutdown interrupt) are
+            // not counted as errors.
+            case e:Throwable =>
+              if( !done.get ) {
+                if( display_errors ) {
+                  e.printStackTrace
+                }
+                error_counter.incrementAndGet
+                reconnect_delay = 1000
+              }
+          } finally {
+            dispose
+          }
+        }
+      }
+    }
+
+    /** Closes the current connection, if any; close failures are ignored (best effort). */
+    def dispose: Unit = {
+      val current = connection
+      if( current != null ) {
+        try {
+          current.close()
+        } catch {
+          case _: Throwable =>
+        }
+      }
+    }
+
+    /** The client's work loop; runs on the worker thread with an open, started connection. */
+    def execute:Unit
+
+    def start = {
+      worker.start
+    }
+
+    /**
+     * Stops the worker thread. Must only be called once the scenario is
+     * flagged done; interrupts the worker repeatedly until it has exited.
+     */
+    def shutdown = {
+      assert(done.get)
+      if ( worker!=null ) {
+        dispose
+        worker.join(1000)
+        while(worker.isAlive ) {
+          println("Worker did not shutdown quickly.. interrupting thread.")
+          worker.interrupt()
+          worker.join(1000)
+        }
+        worker = null
+      }
+    }
+
+    def name:String
+  }
+
+  /** Receives messages from `destination(id)` until the scenario is done. */
+  class ConsumerClient(val id: Int) extends JMSClient {
+    val name: String = "consumer " + id
+
+    def execute: Unit = {
+      val ack = jms_ack_mode
+      // SESSION_TRANSACTED is only meaningful on a transacted session, so the
+      // "transacted" ack mode must open the session as transacted.
+      val session = connection.createSession(ack == Session.SESSION_TRANSACTED, ack)
+      val consumer:MessageConsumer = if( durable ) {
+        session.createDurableSubscriber(destination(id).asInstanceOf[Topic], name, selector, no_local)
+      } else {
+        session.createConsumer(destination(id), selector, no_local)
+      }
+
+      while( !done.get() ) {
+        val msg = consumer.receive(500)
+        if( msg!=null ) {
+          consumer_counter.incrementAndGet()
+          if (consumer_sleep != 0) {
+            Thread.sleep(consumer_sleep)
+          }
+          if( session.getTransacted ) {
+            // Without a commit a transacted session never settles the message.
+            session.commit()
+          } else if(session.getAcknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
+            msg.acknowledge()
+          }
+        }
+      }
+    }
+
+  }
+
+  /** Sends the same text message to `destination(id)` repeatedly until the scenario is done. */
+  class ProducerClient(val id: Int) extends JMSClient {
+
+    val name: String = "producer " + id
+
+    def execute: Unit = {
+      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val producer:MessageProducer = session.createProducer(destination(id))
+      producer.setDeliveryMode(if( persistent ) {
+        DeliveryMode.PERSISTENT
+      } else {
+        DeliveryMode.NON_PERSISTENT
+      })
+
+      // The message is built once and re-sent on every iteration.
+      val msg = session.createTextMessage(body(name))
+      headers_for(id).foreach { case (key, value) =>
+        msg.setStringProperty(key, value)
+      }
+
+      while( !done.get() ) {
+        producer.send(msg)
+        producer_counter.incrementAndGet()
+        if (producer_sleep != 0) {
+          Thread.sleep(producer_sleep)
+        }
+      }
+
+    }
+  }
+
+  /**
+   * Builds the message payload: a "Message from <name>" header line followed
+   * by a repeating a..z filler, cut to at most `message_size` characters.
+   */
+  def body(name:String) = {
+    val buffer = new StringBuffer(message_size)
+    buffer.append("Message from " + name+"\n")
+    // The inclusive range may overshoot by one character; the result is
+    // truncated below.
+    for( i <- buffer.length to message_size ) {
+      buffer.append(('a'+(i%26)).toChar)
+    }
+    val rc = buffer.toString
+    if( rc.length > message_size ) {
+      rc.substring(0, message_size)
+    } else {
+      rc
+    }
+  }
+
+
+
+}
Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBFastEnqueueTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.ActiveMQConnection
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.activemq.broker.BrokerService
+import org.apache.activemq.command.ActiveMQQueue
+import org.apache.activemq.command.ConnectionControl
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import javax.jms._
+import java.io.File
+import java.util.Vector
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+import junit.framework.Assert._
+import org.apache.activemq.leveldb.util.Log
+import junit.framework.TestCase
+
+object LevelDBFastEnqueueTest extends Log
+
+/**
+ * Publishes a large number of persistent messages from parallel producers
+ * with no consumer attached, restarts the broker and then checks that every
+ * message can still be consumed.
+ */
+class LevelDBFastEnqueueTest extends TestCase {
+
+  import LevelDBFastEnqueueTest._
+
+  @Test def testPublishNoConsumer: Unit = {
+    startBroker(true, 10)
+    val sharedCount: AtomicLong = new AtomicLong(toSend)
+    val start: Long = System.currentTimeMillis
+    runProducers(sharedCount)
+    reportPublishStats(start)
+    stopBroker
+    restartBroker(0, 1200000)
+    consumeMessages(toSend)
+  }
+
+  @Test def testPublishNoConsumerNoCheckpoint: Unit = {
+    toSend = 100
+    startBroker(true, 0)
+    val sharedCount: AtomicLong = new AtomicLong(toSend)
+    val start: Long = System.currentTimeMillis
+    runProducers(sharedCount)
+    broker.getAdminView.gc
+    reportPublishStats(start)
+    stopBroker
+    restartBroker(0, 0)
+    consumeMessages(toSend)
+  }
+
+  /**
+   * Starts `parallelProducer` publishing tasks that share `sharedCount`, waits
+   * up to 30 minutes for them and asserts that all finished without exception.
+   */
+  private def runProducers(sharedCount: AtomicLong): Unit = {
+    val executorService: ExecutorService = Executors.newCachedThreadPool
+    for (i <- 0 until parallelProducer) {
+      executorService.execute(new Runnable {
+        def run: Unit = {
+          try {
+            publishMessages(sharedCount, 0)
+          }
+          catch {
+            // Collected here and asserted on by the test thread.
+            case e: Exception => {
+              exceptions.add(e)
+            }
+          }
+        }
+      })
+    }
+    executorService.shutdown
+    executorService.awaitTermination(30, TimeUnit.MINUTES)
+    assertTrue("Producers done in time", executorService.isTerminated)
+    assertTrue("No exceptions: " + exceptions, exceptions.isEmpty)
+  }
+
+  /** Logs duration, rate and journal write volume of a publish run that began at `start` (ms). */
+  private def reportPublishStats(start: Long): Unit = {
+    val totalSent: Long = toSend * payloadString.length
+    val duration: Double = System.currentTimeMillis - start
+    info("Duration: " + duration + "ms")
+    info("Rate: " + (toSend * 1000 / duration) + "m/s")
+    info("Total send: " + totalSent)
+    info("Total journal write: " + store.getLogAppendPosition)
+    info("Journal writes %: " + store.getLogAppendPosition / totalSent.asInstanceOf[Double] * 100 + "%")
+  }
+
+  /**
+   * Receives `count` messages from the test destination, asserting that each
+   * arrives within 10 seconds and that nothing is left over afterwards. The
+   * connection is closed even when an assertion fails.
+   */
+  private def consumeMessages(count: Long): Unit = {
+    val connection: ActiveMQConnection = connectionFactory.createConnection.asInstanceOf[ActiveMQConnection]
+    try {
+      connection.setWatchTopicAdvisories(false)
+      connection.start
+      val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val consumer: MessageConsumer = session.createConsumer(destination)
+      var i: Int = 0
+      while (i < count) {
+        assertNotNull("got message " + i, consumer.receive(10000))
+        i += 1
+      }
+      assertNull("none left over", consumer.receive(2000))
+    } finally {
+      connection.close
+    }
+  }
+
+  /** Stops the broker, sleeps `restartDelay` milliseconds and starts it again without deleting messages. */
+  protected def restartBroker(restartDelay: Int, checkpoint: Int): Unit = {
+    stopBroker
+    TimeUnit.MILLISECONDS.sleep(restartDelay)
+    startBroker(false, checkpoint)
+  }
+
+  override def tearDown() = stopBroker
+
+  /** Stops the broker, if one was created, and waits until it has stopped. */
+  def stopBroker: Unit = {
+    if (broker != null) {
+      broker.stop
+      broker.waitUntilStopped
+    }
+  }
+
+  /**
+   * Sends persistent messages until the shared counter `count` is used up.
+   * The connection is closed even when sending fails.
+   *
+   * @param count remaining number of messages, shared between all producers
+   * @param expiry time-to-live argument passed to every send
+   */
+  private def publishMessages(count: AtomicLong, expiry: Int): Unit = {
+    val connection: ActiveMQConnection = connectionFactory.createConnection.asInstanceOf[ActiveMQConnection]
+    try {
+      connection.setWatchTopicAdvisories(false)
+      connection.start
+      val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val producer: MessageProducer = session.createProducer(destination)
+      var start: Long = System.currentTimeMillis
+      val bytes: Array[Byte] = payloadString.getBytes
+      // Each iteration claims one message from the shared counter.
+      var i: Long = count.getAndDecrement
+      while (i > 0) {
+        val message: Message =
+          if (useBytesMessage) {
+            val bytesMessage = session.createBytesMessage
+            bytesMessage.writeBytes(bytes)
+            bytesMessage
+          } else {
+            session.createTextMessage(payloadString)
+          }
+        producer.send(message, DeliveryMode.PERSISTENT, 5, expiry)
+        if (i != toSend && i % sampleRate == 0) {
+          val now: Long = System.currentTimeMillis
+          info("Remainder: " + i + ", rate: " + sampleRate * 1000 / (now - start) + "m/s")
+          start = now
+        }
+        i = count.getAndDecrement
+      }
+      // Synchronous round trip issued after the last send and before the close.
+      connection.syncSendPacket(new ConnectionControl)
+    } finally {
+      connection.close
+    }
+  }
+
+  /**
+   * Creates and starts a broker backed by `createStore` and points
+   * `connectionFactory` at its first transport connector.
+   *
+   * @param deleteAllMessages whether the broker deletes all messages on startup
+   * @param checkPointPeriod NOTE(review): not used by this method - confirm whether it should be applied to the store
+   */
+  def startBroker(deleteAllMessages: Boolean, checkPointPeriod: Int): Unit = {
+    broker = new BrokerService
+    broker.setDeleteAllMessagesOnStartup(deleteAllMessages)
+    store = createStore
+    broker.setPersistenceAdapter(store)
+    broker.addConnector("tcp://0.0.0.0:0")
+    broker.start
+    val options: String = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192"
+    connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors.get(0).getConnectUri + options)
+  }
+
+  /** Creates the store under test; subclasses override this to supply another store. */
+  protected def createStore: LevelDBStore = {
+    val store: LevelDBStore = new LevelDBStore
+    store.setDirectory(new File("target/activemq-data/leveldb"))
+    store
+  }
+
+  private[leveldb] var broker: BrokerService = null
+  private[leveldb] var connectionFactory: ActiveMQConnectionFactory = null
+  private[leveldb] var store: LevelDBStore = null
+  private[leveldb] var destination: Destination = new ActiveMQQueue("Test")
+  private[leveldb] var payloadString: String = new String(new Array[Byte](6 * 1024))
+  private[leveldb] var useBytesMessage: Boolean = true
+  private[leveldb] final val parallelProducer: Int = 20
+  private[leveldb] var exceptions: Vector[Exception] = new Vector[Exception]
+  private[leveldb] var toSend: Long = 100000
+  private[leveldb] final val sampleRate: Double = 100000
+}
\ No newline at end of file
Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreBrokerTest.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb
+
+import org.apache.activemq.broker.BrokerService
+import org.apache.activemq.broker.BrokerTest
+import org.apache.activemq.store.PersistenceAdapter
+import java.io.File
+import junit.framework.{TestSuite, Test}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object LevelDBStoreBrokerTest {
+  /** Wraps the LevelDBStoreBrokerTest class in a JUnit test suite. */
+  def suite: Test = new TestSuite(classOf[LevelDBStoreBrokerTest])
+
+  /** Runs the suite with the JUnit text runner. */
+  def main(args: Array[String]): Unit = {
+    junit.textui.TestRunner.run(suite)
+  }
+}
+
+/** Runs the BrokerTest suite with a LevelDBStore as the broker's persistence adapter. */
+class LevelDBStoreBrokerTest extends BrokerTest {
+
+  /**
+   * Creates a LevelDBStore for the test data directory.
+   *
+   * @param delete when true, all messages in the store are deleted before it is returned
+   */
+  protected def createPersistenceAdapter(delete: Boolean): PersistenceAdapter = {
+    val adapter = new LevelDBStore
+    adapter.setDirectory(new File("target/activemq-data/leveldb"))
+    if (delete) {
+      adapter.deleteAllMessages
+    }
+    adapter
+  }
+
+  /** Broker whose store is wiped first. */
+  protected override def createBroker: BrokerService = brokerWith(true)
+
+  /** Broker that keeps whatever the store already holds. */
+  protected def createRestartedBroker: BrokerService = brokerWith(false)
+
+  /** Builds a broker on a freshly created persistence adapter. */
+  private def brokerWith(delete: Boolean): BrokerService = {
+    val service = new BrokerService
+    service.setPersistenceAdapter(createPersistenceAdapter(delete))
+    service
+  }
+}
\ No newline at end of file
Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBStoreTest.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
+import org.apache.activemq.store.PersistenceAdapter
+import org.apache.activemq.store.PersistenceAdapterTestSupport
+import java.io.File
/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
+ * <p>
+ * </p>
*
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+class LevelDBStoreTest extends PersistenceAdapterTestSupport {
+
+  // Overridden with an empty body, so the inherited test does nothing here.
+  override def testStoreCanHandleDupMessages: Unit = {}
+
+  /**
+   * Creates the LevelDBStore under test.
+   *
+   * @param delete when true, all messages in the store are deleted before it is returned
+   */
+  protected def createPersistenceAdapter(delete: Boolean): PersistenceAdapter = {
+    val adapter = new LevelDBStore
+    adapter.setDirectory(new File("target/activemq-data/haleveldb"))
+    if (delete) {
+      adapter.deleteAllMessages
+    }
+    adapter
+  }
+}
\ No newline at end of file
Added: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala?rev=1389882&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala (added)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/Scenario.scala Tue Sep 25 14:32:28 2012
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb
+
+import java.util.concurrent.atomic._
+import java.util.concurrent.TimeUnit._
+import scala.collection.mutable.ListBuffer
+
+/** Constants and small helpers shared by the scenario classes. */
+object Scenario {
+
+  /** Encodes a string as UTF-8 bytes; also available as an implicit conversion. */
+  implicit def toBytes(value: String):Array[Byte] = value.getBytes("UTF-8")
+
+  val MESSAGE_ID:Array[Byte] = toBytes("message-id")
+  val NEWLINE = '\n'.toByte
+  val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS)
+
+  /** Wraps a possibly-null value: `None` for null, `Some(value)` otherwise. */
+  def o[T](value:T):Option[T] = Option(value)
+}
+
+/**
+ * A configurable load scenario: starts producer and consumer clients, counts
+ * what they produce, consume and fail on, and samples those counters.
+ * Implementations supply the clients through createProducer/createConsumer.
+ */
+trait Scenario {
+  import Scenario._
+
+  var url:String = "tcp://localhost:61616"
+  var user_name:String = _
+  var password:String = _
+
+  // The sleep settings are held as structural values: apply() yields the
+  // current sleep and init(time) is called by with_load before clients start.
+  private var _producer_sleep: { def apply(): Int; def init(time: Long) } = new { def apply() = 0; def init(time: Long) {} }
+  def producer_sleep = _producer_sleep()
+  def producer_sleep_= (new_value: Int) = _producer_sleep = new { def apply() = new_value; def init(time: Long) {} }
+  def producer_sleep_= (new_func: { def apply(): Int; def init(time: Long) }) = _producer_sleep = new_func
+
+  private var _consumer_sleep: { def apply(): Int; def init(time: Long) } = new { def apply() = 0; def init(time: Long) {} }
+  def consumer_sleep = _consumer_sleep()
+  def consumer_sleep_= (new_value: Int) = _consumer_sleep = new { def apply() = new_value; def init(time: Long) {} }
+  def consumer_sleep_= (new_func: { def apply(): Int; def init(time: Long) }) = _consumer_sleep = new_func
+
+  var producers = 1
+  // Number of extra producers started on every call to collection_sample.
+  var producers_per_sample = 0
+
+  var consumers = 1
+  // Number of extra consumers started on every call to collection_sample.
+  var consumers_per_sample = 0
+  var sample_interval = 1000
+
+  var message_size = 1024
+  var persistent = false
+
+  var headers = Array[Array[(String,String)]]()
+  var selector:String = null
+  var no_local = false
+  var durable = false
+  var ack_mode = "auto"
+  var messages_per_connection = -1L
+  var display_errors = false
+
+  var destination_type = "queue"
+  private var _destination_name: () => String = () => "load"
+  def destination_name = _destination_name()
+  def destination_name_=(new_name: String) = _destination_name = () => new_name
+  def destination_name_=(new_func: () => String) = _destination_name = new_func
+  var destination_count = 1
+
+  val producer_counter = new AtomicLong()
+  val consumer_counter = new AtomicLong()
+  val error_counter = new AtomicLong()
+  val done = new AtomicBoolean()
+
+  var queue_prefix = ""
+  var topic_prefix = ""
+  var name = "custom"
+
+  var drain_timeout = 2000L
+
+  /**
+   * Runs the scenario interactively: prints the settings, starts the load and
+   * prints producer/consumer/error rates every `sample_interval` milliseconds
+   * until a byte can be read from standard input.
+   */
+  def run() = {
+    print(toString)
+    println("--------------------------------------")
+    println(" Running: Press ENTER to stop")
+    println("--------------------------------------")
+    println("")
+
+    // collection_end keys its samples by the scenario name, so the lookups
+    // below use the same keys rather than assuming the default name.
+    val producer_key = "p_"+name
+    val consumer_key = "c_"+name
+    val error_key = "e_"+name
+
+    with_load {
+
+      // start a sampling client...
+      val sample_thread = new Thread() {
+        override def run() = {
+
+          def print_rate(name: String, periodCount:Long, totalCount:Long, nanos: Long) = {
+
+            val rate_per_second: java.lang.Float = ((1.0f * periodCount / nanos) * NANOS_PER_SECOND)
+            println("%s total: %,d, rate: %,.3f per second".format(name, totalCount, rate_per_second))
+          }
+
+          try {
+            var start = System.nanoTime
+            var total_producer_count = 0L
+            var total_consumer_count = 0L
+            var total_error_count = 0L
+            collection_start
+            while( !done.get ) {
+              Thread.sleep(sample_interval)
+              val end = System.nanoTime
+              collection_sample
+              val samples = collection_end
+              // Exactly one sample per key is expected per interval; any
+              // other shape is skipped.
+              samples.get(producer_key).foreach { case (_, count)::Nil =>
+                total_producer_count += count
+                print_rate("Producer", count, total_producer_count, end - start)
+              case _ =>
+              }
+              samples.get(consumer_key).foreach { case (_, count)::Nil =>
+                total_consumer_count += count
+                print_rate("Consumer", count, total_consumer_count, end - start)
+              case _ =>
+              }
+              samples.get(error_key).foreach { case (_, count)::Nil =>
+                if( count!= 0 ) {
+                  total_error_count += count
+                  print_rate("Error", count, total_error_count, end - start)
+                }
+              case _ =>
+              }
+              start = end
+            }
+          } catch {
+            // The interrupt sent below is the normal way to end sampling.
+            case e:InterruptedException =>
+          }
+        }
+      }
+      sample_thread.start()
+
+      System.in.read()
+      done.set(true)
+
+      sample_thread.interrupt
+      sample_thread.join
+    }
+
+  }
+
+  /** Renders the scenario settings as a multi-line, human readable report. */
+  override def toString() = {
+    "--------------------------------------\n"+
+    "Scenario Settings\n"+
+    "--------------------------------------\n"+
+    " destination_type = "+destination_type+"\n"+
+    " queue_prefix = "+queue_prefix+"\n"+
+    " topic_prefix = "+topic_prefix+"\n"+
+    " destination_count = "+destination_count+"\n" +
+    " destination_name = "+destination_name+"\n" +
+    " sample_interval (ms) = "+sample_interval+"\n" +
+    " \n"+
+    " --- Producer Properties ---\n"+
+    " producers = "+producers+"\n"+
+    " message_size = "+message_size+"\n"+
+    " persistent = "+persistent+"\n"+
+    " producer_sleep (ms) = "+producer_sleep+"\n"+
+    " headers = "+headers.mkString(", ")+"\n"+
+    " \n"+
+    " --- Consumer Properties ---\n"+
+    " consumers = "+consumers+"\n"+
+    " consumer_sleep (ms) = "+consumer_sleep+"\n"+
+    " selector = "+selector+"\n"+
+    " durable = "+durable+"\n"+
+    ""
+
+  }
+
+  /** Header set for client `i`: the configured sets are assigned round-robin; empty when none are configured. */
+  protected def headers_for(i:Int) = {
+    if ( headers.isEmpty ) {
+      Array[(String, String)]()
+    } else {
+      headers(i%headers.size)
+    }
+  }
+
+  var producer_samples:Option[ListBuffer[(Long,Long)]] = None
+  var consumer_samples:Option[ListBuffer[(Long,Long)]] = None
+  var error_samples = ListBuffer[(Long,Long)]()
+
+  /**
+   * Resets all counters and creates the producer/consumer sample buffers. A
+   * buffer is only created when that kind of client is configured or will be
+   * added per sample.
+   */
+  def collection_start: Unit = {
+    producer_counter.set(0)
+    consumer_counter.set(0)
+    error_counter.set(0)
+
+    producer_samples = if (producers > 0 || producers_per_sample>0 ) {
+      Some(ListBuffer[(Long,Long)]())
+    } else {
+      None
+    }
+    consumer_samples = if (consumers > 0 || consumers_per_sample>0 ) {
+      Some(ListBuffer[(Long,Long)]())
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Returns the samples collected so far, keyed by "p_", "c_" and "e_"
+   * followed by the scenario name, and clears the buffers.
+   */
+  def collection_end: Map[String, scala.List[(Long,Long)]] = {
+    var rc = Map[String, List[(Long,Long)]]()
+    producer_samples.foreach{ samples =>
+      rc += "p_"+name -> samples.toList
+      samples.clear
+    }
+    consumer_samples.foreach{ samples =>
+      rc += "c_"+name -> samples.toList
+      samples.clear
+    }
+    rc += "e_"+name -> error_samples.toList
+    error_samples.clear
+    rc
+  }
+
+  /** A load-generating client that can be started and shut down. */
+  trait Client {
+    def start():Unit
+    def shutdown():Unit
+  }
+
+  var producer_clients = List[Client]()
+  var consumer_clients = List[Client]()
+
+  /**
+   * Starts the configured producers and consumers, evaluates `func` while
+   * they run and returns its result. The scenario is always flagged done and
+   * every client shut down afterwards, even when `func` throws.
+   */
+  def with_load[T](func: =>T ):T = {
+    done.set(false)
+
+    _producer_sleep.init(System.currentTimeMillis())
+    _consumer_sleep.init(System.currentTimeMillis())
+
+    for (i <- 0 until producers) {
+      val client = createProducer(i)
+      producer_clients ::= client
+      client.start()
+    }
+
+    for (i <- 0 until consumers) {
+      val client = createConsumer(i)
+      consumer_clients ::= client
+      client.start()
+    }
+
+    try {
+      func
+    } finally {
+      done.set(true)
+      // wait for the threads to finish..
+      for( client <- consumer_clients ) {
+        client.shutdown
+      }
+      consumer_clients = List()
+      for( client <- producer_clients ) {
+        client.shutdown
+      }
+      producer_clients = List()
+    }
+  }
+
+  /**
+   * For queues, raw queues and durable subscriptions: starts one consumer per
+   * destination and keeps waiting in `drain_timeout` steps until an interval
+   * passes in which nothing was consumed. Does nothing for other destinations.
+   */
+  def drain = {
+    done.set(false)
+    if( destination_type=="queue" || destination_type=="raw_queue" || durable==true ) {
+      print("draining")
+      consumer_counter.set(0)
+      var drain_clients = List[Client]()
+      for (i <- 0 until destination_count) {
+        val client = createConsumer(i)
+        drain_clients ::= client
+        client.start()
+      }
+
+      // Keep sleeping until we stop draining messages.
+      var drained = 0L
+      // Adds what was consumed since the last check to the total and reports
+      // whether that interval was idle.
+      def idle() = {
+        val c = consumer_counter.getAndSet(0)
+        drained += c
+        c == 0
+      }
+      try {
+        Thread.sleep(drain_timeout)
+        while( !idle() ) {
+          print(".")
+          Thread.sleep(drain_timeout)
+        }
+      } finally {
+        done.set(true)
+        for( client <- drain_clients ) {
+          client.shutdown
+        }
+        println(". (drained %d)".format(drained))
+      }
+    }
+  }
+
+
+  /**
+   * Records one (timestamp, count) sample per counter, resetting each counter,
+   * and then starts the per-sample extra producers and consumers.
+   */
+  def collection_sample: Unit = {
+
+    val now = System.currentTimeMillis()
+    producer_samples.foreach(_.append((now, producer_counter.getAndSet(0))))
+    consumer_samples.foreach(_.append((now, consumer_counter.getAndSet(0))))
+    error_samples.append((now, error_counter.getAndSet(0)))
+
+    // we might need to increase the number of producers..
+    for (i <- 0 until producers_per_sample) {
+      val client = createProducer(producer_clients.length)
+      producer_clients ::= client
+      client.start()
+    }
+
+    // we might need to increase the number of consumers..
+    for (i <- 0 until consumers_per_sample) {
+      val client = createConsumer(consumer_clients.length)
+      consumer_clients ::= client
+      client.start()
+    }
+
+  }
+
+  /** Creates, without starting it, the producer client with index `i`. */
+  def createProducer(i:Int):Client
+
+  /** Creates, without starting it, the consumer client with index `i`. */
+  def createConsumer(i:Int):Client
+
+}
+
+
Copied: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala (from r1389860, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala?p2=activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java&r1=1389860&r2=1389882&rev=1389882&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/leveldb/LevelDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/TestingHDFSServer.scala Tue Sep 25 14:32:28 2012
@@ -14,17 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.leveldb;
-
-import org.fusesource.mq.leveldb.LevelDBStore;
+package org.apache.activemq.leveldb
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import java.io.IOException
/**
- * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} designed for use with
- * LevelDB - Embedded Lightweight Non-Relational Database
- *
- * @org.apache.xbean.XBean element="levelDB"
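+ * <p>Test helper that starts and stops a Hadoop MiniDFSCluster and exposes
+ * its FileSystem to code in the leveldb package.</p>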
*
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class LevelDBPersistenceAdapter extends LevelDBStore {
-}
+object TestingHDFSServer {
+
+  // Handle to the cluster created by `start`; null until `start` has run.
+  private[leveldb] var cluster: MiniDFSCluster = _
+
+  // File system obtained from `cluster` in `start`; null until `start` has run.
+  private[leveldb] var fs: FileSystem = _
+
+  /**
+   * Creates a MiniDFSCluster with a default Configuration, waits for it to
+   * become active and stores the cluster and its file system in `cluster`
+   * and `fs`.
+   */
+  private[leveldb] def start: Unit = {
+    val conf = new Configuration
+    // Hadoop 1.x constructor arguments: one data node, format the storage,
+    // no rack assignments.
+    cluster = new MiniDFSCluster(conf, 1, true, null)
+    cluster.waitActive()
+    fs = cluster.getFileSystem
+  }
+
+  /**
+   * Shuts the cluster down on a best-effort basis. Does nothing when `start`
+   * was never called; a failure during shutdown is printed, not rethrown.
+   */
+  private[leveldb] def stop: Unit = {
+    // Option(...) maps a null handle to None, so stopping before starting is
+    // a no-op instead of a swallowed NullPointerException.
+    Option(cluster).foreach { running =>
+      try {
+        running.shutdown()
+      } catch {
+        // Only Exception is caught so that JVM errors (e.g. OutOfMemoryError)
+        // still propagate to the caller.
+        case e: Exception => e.printStackTrace()
+      }
+    }
+  }
+}
\ No newline at end of file
Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1389882&r1=1389881&r2=1389882&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Tue Sep 25 14:32:28 2012
@@ -64,11 +64,13 @@
<fusemq-leveldb-version>1.3</fusemq-leveldb-version>
<ftpserver-version>1.0.6</ftpserver-version>
<geronimo-version>1.0</geronimo-version>
+ <hadoop-version>1.0.0</hadoop-version>
<hawtbuf-version>1.9</hawtbuf-version>
<hawtdispatch-version>1.11</hawtdispatch-version>
<howl-version>0.1.8</howl-version>
<hsqldb-version>1.8.0.10</hsqldb-version>
<httpclient-version>4.2.1</httpclient-version>
+ <jackson-version>1.9.2</jackson-version>
<jasypt-version>1.9.0</jasypt-version>
<jdom-version>1.0</jdom-version>
<jetty-version>7.6.7.v20120910</jetty-version>
@@ -93,6 +95,9 @@
<rome-version>1.0</rome-version>
<saxon-version>9.4</saxon-version>
<saxon-bundle-version>9.4.0.1_2</saxon-bundle-version>
+ <scala-plugin-version>2.15.1</scala-plugin-version>
+ <scala-version>2.9.1</scala-version>
+ <scalatest-version>1.8</scalatest-version>
<slf4j-version>1.6.6</slf4j-version>
<spring-version>3.0.7.RELEASE</spring-version>
<spring-osgi-version>1.2.1</spring-osgi-version>
@@ -198,6 +203,7 @@
<module>activemq-jaas</module>
<module>activemq-blueprint</module>
<module>activemq-karaf</module>
+ <module>activemq-leveldb</module>
<module>activemq-openwire-generator</module>
<module>activemq-optional</module>
<module>activemq-pool</module>