You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/17 20:48:48 UTC

svn commit: r476279 - in /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf: PerfConsumer.java SimpleTopicTest.java

Author: chirino
Date: Fri Nov 17 11:48:47 2006
New Revision: 476279

URL: http://svn.apache.org/viewvc?view=rev&rev=476279
Log:
added ability to have the consumer slow down. also better rate info.

Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java?view=diff&rev=476279&r1=476278&r2=476279
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java Fri Nov 17 11:48:47 2006
@@ -32,6 +32,8 @@
 public class PerfConsumer implements MessageListener{
     protected Connection connection;
     protected MessageConsumer consumer;
+    protected long sleepDuration;
+    
     protected PerfRate rate=new PerfRate();
     public PerfConsumer(ConnectionFactory fac,Destination dest,String consumerName) throws JMSException{
         connection=fac.createConnection();
@@ -62,5 +64,18 @@
     }
     public void onMessage(Message msg){
         rate.increment();
-    }
+        try {
+        	if( sleepDuration!=0 ) {
+        		Thread.sleep(sleepDuration);
+        	}
+		} catch (InterruptedException e) {
+		}        
+    }
+    
+	public synchronized long getSleepDuration() {
+		return sleepDuration;
+	}
+	public synchronized void setSleepDuration(long sleepDuration) {
+		this.sleepDuration = sleepDuration;
+	}
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?view=diff&rev=476279&r1=476278&r2=476279
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Fri Nov 17 11:48:47 2006
@@ -52,6 +52,7 @@
     protected byte[] array=null;
     protected ConnectionFactory factory;
     protected Destination destination;
+    protected long CONSUMER_SLEEP_DURATION = 0;
 
     /**
      * Sets up a test where the producer and consumer have their own connection.
@@ -67,11 +68,14 @@
         Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
         
         destination=createDestination(session,DESTINATION_NAME);
+        log.info("Testing against destination: "+destination);
+        
         con.close();
         producers=new PerfProducer[NUMBER_OF_PRODUCERS];
         consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
             consumers[i]=createConsumer(factory,destination,i);
+            consumers[i].setSleepDuration(CONSUMER_SLEEP_DURATION);
         }
         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
             array=new byte[PAYLOAD_SIZE];
@@ -128,11 +132,7 @@
     }
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
-//        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true");
-//        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true&wireFormat.cacheEnabled=false");
-       // cf.setAsyncDispatch(false);
-        return cf;
+        return new ActiveMQConnectionFactory(bindAddress);
     }
 
     public void testPerformance() throws JMSException, InterruptedException{
@@ -160,28 +160,28 @@
     }
 
     protected void dumpProducerRate(){
-        int count=0;
+        int totalRate=0;
         int totalCount=0;
         for(int i=0;i<producers.length;i++){
-            count+=producers[i].getRate().getRate();
+            totalRate+=producers[i].getRate().getRate();
             totalCount+=consumers[i].getRate().getTotalCount();
         }
-        count=count/producers.length;
-        log.info("Producer rate = "+count+" msg/sec total count = "+totalCount);
+        int avgRate = totalRate/producers.length;
+        log.info("Avg producer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", sent = "+totalCount);
         for(int i=0;i<producers.length;i++){
             producers[i].getRate().reset();
         }
     }
 
     protected void dumpConsumerRate(){
-        int rate=0;
+        int totalRate=0;
         int totalCount=0;
         for(int i=0;i<consumers.length;i++){
-            rate+=consumers[i].getRate().getRate();
+            totalRate+=consumers[i].getRate().getRate();
             totalCount+=consumers[i].getRate().getTotalCount();
         }
-        rate=rate/consumers.length;
-        log.info("Consumer rate = "+rate+" msg/sec total count = "+totalCount);
+        int avgRate = totalRate/consumers.length;
+        log.info("Avg consumer rate = "+avgRate+" msg/sec | Total rate = "+totalRate+", received = "+totalCount);
         for(int i=0;i<consumers.length;i++){
             consumers[i].getRate().reset();
         }