You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by su...@apache.org on 2012/09/26 20:51:54 UTC
svn commit: r1390665 - in /camel/trunk/components/camel-sjms/src:
main/java/org/apache/camel/component/sjms/consumer/
main/java/org/apache/camel/component/sjms/producer/
test/java/org/apache/camel/component/sjms/it/
test/java/org/apache/camel/component...
Author: sully6768
Date: Wed Sep 26 18:51:53 2012
New Revision: 1390665
URL: http://svn.apache.org/viewvc?rev=1390665&view=rev
Log:
Create ConnectionResource integration test and clean up current ITests.
Added:
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java
Modified:
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
camel/trunk/components/camel-sjms/src/test/resources/log4j.properties
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java Wed Sep 26 18:51:53 2012
@@ -63,9 +63,9 @@ public class DefaultConsumer extends Sjm
protected void destroyObject(MessageConsumerResources model) throws Exception {
if (model != null) {
if (model.getMessageConsumer() != null) {
- if (model.getMessageConsumer().getMessageListener() != null) {
- model.getMessageConsumer().setMessageListener(null);
- }
+// if (model.getMessageConsumer().getMessageListener() != null) {
+// model.getMessageConsumer().setMessageListener(null);
+// }
model.getMessageConsumer().close();
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Wed Sep 26 18:51:53 2012
@@ -98,8 +98,10 @@ public class InOutProducer extends SjmsP
@Override
public void onMessage(Message message) {
- logger.info("Message Received in the Consumer Pool");
- logger.info(" Message : {}", message);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message Received in the Consumer Pool");
+ logger.debug(" Message : {}", message);
+ }
try {
Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java Wed Sep 26 18:51:53 2012
@@ -26,7 +26,9 @@ import org.apache.camel.util.StopWatch;
import org.junit.Test;
/**
- * @version
+ * Integration test that verifies the ability of SJMS to correctly process
+ * asynchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a namedReplyTo destination.
*/
public class AsyncJmsInOutIT extends JmsTestSupport {
@@ -53,23 +55,13 @@ public class AsyncJmsInOutIT extends Jms
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
- // (there are delays in both routes)
- // however due async routing, we can leverage the fact to let threads non blocked
- // in the first route, and therefore can have the messages processed faster
- // because we can have messages wait concurrently in both routes
- // this means the async processing model is about 2x faster
from("seda:start")
- // we can only send at fastest the 100 msg in 5 sec due the delay
- .delay(50)
- .to("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+ .to("sjms:queue:in.foo?synchronous=false&namedReplyTo=out.bar&exchangePattern=InOut")
.to("mock:result");
- from("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+ from("sjms:queue:in.foo?synchronous=false&exchangePattern=InOut")
.log("Using ${threadName} to process ${body}")
- // we can only process at fastest the 100 msg in 5 sec due the delay
- .delay(50)
.transform(body().prepend("Bye "));
}
};
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java Wed Sep 26 18:51:53 2012
@@ -26,7 +26,9 @@ import org.apache.camel.util.StopWatch;
import org.junit.Test;
/**
- * @version
+ * Integration test that verifies the ability of SJMS to correctly process
+ * asynchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a temporary destination.
*/
public class AsyncJmsInOutTempDestIT extends JmsTestSupport {
@@ -53,23 +55,13 @@ public class AsyncJmsInOutTempDestIT ext
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
- // (there are delays in both routes)
- // however due async routing, we can leverage the fact to let threads non blocked
- // in the first route, and therefore can have the messages processed faster
- // because we can have messages wait concurrently in both routes
- // this means the async processing model is about 2x faster
from("seda:start")
- // we can only send at fastest the 100 msg in 5 sec due the delay
- .delay(50)
- .to("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+ .to("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut")
.to("mock:result");
- from("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+ from("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut")
.log("Using ${threadName} to process ${body}")
- // we can only process at fastest the 100 msg in 5 sec due the delay
- .delay(50)
.transform(body().prepend("Bye "));
}
};
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java?rev=1390665&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/ConnectionResourceIT.java Wed Sep 26 18:51:53 2012
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.StopWatch;
+import org.junit.Test;
+
+/**
+ * Integration test that verifies we can replace the internal
+ * ConnectionFactoryResource with another provider.
+ * <p>
+ * The replacement provider is the nested {@link AMQConnectionResource}, a
+ * {@link ConnectionResource} implementation that wraps an ActiveMQ
+ * {@link PooledConnectionFactory}. It is installed on the
+ * {@link SjmsComponent} in {@link #createCamelContext()}, and
+ * {@link #testAsynchronous()} then sends 100 request/reply messages through
+ * the routes built by {@link #createRouteBuilder()}, expecting 100 distinct
+ * bodies on mock:result within 20 seconds.
+ * <p>
+ * NOTE(review): AMQConnectionResource.borrowConnection(long) uses its timeout
+ * argument as the maximum number of attempts and sleeps 1000 ms after every
+ * attempt that returns null, so the value effectively acts as seconds -
+ * confirm that this matches what the ConnectionResource contract expects.
+ * The same method logs and then ignores an InterruptedException without
+ * restoring the interrupt flag of the thread.
+ * <p>
+ * NOTE(review): AMQConnectionResource.stop() is never called from this class;
+ * presumably the pooled connection factories are cleaned up elsewhere or left
+ * to JVM exit - verify.
+ */
+public class ConnectionResourceIT extends JmsTestSupport {
+
+ /**
+ * Verifies that a connection can be borrowed from an
+ * {@link AMQConnectionResource} (created for tcp://localhost:33333 with a
+ * maxConnections of 1), that a session can be created on it, that it can be
+ * handed back through
+ * {@link ConnectionResource#returnConnection(Connection)}, and that a
+ * connection can be borrowed again afterwards.
+ *
+ * @throws Exception if a connection or session cannot be created
+ */
+ @Test
+ public void testCreateConnections() throws Exception {
+ ConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
+ assertNotNull(pool);
+ Connection connection = pool.borrowConnection();
+ assertNotNull(connection);
+ assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+ pool.returnConnection(connection);
+ Connection connection2 = pool.borrowConnection();
+ assertNotNull(connection2);
+ }
+
+ @Test
+ public void testAsynchronous() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(100);
+ mock.expectsNoDuplicates(body());
+
+ StopWatch watch = new StopWatch();
+
+ for (int i = 0; i < 100; i++) {
+ template.sendBody("seda:start", "" + i);
+ }
+
+ // just in case we run on slow boxes
+ assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+ log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+ }
+
+ /*
+ * Builds a DefaultCamelContext and registers an SjmsComponent under the
+ * name "sjms" whose connection resource is an AMQConnectionResource
+ * created for tcp://localhost:33333 with a maxConnections of 1.
+ *
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+ * @return the Camel context with the customised "sjms" component added
+ * @throws Exception
+ */
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = new DefaultCamelContext();
+ AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionResource(pool);
+ camelContext.addComponent("sjms", component);
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ from("seda:start")
+ .to("sjms:queue:in.foo?namedReplyTo=out.bar&exchangePattern=InOut&producerCount=5")
+ .to("mock:result");
+
+ from("sjms:queue:in.foo?exchangePattern=InOut&consumerCount=20")
+ .log("Using ${threadName} to process ${body}")
+ .transform(body().prepend("Bye "));
+ }
+ };
+ }
+
+ public class AMQConnectionResource implements ConnectionResource {
+ private PooledConnectionFactory pcf;
+
+ public AMQConnectionResource(String connectString, int maxConnections) {
+ super();
+ pcf = new PooledConnectionFactory(connectString);
+ pcf.setMaxConnections(maxConnections);
+ pcf.start();
+ }
+
+ public void stop() {
+ pcf.stop();
+ }
+
+ @Override
+ public Connection borrowConnection() throws Exception {
+ Connection answer = pcf.createConnection();
+ answer.start();
+ return answer;
+ }
+
+ @Override
+ public Connection borrowConnection(long timeout) throws Exception {
+ Connection answer = null;
+ int counter = 0;
+ while (counter++ < timeout) {
+ answer = pcf.createConnection();
+ if (answer == null) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ log.info("Intrupted but ignoring: ", e.getMessage());
+ }
+ } else {
+ break;
+ }
+ }
+ if (answer != null) {
+ answer.start();
+ }
+ return answer;
+ }
+
+ @Override
+ public void returnConnection(Connection connection) throws Exception {
+ // Do nothing in this case since the PooledConnectionFactory takes
+ // care of this for us
+ log.info("Connection returned");
+ }
+ }
+}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java Wed Sep 26 18:51:53 2012
@@ -25,8 +25,11 @@ import org.apache.camel.util.StopWatch;
import org.junit.Test;
+
/**
- * @version
+ * Integration test that verifies the ability of SJMS to correctly process
+ * synchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a namedReplyTo destination.
*/
public class SyncJmsInOutIT extends JmsTestSupport {
@@ -53,23 +56,13 @@ public class SyncJmsInOutIT extends JmsT
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
- // (there are delays in both routes)
- // however due async routing, we can leverage the fact to let threads non blocked
- // in the first route, and therefore can have the messages processed faster
- // because we can have messages wait concurrently in both routes
- // this means the async processing model is about 2x faster
from("seda:start")
- // we can only send at fastest the 100 msg in 5 sec due the delay
- .delay(50)
- .to("sjms:queue:bar?exchangePattern=InOut")
+ .to("sjms:queue:in.foo?namedReplyTo=out.bar&exchangePattern=InOut")
.to("mock:result");
- from("sjms:queue:bar?exchangePattern=InOut")
+ from("sjms:queue:in.foo?exchangePattern=InOut")
.log("Using ${threadName} to process ${body}")
- // we can only process at fastest the 100 msg in 5 sec due the delay
- .delay(50)
.transform(body().prepend("Bye "));
}
};
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java Wed Sep 26 18:51:53 2012
@@ -25,8 +25,11 @@ import org.apache.camel.util.StopWatch;
import org.junit.Test;
+
/**
- * @version
+ * Integration test that verifies the ability of SJMS to correctly process
+ * synchronous InOut exchanges from both the Producer and Consumer perspective
+ * using a temporary destination.
*/
public class SyncJmsInOutTempDestIT extends JmsTestSupport {
@@ -53,23 +56,12 @@ public class SyncJmsInOutTempDestIT exte
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
- // (there are delays in both routes)
- // however due async routing, we can leverage the fact to let threads non blocked
- // in the first route, and therefore can have the messages processed faster
- // because we can have messages wait concurrently in both routes
- // this means the async processing model is about 2x faster
-
from("seda:start")
- // we can only send at fastest the 100 msg in 5 sec due the delay
- .delay(50)
- .to("sjms:in.out.temp.queue?exchangePattern=InOut")
+ .to("sjms:in.foo.tempQ?exchangePattern=InOut")
.to("mock:result");
- from("sjms:in.out.temp.queue?exchangePattern=InOut")
+ from("sjms:in.foo.tempQ?exchangePattern=InOut")
.log("Using ${threadName} to process ${body}")
- // we can only process at fastest the 100 msg in 5 sec due the delay
- .delay(50)
.transform(body().prepend("Bye "));
}
};
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java Wed Sep 26 18:51:53 2012
@@ -29,8 +29,8 @@ import org.apache.camel.impl.DefaultCame
import org.apache.camel.test.junit4.CamelTestSupport;
/**
- * TODO Add Class documentation for JmsTestSupport
- *
+ * A support class that builds up and tears down an ActiveMQ instance to be used
+ * for unit testing.
*/
public class JmsTestSupport extends CamelTestSupport {
private static final String BROKER_URI = "tcp://localhost:33333";
@@ -40,20 +40,21 @@ public class JmsTestSupport extends Came
private Connection connection;
private Session session;
+ /**
+ * Set up the Broker
+ *
+ * @see org.apache.camel.test.junit4.CamelTestSupport#doPreSetup()
+ *
+ * @throws Exception
+ */
@Override
protected void doPreSetup() throws Exception {
- super.doPreSetup();
- }
-
- @Override
- public void setUp() throws Exception {
broker = new BrokerService();
broker.setUseJmx(true);
broker.setPersistent(false);
broker.deleteAllMessages();
broker.addConnector(BROKER_URI);
broker.start();
- super.setUp();
}
@Override
@@ -79,10 +80,9 @@ public class JmsTestSupport extends Came
broker = null;
}
}
-
+
/*
* @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
- *
* @return
* @throws Exception
*/
@@ -99,7 +99,7 @@ public class JmsTestSupport extends Came
camelContext.addComponent("sjms", component);
return camelContext;
}
-
+
public void setSession(Session session) {
this.session = session;
}
Modified: camel/trunk/components/camel-sjms/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/resources/log4j.properties?rev=1390665&r1=1390664&r2=1390665&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Wed Sep 26 18:51:53 2012
@@ -20,8 +20,8 @@
log4j.rootLogger=INFO, out
# uncomment the following line to turn on Camel debugging
-log4j.logger.org.apache.activemq=info
-log4j.logger.org.apache.activemq.transaction=trace
+log4j.logger.org.apache.activemq=warn
+log4j.logger.org.apache.activemq.transaction=warn
log4j.logger.org.apache.camel=info
log4j.logger.org.apache.camel.converter=info
log4j.logger.org.apache.camel.component.sjms=info