You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:12:12 UTC
svn commit: r961160 -
/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Author: chirino
Date: Wed Jul 7 04:12:12 2010
New Revision: 961160
URL: http://svn.apache.org/viewvc?rev=961160&view=rev
Log:
cleaning up the load client a bit. better error handling and rate reporting.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961160&r1=961159&r2=961160&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 04:12:12 2010
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -35,30 +35,30 @@ import org.apache.activemq.apollo.stomp.
*/
object StompLoadClient {
- val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+ val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS)
import StompLoadClient._
implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
- var producerSleep = 0;
- var consumerSleep = 0;
- var producers = 1;
- var consumers = 1;
- var sampleInterval = 5 * 1000;
- var uri = "stomp://127.0.0.1:61613";
+ var producerSleep = 0
+ var consumerSleep = 0
+ var producers = 1
+ var consumers = 1
+ var sampleInterval = 5 * 1000
+ var uri = "stomp://127.0.0.1:61613"
var bufferSize = 64*1204
- var messageSize = 1024;
+ var messageSize = 1024
var useContentLength=true
- var persistent = false;
- var syncSend = false;
+ var persistent = false
+ var syncSend = false
var headers = List[String]()
- var ack = "client";
+ var ack = "auto"
var selector:String = null
- var destinationType = "queue";
- var destinationCount = 1;
+ var destinationType = "queue"
+ var destinationCount = 1
- val producerCounter = new AtomicLong();
- val consumerCounter = new AtomicLong();
+ val producerCounter = new AtomicLong()
+ val consumerCounter = new AtomicLong()
val done = new AtomicBoolean()
def main(args:Array[String]) = run
@@ -66,7 +66,7 @@ object StompLoadClient {
def run() = {
println("=======================")
- println("Press ENTER to shutdown");
+ println("Press ENTER to shutdown")
println("=======================")
println("")
@@ -74,29 +74,35 @@ object StompLoadClient {
done.set(false)
var producerThreads = List[ProducerThread]()
for (i <- 0 until producers) {
- val producerThread = new ProducerThread(i);
+ val producerThread = new ProducerThread(i)
producerThreads = producerThread :: producerThreads
- producerThread.start();
+ producerThread.start()
}
var consumerThreads = List[ConsumerThread]()
for (i <- 0 until consumers) {
- val consumerThread = new ConsumerThread(i);
+ val consumerThread = new ConsumerThread(i)
consumerThreads = consumerThread :: consumerThreads
- consumerThread.start();
+ consumerThread.start()
}
// start a sampling thread...
val sampleThread = new Thread() {
override def run() = {
try {
- var start = System.nanoTime();
+ producerCounter.set(0)
+ consumerCounter.set(0)
+ var start = System.nanoTime()
while( !done.get ) {
Thread.sleep(sampleInterval)
- val end = System.nanoTime();
- printRate("Producer", producerCounter, end - start);
- printRate("Consumer", consumerCounter, end - start);
- start = end;
+ val end = System.nanoTime()
+ if( producers > 0 ) {
+ printRate("Producer", producerCounter, end - start)
+ }
+ if( consumers > 0 ) {
+ printRate("Consumer", consumerCounter, end - start)
+ }
+ start = end
}
} catch {
case e:InterruptedException =>
@@ -111,20 +117,16 @@ object StompLoadClient {
// wait for the threads to finish..
for( thread <- consumerThreads ) {
- thread.client.close
- thread.interrupt
- thread.join
+ thread.shutdown
}
for( thread <- producerThreads ) {
- thread.client.close
- thread.interrupt
- thread.join
+ thread.shutdown
}
sampleThread.interrupt
sampleThread.join
println("=======================")
- println("Shutdown");
+ println("Shutdown")
println("=======================")
}
@@ -157,64 +159,30 @@ object StompLoadClient {
}
def printRate(name: String, counter: AtomicLong, nanos: Long) = {
- val c = counter.getAndSet(0);
- val rate_per_second: java.lang.Float = ((1.0f * c / nanos) * NANOS_PER_SECOND);
- println(format("%s rate: %,.3f per second", name, rate_per_second));
+ val c = counter.getAndSet(0)
+ val rate_per_second: java.lang.Float = ((1.0f * c / nanos) * NANOS_PER_SECOND)
+ println(format("%s rate: %,.3f per second", name, rate_per_second))
}
def destination(i:Int) = "/"+destinationType+"/load-"+(i%destinationCount)
- object StompClient {
- def connect(proc: StompClient=>Unit ) = {
- val client = new StompClient();
- try {
- val connectUri = new URI(uri);
- client.open(connectUri.getHost(), connectUri.getPort());
- client.send("""CONNECT
-
-""")
- client.flush
- client.receive("CONNECTED")
-
- proc(client)
- } catch {
- case e: Throwable =>
- if(!done.get) {
- println("failure occured: "+e);
- Thread.sleep(1000);
- }
- } finally {
- try {
- client.close();
- } catch {
- case ignore: Throwable =>
- }
- }
- }
- }
-
class StompClient {
- var socket:Socket = null
- var out:OutputStream = null;
+ var socket:Socket = new Socket
+ var out:OutputStream = null
var in:InputStream = null
def open(host: String, port: Int) = {
socket = new Socket
socket.connect(new InetSocketAddress(host, port))
- socket.setSoLinger(true, 0);
+ socket.setSoLinger(true, 0)
out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
in = new BufferedInputStream(socket.getInputStream, bufferSize)
}
def close() = {
- if( socket!=null ) {
- socket.close
- socket = null
- out = null
- in = null
- }
+ socket.close
}
def flush() = {
@@ -234,10 +202,10 @@ object StompLoadClient {
}
def skip():Unit = {
- var c = in.read;
+ var c = in.read
while( c >= 0 ) {
if( c==0 ) {
- return;
+ return
}
c = in.read()
}
@@ -246,12 +214,12 @@ object StompLoadClient {
def receive():String = {
val buffer = new ByteArrayOutputStream(messageSize+200)
- var c = in.read;
+ var c = in.read
while( c >= 0 ) {
if( c==0 ) {
return new String(buffer.toByteArray, "UTF-8")
}
- buffer.write(c);
+ buffer.write(c)
c = in.read()
}
throw new EOFException()
@@ -259,12 +227,12 @@ object StompLoadClient {
def receiveAscii():AsciiBuffer = {
val buffer = new BAOS(messageSize+200)
- var c = in.read;
+ var c = in.read
while( c >= 0 ) {
if( c==0 ) {
return buffer.toBuffer.ascii
}
- buffer.write(c);
+ buffer.write(c)
c = in.read()
}
throw new EOFException()
@@ -280,9 +248,46 @@ object StompLoadClient {
}
- class ProducerThread(val id: Int) extends Thread {
- val name: String = "producer " + id;
- var client:StompClient=null
+
+ class ClientSupport extends Thread {
+
+ var client:StompClient=new StompClient()
+
+ def connect(proc: =>Unit ) = {
+ try {
+ val connectUri = new URI(uri)
+ client.open(connectUri.getHost(), connectUri.getPort())
+ client.send("""CONNECT
+
+""")
+ client.flush
+ client.receive("CONNECTED")
+ proc
+ } catch {
+ case e: Throwable =>
+ if(!done.get) {
+ println("failure occured: "+e)
+ Thread.sleep(1000)
+ }
+ } finally {
+ try {
+ client.close()
+ } catch {
+ case ignore: Throwable =>
+ }
+ }
+ }
+
+ def shutdown = {
+ interrupt
+ client.close
+ join
+ }
+
+ }
+
+ class ProducerThread(val id: Int) extends ClientSupport {
+ val name: String = "producer " + id
val content = ("SEND\n" +
"destination:"+destination(id)+"\n"+
{ if(persistent) "persistent:true\n" else "" } +
@@ -291,11 +296,12 @@ object StompLoadClient {
{ if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
"\n"+message(name)).getBytes("UTF-8")
+
override def run() {
while (!done.get) {
- StompClient.connect { client =>
+ connect {
this.client=client
- var i =0;
+ var i =0
while (!done.get) {
client.send(content)
if( syncSend ) {
@@ -303,10 +309,10 @@ object StompLoadClient {
client.flush
client.skip
}
- producerCounter.incrementAndGet();
+ producerCounter.incrementAndGet()
if(producerSleep > 0) {
client.flush
- Thread.sleep(producerSleep);
+ Thread.sleep(producerSleep)
}
i += 1
}
@@ -317,7 +323,7 @@ object StompLoadClient {
def message(name:String) = {
val buffer = new StringBuffer(messageSize)
- buffer.append("Message from " + name+"\n");
+ buffer.append("Message from " + name+"\n")
for( i <- buffer.length to messageSize ) {
buffer.append(('a'+(i%26)).toChar)
}
@@ -329,15 +335,13 @@ object StompLoadClient {
}
}
- class ConsumerThread(val id: Int) extends Thread {
- val name: String = "producer " + id;
- var client:StompClient=null
+ class ConsumerThread(val id: Int) extends ClientSupport {
+ val name: String = "producer " + id
override def run() {
while (!done.get) {
- StompClient.connect { client =>
- this.client=client
- val headers = Map[AsciiBuffer, AsciiBuffer]();
+ connect {
+ val headers = Map[AsciiBuffer, AsciiBuffer]()
client.send("""
SUBSCRIBE""" + (if(selector==null) {""} else {
"""
@@ -372,8 +376,8 @@ message-id:"""+msgId+"""
} else {
client.skip
}
- consumerCounter.incrementAndGet();
- Thread.sleep(consumerSleep);
+ consumerCounter.incrementAndGet()
+ Thread.sleep(consumerSleep)
}
}
}