You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/08/26 19:15:46 UTC

svn commit: r1162175 - in /activemq/trunk/activemq-camel/src/test: java/org/apache/activemq/camel/TransactedConsumeTest.java resources/org/apache/activemq/camel/transactedconsume.xml

Author: gtully
Date: Fri Aug 26 17:15:46 2011
New Revision: 1162175

URL: http://svn.apache.org/viewvc?rev=1162175&view=rev
Log:
second route is the way to use a second connection and let spring do the caching

Modified:
    activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
    activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml

Modified: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java?rev=1162175&r1=1162174&r2=1162175&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java (original)
+++ activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java Fri Aug 26 17:15:46 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.camel;
 
+import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.Connection;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -26,6 +28,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 import org.apache.activemq.util.Wait;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -40,7 +43,7 @@ import org.springframework.context.suppo
 public class TransactedConsumeTest extends CamelSpringTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(TransactedConsumeTest.class);
     BrokerService broker = null;
-    int messageCount = 1000;
+    int messageCount = 100000;
 
     @Test
     public void testConsume() throws Exception {
@@ -53,6 +56,8 @@ public class TransactedConsumeTest exten
                 return broker.getAdminView().getTotalDequeueCount() >= messageCount;
             }
         }, 20 * 60 * 1000));
+        long duration = System.currentTimeMillis() - firstConsumed.get();
+        LOG.info("Done message consumption in " + duration + "millis");
     }
 
     private void sendJMSMessageToKickOffRoute() throws Exception {
@@ -78,12 +83,14 @@ public class TransactedConsumeTest exten
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultPolicy = new PolicyEntry();
-       // defaultPolicy.setStrictOrderDispatch(false);
         policyMap.setDefaultEntry(defaultPolicy);
         brokerService.setDestinationPolicy(policyMap);
 
         brokerService.setAdvisorySupport(false);
         brokerService.setDataDirectory("target/data");
+        AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
+        amq.setDirectory(new File("target/data"));
+        brokerService.setPersistenceAdapter(amq);
         brokerService.addConnector("tcp://localhost:61616");
         return brokerService;
     }
@@ -110,13 +117,21 @@ public class TransactedConsumeTest exten
         return new ClassPathXmlApplicationContext("org/apache/activemq/camel/transactedconsume.xml");
     }
 
+    static AtomicLong firstConsumed = new AtomicLong();
+    static AtomicLong consumed = new AtomicLong();
+
     static class ConnectionLog implements Processor  {
 
         @Override
         public void process(Exchange exchange) throws Exception {
+            if (consumed.getAndIncrement() == 0) {
+                firstConsumed.set(System.currentTimeMillis());
+            }
             ActiveMQTextMessage m = (ActiveMQTextMessage) ((JmsMessage)exchange.getIn()).getJmsMessage();
-            Thread.currentThread().sleep(10);
-            LOG.info("received on " + m.getConnection().toString());
+            //Thread.currentThread().sleep(500);
+            if (consumed.get() %500 == 0) {
+                LOG.info("received on " + m.getConnection().toString());
+            }
         }
     }
 

Modified: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml?rev=1162175&r1=1162174&r2=1162175&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml (original)
+++ activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml Fri Aug 26 17:15:46 2011
@@ -28,12 +28,12 @@
     <context:annotation-config/>
 
     <bean id="vhfBatchListenerJMSConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
-        <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
+        <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1000"/>
     </bean>
 
     <bean id="vhfBatchListenerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
-        <property name="maxConnections" value="10"/>
-        <property name="maximumActive" value="10"/>
+        <property name="maxConnections" value="1"/>
+        <property name="maximumActive" value="1"/>
         <property name="connectionFactory" ref="vhfBatchListenerJMSConnectionFactory"/>
     </bean>
 
@@ -66,13 +66,23 @@
         <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
     </bean>
 
+    <bean id="activemq2" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="activemq:queue:scp_transacted"/>
             <!-- transacted /-->
-            <process ref="connectonLog"/>
+            <process ref="connectionLog"/>
+        </route>
+
+        <!-- marginally better through put with a second route/connection -->
+        <route>
+            <from uri="activemq2:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
         </route>
     </camelContext>
 
-    <bean id="connectonLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
+    <bean id="connectionLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
 </beans>