You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/08/26 19:15:46 UTC
svn commit: r1162175 - in /activemq/trunk/activemq-camel/src/test:
java/org/apache/activemq/camel/TransactedConsumeTest.java
resources/org/apache/activemq/camel/transactedconsume.xml
Author: gtully
Date: Fri Aug 26 17:15:46 2011
New Revision: 1162175
URL: http://svn.apache.org/viewvc?rev=1162175&view=rev
Log:
second route is the way to use a second connection and let spring do the caching
Modified:
activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
Modified: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java?rev=1162175&r1=1162174&r2=1162175&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java (original)
+++ activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java Fri Aug 26 17:15:46 2011
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.camel;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -26,6 +28,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -40,7 +43,7 @@ import org.springframework.context.suppo
public class TransactedConsumeTest extends CamelSpringTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(TransactedConsumeTest.class);
BrokerService broker = null;
- int messageCount = 1000;
+ int messageCount = 100000;
@Test
public void testConsume() throws Exception {
@@ -53,6 +56,8 @@ public class TransactedConsumeTest exten
return broker.getAdminView().getTotalDequeueCount() >= messageCount;
}
}, 20 * 60 * 1000));
+ long duration = System.currentTimeMillis() - firstConsumed.get();
+ LOG.info("Done message consumption in " + duration + "millis");
}
private void sendJMSMessageToKickOffRoute() throws Exception {
@@ -78,12 +83,14 @@ public class TransactedConsumeTest exten
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultPolicy = new PolicyEntry();
- // defaultPolicy.setStrictOrderDispatch(false);
policyMap.setDefaultEntry(defaultPolicy);
brokerService.setDestinationPolicy(policyMap);
brokerService.setAdvisorySupport(false);
brokerService.setDataDirectory("target/data");
+ AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
+ amq.setDirectory(new File("target/data"));
+ brokerService.setPersistenceAdapter(amq);
brokerService.addConnector("tcp://localhost:61616");
return brokerService;
}
@@ -110,13 +117,21 @@ public class TransactedConsumeTest exten
return new ClassPathXmlApplicationContext("org/apache/activemq/camel/transactedconsume.xml");
}
+ static AtomicLong firstConsumed = new AtomicLong();
+ static AtomicLong consumed = new AtomicLong();
+
static class ConnectionLog implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
+ if (consumed.getAndIncrement() == 0) {
+ firstConsumed.set(System.currentTimeMillis());
+ }
ActiveMQTextMessage m = (ActiveMQTextMessage) ((JmsMessage)exchange.getIn()).getJmsMessage();
- Thread.currentThread().sleep(10);
- LOG.info("received on " + m.getConnection().toString());
+ //Thread.currentThread().sleep(500);
+ if (consumed.get() %500 == 0) {
+ LOG.info("received on " + m.getConnection().toString());
+ }
}
}
Modified: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml?rev=1162175&r1=1162174&r2=1162175&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml (original)
+++ activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml Fri Aug 26 17:15:46 2011
@@ -28,12 +28,12 @@
<context:annotation-config/>
<bean id="vhfBatchListenerJMSConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
+ <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1000"/>
</bean>
<bean id="vhfBatchListenerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
- <property name="maxConnections" value="10"/>
- <property name="maximumActive" value="10"/>
+ <property name="maxConnections" value="1"/>
+ <property name="maximumActive" value="1"/>
<property name="connectionFactory" ref="vhfBatchListenerJMSConnectionFactory"/>
</bean>
@@ -66,13 +66,23 @@
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
</bean>
+ <bean id="activemq2" class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+ </bean>
+
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="activemq:queue:scp_transacted"/>
<!-- transacted /-->
- <process ref="connectonLog"/>
+ <process ref="connectionLog"/>
+ </route>
+
+ <!-- marginally better through put with a second route/connection -->
+ <route>
+ <from uri="activemq2:queue:scp_transacted"/>
+ <process ref="connectionLog"/>
</route>
</camelContext>
- <bean id="connectonLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
+ <bean id="connectionLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
</beans>