You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/06 08:24:17 UTC

svn commit: r383485 - in /incubator/activemq/trunk/activemq-core/src/test: java/org/apache/activemq/perf/ resources/org/apache/activemq/perf/

Author: rajdavies
Date: Sun Mar  5 23:24:16 2006
New Revision: 383485

URL: http://svn.apache.org/viewcvs?rev=383485&view=rev
Log:
Added slow consumer test

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=383485&r1=383484&r2=383485&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Sun Mar  5 23:24:16 2006
@@ -44,6 +44,8 @@
     protected int PAYLOAD_SIZE=1024;
     protected int MESSAGE_COUNT=1000000;
     protected byte[] array=null;
+    protected ConnectionFactory factory;
+    protected Destination destination;
 
     /**
      * Sets up a test where the producer and consumer have their own connection.
@@ -58,21 +60,21 @@
         for(int i=0;i<array.length;i++){
             array[i]=(byte) i;
         }
-        ConnectionFactory fac=createConnectionFactory();
-        Connection con=fac.createConnection();
+        factory=createConnectionFactory();
+        Connection con=factory.createConnection();
         Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
         payload=session.createBytesMessage();
         payload.writeBytes(array);
-        Destination dest=createDestination(session,DESTINATION_NAME);
+        destination=createDestination(session,DESTINATION_NAME);
         con.close();
         producers=new PerfProducer[NUMBER_OF_PRODUCERS];
         consumers=new PerfConsumer[NUMBER_OF_CONSUMERS];
         for(int i=0;i<NUMBER_OF_CONSUMERS;i++){
-            consumers[i]=createConsumer(fac,dest,i);
+            consumers[i]=createConsumer(factory,destination,i);
             consumers[i].start();
         }
         for(int i=0;i<NUMBER_OF_PRODUCERS;i++){
-            producers[i]=createProducer(fac,dest,i);
+            producers[i]=createProducer(factory,destination,i);
             producers[i].start();
         }
         super.setUp();
@@ -125,7 +127,7 @@
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
 //        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true");
 //        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?marshal=true&wireFormat.cacheEnabled=false");
-        cf.setAsyncDispatch(false);
+       // cf.setAsyncDispatch(false);
         return cf;
     }
 

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java?rev=383485&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumer.java Sun Mar  5 23:24:16 2006
@@ -0,0 +1,41 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+/**
+ * @version $Revision: 1.3 $
+ */
+/**
+ * A {@link PerfConsumer} that pauses after every message it receives, so that it falls behind the producers.
+ */
+public class SlowConsumer extends PerfConsumer{
+    /** Pause, in milliseconds, applied after each received message. */
+    private static final long DELAY_MILLIS=10000;
+
+    /**
+     * Creates a named slow consumer.
+     * 
+     * @param fac connection factory handed to the superclass
+     * @param dest destination handed to the superclass
+     * @param consumerName consumer name handed to the superclass
+     * @param slowConsumer currently ignored; kept so existing callers keep compiling
+     * @throws JMSException if the superclass constructor fails
+     */
+    public SlowConsumer(ConnectionFactory fac,Destination dest,String consumerName,boolean slowConsumer)
+                    throws JMSException{
+        super(fac,dest,consumerName);
+    }
+
+    /**
+     * Creates a slow consumer without a consumer name (<code>null</code> is passed to the superclass).
+     * 
+     * @param fac connection factory handed to the superclass
+     * @param dest destination handed to the superclass
+     * @throws JMSException if the superclass constructor fails
+     */
+    public SlowConsumer(ConnectionFactory fac,Destination dest) throws JMSException{
+        super(fac,dest,null);
+    }
+
+    /**
+     * Lets the superclass handle the message first, then sleeps for {@link #DELAY_MILLIS} milliseconds.
+     * 
+     * @param msg the received message
+     */
+    public void onMessage(Message msg){
+        super.onMessage(msg);
+        try{
+            Thread.sleep(DELAY_MILLIS);
+        }catch(InterruptedException e){
+            // Do not swallow the interrupt: restore the flag so the code that owns
+            // this thread can still observe the interruption and stop promptly.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java?rev=383485&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java Sun Mar  5 23:24:16 2006
@@ -0,0 +1,64 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+/**
+ * @version $Revision: 1.3 $
+ */
+/**
+ * Variant of {@link SimpleTopicTest} that additionally attaches deliberately slow consumers to the same
+ * destination and runs against the broker described by <code>slowConsumerBroker.xml</code>.
+ */
+public class SlowConsumerTopicTest extends SimpleTopicTest{
+    /** Slow consumers started by {@link #setUp()}, kept apart from the regular <code>consumers</code>. */
+    protected PerfConsumer[] slowConsumers;
+    /** Number of slow consumers to create. */
+    protected int NUMBER_OF_SLOW_CONSUMERS=1;
+    
+    
+    /**
+     * Runs the superclass set-up and then creates and starts the slow consumers, using the
+     * <code>factory</code> and <code>destination</code> fields inherited from the superclass.
+     * 
+     * @throws Exception if the superclass set-up or the creation of a slow consumer fails
+     */
+    protected void setUp() throws Exception{
+        super.setUp();
+        
+        // Allocate as PerfConsumer[] so an overriding createSlowConsumer() may return any
+        // PerfConsumer without risking an ArrayStoreException.
+        slowConsumers=new PerfConsumer[NUMBER_OF_SLOW_CONSUMERS];
+        for(int i=0;i<NUMBER_OF_SLOW_CONSUMERS;i++){
+            // Store into slowConsumers, not consumers: writing into consumers[i] replaced regular
+            // consumers that were already started and left this array full of nulls.
+            // NOTE(review): the slow consumers are no longer reachable through consumers, so confirm
+            // whether the superclass tear-down needs to stop them explicitly.
+            slowConsumers[i]=createSlowConsumer(factory,destination,i);
+            slowConsumers[i].start();
+        }
+    }
+    
+    /**
+     * Factory method for a slow consumer.
+     * 
+     * @param fac connection factory for the consumer
+     * @param dest destination to consume from
+     * @param number index of the consumer; not used by this implementation
+     * @return a new {@link SlowConsumer}
+     * @throws JMSException if the consumer cannot be created
+     */
+    protected PerfConsumer createSlowConsumer(ConnectionFactory fac,Destination dest,int number) throws JMSException{
+        return new SlowConsumer(fac,dest);
+    }
+    
+    /**
+     * Builds a broker from the classpath resource <code>org/apache/activemq/perf/slowConsumerBroker.xml</code>
+     * and starts it.
+     * 
+     * @return the started broker
+     * @throws Exception if the configuration cannot be loaded or the broker fails to start
+     */
+    protected BrokerService createBroker() throws Exception{
+        Resource resource=new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml");
+        // Named brokerFactory so it does not shadow the inherited connection-factory field "factory".
+        BrokerFactoryBean brokerFactory=new BrokerFactoryBean(resource);
+        brokerFactory.afterPropertiesSet();
+        BrokerService broker =brokerFactory.getBroker();
+        broker.start();
+        return broker;
+    }
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml?rev=383485&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml Sun Mar  5 23:24:16 2006
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- Broker configuration loaded by SlowConsumerTopicTest.createBroker() from the classpath. -->
+<beans xmlns="http://activemq.org/config/1.0">
+
+  <broker brokerName="slowConsumerBroker" persistent="true" useShutdownHook="false">
+    <!-- Single TCP connector on localhost:61616 for the test clients. -->
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616"/>
+    </transportConnectors>
+     <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <!-- NOTE(review): ">" is presumably the match-everything wildcard, i.e. this entry applies to all topics - confirm -->
+          <policyEntry topic=">">            
+            <!-- let's force old messages to be discarded for slow consumers -->
+            <pendingMessageLimitStrategy>
+              <!-- constant pending-message limit of 10 -->
+              <constantPendingMessageLimitStrategy limit="10"/>
+            </pendingMessageLimitStrategy>
+          </policyEntry>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+   
+  </broker>
+
+</beans>
+