You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:12:12 UTC

svn commit: r961160 - /activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala

Author: chirino
Date: Wed Jul  7 04:12:12 2010
New Revision: 961160

URL: http://svn.apache.org/viewvc?rev=961160&view=rev
Log:
cleaning up the load client a bit.  better error handling and rate reporting.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961160&r1=961159&r2=961160&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 04:12:12 2010
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
@@ -35,30 +35,30 @@ import org.apache.activemq.apollo.stomp.
  */
 object StompLoadClient {
 
-  val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+  val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS)
   import StompLoadClient._
   implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
 
-  var producerSleep = 0;
-  var consumerSleep = 0;
-  var producers = 1;
-  var consumers = 1;
-  var sampleInterval = 5 * 1000;
-  var uri = "stomp://127.0.0.1:61613";
+  var producerSleep = 0
+  var consumerSleep = 0
+  var producers = 1
+  var consumers = 1
+  var sampleInterval = 5 * 1000
+  var uri = "stomp://127.0.0.1:61613"
   var bufferSize = 64*1204
-  var messageSize = 1024;
+  var messageSize = 1024
   var useContentLength=true
-  var persistent = false;
-  var syncSend = false;
+  var persistent = false
+  var syncSend = false
   var headers = List[String]()
-  var ack = "client";
+  var ack = "auto"
   var selector:String = null
 
-  var destinationType = "queue";
-  var destinationCount = 1;
+  var destinationType = "queue"
+  var destinationCount = 1
 
-  val producerCounter = new AtomicLong();
-  val consumerCounter = new AtomicLong();
+  val producerCounter = new AtomicLong()
+  val consumerCounter = new AtomicLong()
   val done = new AtomicBoolean()
 
   def main(args:Array[String]) = run
@@ -66,7 +66,7 @@ object StompLoadClient {
   def run() = {
 
     println("=======================")
-    println("Press ENTER to shutdown");
+    println("Press ENTER to shutdown")
     println("=======================")
     println("")
 
@@ -74,29 +74,35 @@ object StompLoadClient {
     done.set(false)
     var producerThreads = List[ProducerThread]()
     for (i <- 0 until producers) {
-      val producerThread = new ProducerThread(i);
+      val producerThread = new ProducerThread(i)
       producerThreads = producerThread :: producerThreads
-      producerThread.start();
+      producerThread.start()
     }
 
     var consumerThreads = List[ConsumerThread]()
     for (i <- 0 until consumers) {
-      val consumerThread = new ConsumerThread(i);
+      val consumerThread = new ConsumerThread(i)
       consumerThreads = consumerThread :: consumerThreads
-      consumerThread.start();
+      consumerThread.start()
     }
 
     // start a sampling thread...
     val sampleThread = new Thread() {
       override def run() = {
         try {
-          var start = System.nanoTime();
+          producerCounter.set(0)
+          consumerCounter.set(0)
+          var start = System.nanoTime()
           while( !done.get ) {
             Thread.sleep(sampleInterval)
-            val end = System.nanoTime();
-            printRate("Producer", producerCounter, end - start);
-            printRate("Consumer", consumerCounter, end - start);
-            start = end;
+            val end = System.nanoTime()
+            if( producers > 0 ) {
+              printRate("Producer", producerCounter, end - start)
+            }
+            if( consumers > 0 ) {
+              printRate("Consumer", consumerCounter, end - start)
+            }
+            start = end
           }
         } catch {
           case e:InterruptedException =>
@@ -111,20 +117,16 @@ object StompLoadClient {
 
     // wait for the threads to finish..
     for( thread <- consumerThreads ) {
-      thread.client.close
-      thread.interrupt
-      thread.join
+      thread.shutdown
     }
     for( thread <- producerThreads ) {
-      thread.client.close
-      thread.interrupt
-      thread.join
+      thread.shutdown
     }
     sampleThread.interrupt
     sampleThread.join
 
     println("=======================")
-    println("Shutdown");
+    println("Shutdown")
     println("=======================")
 
   }
@@ -157,64 +159,30 @@ object StompLoadClient {
   }
 
   def printRate(name: String, counter: AtomicLong, nanos: Long) = {
-    val c = counter.getAndSet(0);
-    val rate_per_second: java.lang.Float = ((1.0f * c / nanos) * NANOS_PER_SECOND);
-    println(format("%s rate: %,.3f per second", name, rate_per_second));
+    val c = counter.getAndSet(0)
+    val rate_per_second: java.lang.Float = ((1.0f * c / nanos) * NANOS_PER_SECOND)
+    println(format("%s rate: %,.3f per second", name, rate_per_second))
   }
 
   def destination(i:Int) = "/"+destinationType+"/load-"+(i%destinationCount)
 
 
-  object StompClient {
-    def connect(proc: StompClient=>Unit ) = {
-      val client = new StompClient();
-      try {
-        val connectUri = new URI(uri);
-        client.open(connectUri.getHost(), connectUri.getPort());
-        client.send("""CONNECT
-
-""")
-        client.flush
-        client.receive("CONNECTED")
-
-        proc(client)
-      } catch {
-        case e: Throwable =>
-          if(!done.get) {
-            println("failure occured: "+e);
-            Thread.sleep(1000);
-          }
-      } finally {
-        try {
-          client.close();
-        } catch {
-          case ignore: Throwable =>
-        }
-      }
-    }
-  }
-
   class StompClient {
 
-    var socket:Socket = null
-    var out:OutputStream = null;
+    var socket:Socket = new Socket
+    var out:OutputStream = null
     var in:InputStream = null
 
     def open(host: String, port: Int) = {
       socket = new Socket
       socket.connect(new InetSocketAddress(host, port))
-      socket.setSoLinger(true, 0);
+      socket.setSoLinger(true, 0)
       out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
       in = new BufferedInputStream(socket.getInputStream, bufferSize)
     }
 
     def close() = {
-      if( socket!=null ) {
-        socket.close
-        socket = null
-        out = null
-        in = null
-      }
+      socket.close
     }
 
     def flush() = {
@@ -234,10 +202,10 @@ object StompLoadClient {
     }
 
     def skip():Unit = {
-      var c = in.read;
+      var c = in.read
       while( c >= 0 ) {
         if( c==0 ) {
-          return;
+          return
         }
         c = in.read()
       }
@@ -246,12 +214,12 @@ object StompLoadClient {
 
     def receive():String = {
       val buffer = new ByteArrayOutputStream(messageSize+200)
-      var c = in.read;
+      var c = in.read
       while( c >= 0 ) {
         if( c==0 ) {
           return new String(buffer.toByteArray, "UTF-8")
         }
-        buffer.write(c);
+        buffer.write(c)
         c = in.read()
       }
       throw new EOFException()
@@ -259,12 +227,12 @@ object StompLoadClient {
 
     def receiveAscii():AsciiBuffer = {
       val buffer = new BAOS(messageSize+200)
-      var c = in.read;
+      var c = in.read
       while( c >= 0 ) {
         if( c==0 ) {
           return buffer.toBuffer.ascii
         }
-        buffer.write(c);
+        buffer.write(c)
         c = in.read()
       }
       throw new EOFException()
@@ -280,9 +248,46 @@ object StompLoadClient {
 
   }
 
-  class ProducerThread(val id: Int) extends Thread {
-    val name: String = "producer " + id;
-    var client:StompClient=null
+  
+  class ClientSupport extends Thread {
+
+    var client:StompClient=new StompClient()
+    
+    def connect(proc: =>Unit ) = {
+      try {
+        val connectUri = new URI(uri)
+        client.open(connectUri.getHost(), connectUri.getPort())
+        client.send("""CONNECT
+
+""")
+        client.flush
+        client.receive("CONNECTED")
+        proc
+      } catch {
+        case e: Throwable =>
+          if(!done.get) {
+            println("failure occured: "+e)
+            Thread.sleep(1000)
+          }
+      } finally {
+        try {
+          client.close()
+        } catch {
+          case ignore: Throwable =>
+        }
+      }
+    }
+
+    def shutdown = {
+      interrupt
+      client.close
+      join
+    }
+
+  }
+  
+  class ProducerThread(val id: Int) extends ClientSupport {
+    val name: String = "producer " + id
     val content = ("SEND\n" +
               "destination:"+destination(id)+"\n"+
                { if(persistent) "persistent:true\n" else "" } +
@@ -291,11 +296,12 @@ object StompLoadClient {
                { if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
               "\n"+message(name)).getBytes("UTF-8")
 
+
     override def run() {
       while (!done.get) {
-        StompClient.connect { client =>
+        connect {
           this.client=client
-          var i =0;
+          var i =0
           while (!done.get) {
             client.send(content)
             if( syncSend ) {
@@ -303,10 +309,10 @@ object StompLoadClient {
               client.flush
               client.skip
             }
-            producerCounter.incrementAndGet();
+            producerCounter.incrementAndGet()
             if(producerSleep > 0) {
               client.flush
-              Thread.sleep(producerSleep);
+              Thread.sleep(producerSleep)
             }
             i += 1
           }
@@ -317,7 +323,7 @@ object StompLoadClient {
 
   def message(name:String) = {
     val buffer = new StringBuffer(messageSize)
-    buffer.append("Message from " + name+"\n");
+    buffer.append("Message from " + name+"\n")
     for( i <- buffer.length to messageSize ) {
       buffer.append(('a'+(i%26)).toChar)
     }
@@ -329,15 +335,13 @@ object StompLoadClient {
     }
   }
 
-  class ConsumerThread(val id: Int) extends Thread {
-    val name: String = "producer " + id;
-    var client:StompClient=null
+  class ConsumerThread(val id: Int) extends ClientSupport {
+    val name: String = "producer " + id
 
     override def run() {
       while (!done.get) {
-        StompClient.connect { client =>
-          this.client=client
-          val headers = Map[AsciiBuffer, AsciiBuffer]();
+        connect {
+          val headers = Map[AsciiBuffer, AsciiBuffer]()
           client.send("""
 SUBSCRIBE""" + (if(selector==null) {""} else {
 """
@@ -372,8 +376,8 @@ message-id:"""+msgId+"""
         } else {
           client.skip
         }
-        consumerCounter.incrementAndGet();
-        Thread.sleep(consumerSleep);
+        consumerCounter.incrementAndGet()
+        Thread.sleep(consumerSleep)
       }
     }
   }