You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/02 12:56:44 UTC
svn commit: r1368413 [4/5] - in /camel/trunk: components/
components/camel-sjms/ components/camel-sjms/src/
components/camel-sjms/src/main/ components/camel-sjms/src/main/java/
components/camel-sjms/src/main/java/org/
components/camel-sjms/src/main/jav...
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.consumer;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.jms.JmsMessageHeaderType;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TransactedInOnlyTopicConsumerTest extends CamelTestSupport {
+ private static final String TEST_DESTINATION_1 = "sjms:topic:transacted.in.only.topic.consumer.test.1?transacted=true";
+ private static final String TEST_DESTINATION_2 = "sjms:topic:transacted.in.only.topic.consumer.test.2?transacted=true";
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Test
+ public void testTransactedInOnlyTopicConsumerExchangeFailure() throws Exception {
+ // We should see the World message twice, once for the exception
+ getMockEndpoint("mock:test1.topic.mock.before").expectedBodiesReceived("World", "World");
+ getMockEndpoint("mock:test1.topic.mock.after").expectedBodiesReceived("Hello World");
+
+ template.sendBody(TEST_DESTINATION_1, "World");
+
+ getMockEndpoint("mock:test1.topic.mock.before").assertIsSatisfied();
+ getMockEndpoint("mock:test1.topic.mock.after").assertIsSatisfied();
+
+ }
+
+ @Test
+ public void testTransactedInOnlyTopicConsumerRuntimeException() throws Exception {
+ // We should see the World message twice, once for the exception
+ getMockEndpoint("mock:test2.topic.mock.before").expectedBodiesReceived("World", "World");
+ getMockEndpoint("mock:test2.topic.mock.after").expectedBodiesReceived("Hello World");
+
+ template.sendBody(TEST_DESTINATION_2, "World");
+
+ getMockEndpoint("mock:test2.topic.mock.before").assertIsSatisfied();
+ getMockEndpoint("mock:test2.topic.mock.after").assertIsSatisfied();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+ "vm://broker?broker.persistent=false&broker.useJmx=true");
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from(TEST_DESTINATION_1)
+ .to("log:test1.before")
+ .to("mock:test1.topic.mock.before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId());
+ if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) {
+ logger.info("Exchange does not have a retry message. Set the exception and allow the retry.");
+ exchange.setException(new RuntimeCamelException("Creating Failure"));
+ } else {
+ logger.info("Exchange has retry header. Continue processing the message.");
+ }
+ }
+ })
+ .transform(body().prepend("Hello "))
+ .to("log:test1.after?showAll=true", "mock:test1.topic.mock.after");
+
+ from(TEST_DESTINATION_2)
+ .to("log:test2.before")
+ .to("mock:test2.topic.mock.before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId());
+ if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) {
+ logger.info("Exchange does not have a retry message. Throw the exception to verify we handle the retry.");
+ throw new RuntimeCamelException("Creating Failure");
+ } else {
+ logger.info("Exchange has retry header. Continue processing the message.");
+ }
+ }
+ })
+ .transform(body().prepend("Hello "))
+ .to("log:test2.after?showAll=true", "mock:test2.topic.mock.after");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class AsyncJmsInOutIT extends JmsTestSupport {
+
+ @Test
+ public void testAsynchronous() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(100);
+ mock.expectsNoDuplicates(body());
+
+ StopWatch watch = new StopWatch();
+
+ for (int i = 0; i < 100; i++) {
+ template.sendBody("seda:start", "" + i);
+ }
+
+ // just in case we run on slow boxes
+ assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+ log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+ // (there are delays in both routes)
+ // however due async routing, we can leverage the fact to let threads non blocked
+ // in the first route, and therefore can have the messages processed faster
+ // because we can have messages wait concurrently in both routes
+ // this means the async processing model is about 2x faster
+
+ from("seda:start")
+ // we can only send at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .to("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+ .to("mock:result");
+
+ from("sjms:queue:bar?synchronous=false&exchangePattern=InOut")
+ .log("Using ${threadName} to process ${body}")
+ // we can only process at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .transform(body().prepend("Bye "));
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class AsyncJmsInOutTempDestIT extends JmsTestSupport {
+
+ @Test
+ public void testAsynchronous() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(100);
+ mock.expectsNoDuplicates(body());
+
+ StopWatch watch = new StopWatch();
+
+ for (int i = 0; i < 100; i++) {
+ template.sendBody("seda:start", "" + i);
+ }
+
+ // just in case we run on slow boxes
+ assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+ log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+ // (there are delays in both routes)
+ // however due async routing, we can leverage the fact to let threads non blocked
+ // in the first route, and therefore can have the messages processed faster
+ // because we can have messages wait concurrently in both routes
+ // this means the async processing model is about 2x faster
+
+ from("seda:start")
+ // we can only send at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .to("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+ .to("mock:result");
+
+ from("sjms:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+ .log("Using ${threadName} to process ${body}")
+ // we can only process at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .transform(body().prepend("Bye "));
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutIT.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class SyncJmsInOutIT extends JmsTestSupport {
+
+ @Test
+ public void testSynchronous() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(100);
+ mock.expectsNoDuplicates(body());
+
+ StopWatch watch = new StopWatch();
+
+ for (int i = 0; i < 100; i++) {
+ template.sendBody("seda:start", "" + i);
+ }
+
+ // just in case we run on slow boxes
+ assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+ log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+ // (there are delays in both routes)
+ // however due async routing, we can leverage the fact to let threads non blocked
+ // in the first route, and therefore can have the messages processed faster
+ // because we can have messages wait concurrently in both routes
+ // this means the async processing model is about 2x faster
+
+ from("seda:start")
+ // we can only send at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .to("sjms:queue:bar?exchangePattern=InOut")
+ .to("mock:result");
+
+ from("sjms:queue:bar?exchangePattern=InOut")
+ .log("Using ${threadName} to process ${body}")
+ // we can only process at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .transform(body().prepend("Bye "));
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/SyncJmsInOutTempDestIT.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.it;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.util.StopWatch;
+
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class SyncJmsInOutTempDestIT extends JmsTestSupport {
+
+ @Test
+ public void testSynchronous() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(100);
+ mock.expectsNoDuplicates(body());
+
+ StopWatch watch = new StopWatch();
+
+ for (int i = 0; i < 100; i++) {
+ template.sendBody("seda:start", "" + i);
+ }
+
+ // just in case we run on slow boxes
+ assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
+
+ log.info("Took " + watch.stop() + " ms. to process 100 messages request/reply over JMS");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // in a fully sync mode it would take at least 5 + 5 sec to process the 100 messages
+ // (there are delays in both routes)
+ // however due async routing, we can leverage the fact to let threads non blocked
+ // in the first route, and therefore can have the messages processed faster
+ // because we can have messages wait concurrently in both routes
+ // this means the async processing model is about 2x faster
+
+ from("seda:start")
+ // we can only send at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .to("sjms:in.out.temp.queue?exchangePattern=InOut")
+ .to("mock:result");
+
+ from("sjms:in.out.temp.queue?exchangePattern=InOut")
+ .log("Using ${threadName} to process ${body}")
+ // we can only process at fastest the 100 msg in 5 sec due the delay
+ .delay(50)
+ .transform(body().prepend("Bye "));
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/DefaultConnectionResourceTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO Add Class documentation for DefaultConnectionResourceTest
+ *
+ */
+public class DefaultConnectionResourceTest {
+ private ActiveMQConnectionFactory connectionFactory;
+
+ @Before
+ public void setup() {
+ connectionFactory = new ActiveMQConnectionFactory(
+ "vm://broker?broker.persistent=false");
+ }
+
+ @After
+ public void teardown() {
+ connectionFactory = null;
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.DefaultConnectionResourceTest#createObject()}
+ * .
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCreateObject() throws Exception {
+ DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+ pool.fillPool();
+ assertNotNull(pool);
+ ActiveMQConnection connection = (ActiveMQConnection) pool
+ .borrowObject();
+ assertNotNull(connection);
+ assertTrue(connection.isStarted());
+ pool.drainPool();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.DefaultConnectionResourceTest#createObject()}
+ * .
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDestroyObject() throws Exception {
+ DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+ pool.fillPool();
+ assertNotNull(pool);
+ ActiveMQConnection connection = (ActiveMQConnection) pool
+ .borrowObject();
+ assertNotNull(connection);
+ assertTrue(connection.isStarted());
+ pool.drainPool();
+ assertTrue(pool.size() == 0);
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#borrowObject()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBorrowObject() throws Exception {
+ DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+ pool.fillPool();
+ assertNotNull(pool);
+ ActiveMQConnection connection = (ActiveMQConnection) pool.borrowObject();
+ assertNotNull(connection);
+ assertTrue(connection.isStarted());
+
+ ActiveMQConnection connection2 = (ActiveMQConnection) pool.borrowObject();
+ assertNull(connection2);
+ pool.drainPool();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#returnObject(java.lang.Object)}
+ * .
+ * @throws Exception
+ */
+ @Test
+ public void testReturnObject() throws Exception {
+ DefaultConnectionResource pool = new DefaultConnectionResource(1, connectionFactory);
+ pool.fillPool();
+ assertNotNull(pool);
+ ActiveMQConnection connection = (ActiveMQConnection) pool.borrowObject();
+ assertNotNull(connection);
+ assertTrue(connection.isStarted());
+ pool.returnObject(connection);
+ ActiveMQConnection connection2 = (ActiveMQConnection) pool.borrowObject();
+ assertNotNull(connection2);
+ pool.drainPool();
+ }
+
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/ObjectPoolTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO Add Class documentation for ObjectPoolTest
+ *
+ */
+public class ObjectPoolTest {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(ObjectPoolTest.class);
+
+
+ private AtomicInteger atomicInteger;
+
+ @Before
+ public void setUp() {
+ atomicInteger = new AtomicInteger();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#ObjectPool()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testObjectPool() throws Exception {
+ TestPool testPool = new TestPool();
+ assertNotNull(testPool);
+ testPool.fillPool();
+ MyPooledObject pooledObject = testPool.borrowObject();
+ assertNotNull(pooledObject);
+ assertTrue("Expected a value of 1. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() == 1);
+
+ MyPooledObject nextPooledObject = testPool.borrowObject();
+ assertNull(nextPooledObject);
+
+ testPool.returnObject(pooledObject);
+ nextPooledObject = testPool.borrowObject();
+ assertNotNull(nextPooledObject);
+ testPool.drainPool();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#ObjectPool()}.
+ */
+ @Test
+ public void testBadObjectPool() {
+ try {
+ new BadTestPool();
+ } catch (Exception e) {
+ assertTrue("Should have thrown an IllegalStateException", e instanceof IllegalStateException);
+ }
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#ObjectPool(int)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testObjectPoolInt() throws Exception {
+ final int maxPoolObjects = 5;
+
+ TestPool testPool = new TestPool(maxPoolObjects);
+ testPool.fillPool();
+
+ List<MyPooledObject> poolObjects = new ArrayList<MyPooledObject>();
+ for (int i = 0; i < maxPoolObjects; i++) {
+ poolObjects.add(testPool.borrowObject());
+ }
+ for (int i = 0; i < maxPoolObjects; i++) {
+ MyPooledObject pooledObject = poolObjects.get(i);
+ assertNotNull("MyPooledObject was null for borrow attempt: " + i, pooledObject);
+ assertTrue("Expected a value in the range of 1-5. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() > 0 && pooledObject.getObjectId() < 6);
+ LOGGER.info("MyPooledObject has an ID of: " + pooledObject.getObjectId());
+ }
+
+ assertNull("Pool should be empty", testPool.borrowObject());
+
+ for (MyPooledObject myPooledObject : poolObjects) {
+ testPool.returnObject(myPooledObject);
+ }
+
+ MyPooledObject pooledObject = testPool.borrowObject();
+ assertNotNull(pooledObject);
+ assertTrue("Expected a value in the range of 1-5. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() > 0 && pooledObject.getObjectId() < 6);
+
+ testPool.drainPool();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#createObject()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCreateObject() throws Exception {
+ TestPool testPool = new TestPool();
+ assertNotNull(testPool.createObject());
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#borrowObject()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBorrowObject() throws Exception {
+ TestPool testPool = new TestPool();
+ testPool.fillPool();
+ MyPooledObject pooledObject = testPool.borrowObject();
+ assertNotNull(pooledObject);
+ assertTrue("Expected a value of 1. Returned: " + pooledObject.getObjectId(), pooledObject.getObjectId() == 1);
+
+ MyPooledObject nextPooledObject = testPool.borrowObject();
+ assertNull("Expected a null as the pool of 1 was already removed", nextPooledObject);
+ testPool.drainPool();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#returnObject(java.lang.Object)}
+ * .
+ * @throws Exception
+ */
+ @Test
+ public void testReturnObject() throws Exception {
+ TestPool testPool = new TestPool();
+ testPool.fillPool();
+ assertNotNull(testPool);
+ MyPooledObject pooledObject = testPool.borrowObject();
+ MyPooledObject nextPooledObject = testPool.borrowObject();
+ testPool.returnObject(pooledObject);
+ nextPooledObject = testPool.borrowObject();
+ assertNotNull(nextPooledObject);
+ testPool.drainPool();
+ }
+
+ class TestPool extends ObjectPool<MyPooledObject> {
+
+ public TestPool() {
+ super();
+ }
+
+ public TestPool(int poolSize) {
+ super(poolSize);
+ }
+
+ @Override
+ protected MyPooledObject createObject() throws Exception {
+ return new MyPooledObject(atomicInteger.incrementAndGet());
+ }
+
+ @Override
+ protected void destroyObject(MyPooledObject t) throws Exception {
+ t = null;
+ }
+
+ }
+
+ class MyPooledObject {
+ private int objectId = -1;
+
+ public MyPooledObject(int objectId) {
+ this.objectId = objectId;
+ }
+
+ /**
+ *
+ * @return the OBJECT_ID
+ */
+ public Integer getObjectId() {
+ return this.objectId;
+ }
+ }
+
+
+ class BadTestPool extends ObjectPool<Object> {
+ @Override
+ protected Object createObject() throws Exception {
+ throw new Exception();
+ }
+
+ @Override
+ protected void destroyObject(Object t) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/pool/SessionPoolTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TODO Add Class documentation for DefaultConnectionResourceTest
+ *
+ */
+public class SessionPoolTest {
+ private ActiveMQConnectionFactory connectionFactory;
+
+ @Before
+ public void setup() {
+ connectionFactory = new ActiveMQConnectionFactory(
+ "vm://broker?broker.persistent=false");
+ }
+
+ @After
+ public void teardown() {
+ connectionFactory = null;
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.SessionPoolTest#createObject()}
+ * .
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCreateObject() throws Exception {
+ DefaultConnectionResource connections = new DefaultConnectionResource(1, connectionFactory);
+ connections.fillPool();
+ SessionPool sessions = new SessionPool(1, connections);
+ sessions.fillPool();
+ assertNotNull(sessions);
+ Session session = sessions.createObject();
+ assertNotNull(session);
+ sessions.drainPool();
+ connections.drainPool();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#borrowObject()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBorrowObject() throws Exception {
+ DefaultConnectionResource connections = new DefaultConnectionResource(1, connectionFactory);
+ connections.fillPool();
+ SessionPool sessions = new SessionPool(1, connections);
+ sessions.fillPool();
+ assertNotNull(sessions);
+ ActiveMQSession session = (ActiveMQSession) sessions.borrowObject();
+ assertNotNull(session);
+ assertTrue(!session.isClosed());
+
+ ActiveMQSession session2 = (ActiveMQSession) sessions.borrowObject();
+ assertNull(session2);
+ sessions.drainPool();
+ connections.drainPool();
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.camel.component.sjms.pool.ObjectPool#returnObject(java.lang.Object)}
+ * .
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReturnObject() throws Exception {
+ DefaultConnectionResource connections = new DefaultConnectionResource(1, connectionFactory);
+ connections.fillPool();
+ SessionPool sessions = new SessionPool(1, connections);
+ sessions.fillPool();
+ assertNotNull(sessions);
+ ActiveMQSession session = (ActiveMQSession) sessions.borrowObject();
+ assertNotNull(session);
+ assertTrue(!session.isClosed());
+
+ ActiveMQSession session2 = (ActiveMQSession) sessions.borrowObject();
+ assertNull(session2);
+
+ sessions.returnObject(session);
+ session2 = (ActiveMQSession) sessions.borrowObject();
+ assertNotNull(session2);
+
+ sessions.drainPool();
+ connections.drainPool();
+ }
+
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class AsyncQueueProducerTest extends CamelTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+ private static String sedaThreadName;
+ private static String route = "";
+
+ @Test
+ public void testAsyncJmsProducerEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ template.sendBody("direct:start", "Hello Camel");
+ // we should run before the async processor that sets B
+ route += "A";
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(sedaThreadName));
+ assertFalse("Should use different threads", afterThreadName.equalsIgnoreCase(sedaThreadName));
+
+ assertEquals("AB", route);
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+ "vm://broker?broker.persistent=false&broker.useJmx=false");
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:bye:camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("sjms:queue:foo?synchronous=false");
+
+ from("sjms:queue:foo?synchronous=false")
+ .to("mock:after")
+ .to("log:after")
+ .delay(1000)
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ route += "B";
+ sedaThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MyAsyncComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class AsyncTopicProducerTest extends CamelTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+ private static String sedaThreadName;
+ private static String route = "";
+
+ @Test
+ public void testAsyncTopicProducer() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ template.sendBody("direct:start", "Hello Camel");
+ // we should run before the async processor that sets B
+ route += "A";
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(sedaThreadName));
+ assertFalse("Should use different threads", afterThreadName.equalsIgnoreCase(sedaThreadName));
+
+ assertEquals("AB", route);
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+ "vm://broker?broker.persistent=false&broker.useJmx=false");
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:bye:camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("sjms:topic:foo?synchronous=false");
+
+ from("sjms:topic:foo")
+ .to("mock:after")
+ .to("log:after")
+ .delay(1000)
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ route += "B";
+ sedaThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOnlyQueueProducerTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "sync.queue.producer.test";
+
+ public InOnlyQueueProducerTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testInOnlyQueueProducer() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME);
+ assertNotNull(mc);
+ final String expectedBody = "Hello World!";
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(expectedBody);
+
+ template.sendBody("direct:start", expectedBody);
+ Message message = mc.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+
+ TextMessage tm = (TextMessage) message;
+ String text = tm.getText();
+ assertNotNull(text);
+
+ template.sendBody("direct:finish", text);
+
+ mock.assertIsSatisfied();
+ mc.close();
+
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ *
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("sjms:queue:" + TEST_DESTINATION_NAME);
+
+ from("direct:finish")
+ .to("log:test.log.1?showBody=true", "mock:result");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOnlyTopicProducerTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "test.foo.topic";
+
+ public InOnlyTopicProducerTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testInOnlyTopicProducerProducer() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createTopicConsumer(getSession(), TEST_DESTINATION_NAME, null);
+ assertNotNull(mc);
+ final String expectedBody = "Hello World!";
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(expectedBody);
+
+ template.sendBody("direct:start", expectedBody);
+ Message message = mc.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+
+ TextMessage tm = (TextMessage) message;
+ String text = tm.getText();
+ assertNotNull(text);
+
+ template.sendBody("direct:finish", text);
+
+ mock.assertIsSatisfied();
+ mc.close();
+
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ *
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("sjms:topic:" + TEST_DESTINATION_NAME);
+
+ from("direct:finish")
+ .to("log:test.log.1?showBody=true", "mock:result");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutQueueProducerAsyncLoadTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+ private MessageConsumer mc1;
+ private MessageConsumer mc2;
+
+ public InOutQueueProducerAsyncLoadTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ mc1 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+ mc2 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+ mc1.setMessageListener(new MyMessageListener());
+ mc2.setMessageListener(new MyMessageListener());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ MyMessageListener l1 = (MyMessageListener)mc1.getMessageListener();
+ l1.close();
+ mc1.close();
+ MyMessageListener l2 = (MyMessageListener)mc2.getMessageListener();
+ l2.close();
+ mc2.close();
+ super.tearDown();
+ }
+
+ /**
+ * Test to verify that when using the consumer listener for the InOut
+ * producer we get the correct message back.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testInOutQueueProducer() throws Exception {
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ for (int i = 1; i <= 5000; i++) {
+ final int tempI = i;
+ Runnable worker = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ final String requestText = "Message " + tempI;
+ final String responseText = "Response Message " + tempI;
+ String response = template.requestBody("direct:start", requestText, String.class);
+ assertNotNull(response);
+ assertEquals(responseText, response);
+ } catch (Exception e) {
+ log.error("TODO Auto-generated catch block", e);
+ }
+ }
+ };
+ executor.execute(worker);
+ }
+ while (context.getInflightRepository().size() > 0) {
+
+ }
+ executor.shutdown();
+ while (!executor.isTerminated()) {
+ //
+ }
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ *
+ * @return
+ *
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("log:" + TEST_DESTINATION_NAME + ".in.log?showBody=true")
+ .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+ + TEST_DESTINATION_NAME
+ + ".response&consumerCount=10&producerCount=20&synchronous=false").threads(20)
+ .to("log:" + TEST_DESTINATION_NAME + ".out.log?showBody=true");
+ }
+ };
+ }
+
+ protected class MyMessageListener implements MessageListener {
+ private MessageProducer mp;
+
+ public MyMessageListener() {
+ super();
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ TextMessage request = (TextMessage)message;
+ String text = request.getText();
+
+ TextMessage response = getSession().createTextMessage();
+ response.setText("Response " + text);
+ response.setJMSCorrelationID(request.getJMSCorrelationID());
+ if (mp == null) {
+ mp = getSession().createProducer(message.getJMSReplyTo());
+ }
+ mp.send(response);
+ } catch (JMSException e) {
+ fail(e.getLocalizedMessage());
+ }
+ }
+
+ public void close() throws JMSException {
+ mp.close();
+ }
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutQueueProducerSyncLoadTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+ private MessageConsumer mc1;
+ private MessageConsumer mc2;
+
+ public InOutQueueProducerSyncLoadTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ mc1 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+ mc2 = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME + ".request");
+ mc1.setMessageListener(new MyMessageListener());
+ mc2.setMessageListener(new MyMessageListener());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ MyMessageListener l1 = (MyMessageListener)mc1.getMessageListener();
+ l1.close();
+ mc1.close();
+ MyMessageListener l2 = (MyMessageListener)mc2.getMessageListener();
+ l2.close();
+ mc2.close();
+ super.tearDown();
+ }
+
+ /**
+ * Test to verify that when using the consumer listener for the InOut
+ * producer we get the correct message back.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testInOutQueueProducer() throws Exception {
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ for (int i = 1; i <= 5000; i++) {
+ final int tempI = i;
+ Runnable worker = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ final String requestText = "Message " + tempI;
+ final String responseText = "Response Message " + tempI;
+ String response = template.requestBody("direct:start", requestText, String.class);
+ assertNotNull(response);
+ assertEquals(responseText, response);
+ } catch (Exception e) {
+ log.error("TODO Auto-generated catch block", e);
+ }
+ }
+ };
+ executor.execute(worker);
+ }
+ while (context.getInflightRepository().size() > 0) {
+
+ }
+ executor.shutdown();
+ while (!executor.isTerminated()) {
+ //
+ }
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ *
+ * @return
+ *
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("log:" + TEST_DESTINATION_NAME + ".in.log?showBody=true")
+ .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+ + TEST_DESTINATION_NAME
+ + ".response&consumerCount=20&producerCount=40&synchronous=true")
+ .to("log:" + TEST_DESTINATION_NAME + ".out.log?showBody=true");
+ }
+ };
+ }
+
+ protected class MyMessageListener implements MessageListener {
+ private MessageProducer mp;
+
+ public MyMessageListener() {
+ super();
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ TextMessage request = (TextMessage)message;
+ String text = request.getText();
+
+ TextMessage response = getSession().createTextMessage();
+ response.setText("Response " + text);
+ response.setJMSCorrelationID(request.getJMSCorrelationID());
+ if (mp == null) {
+ mp = getSession().createProducer(message.getJMSReplyTo());
+ }
+ mp.send(response);
+ } catch (JMSException e) {
+ fail(e.getLocalizedMessage());
+ }
+ }
+
+ public void close() throws JMSException {
+ mp.close();
+ }
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.UUID;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutQueueProducerTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+
+ public InOutQueueProducerTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testInOutQueueProducer() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME
+ + ".request");
+ assertNotNull(mc);
+ final String requestText = "Hello World!";
+ final String responseText = "How are you";
+ mc.setMessageListener(new MyMessageListener(requestText, responseText));
+ Object responseObject = template.requestBody("direct:start", requestText);
+ assertNotNull(responseObject);
+ assertTrue(responseObject instanceof String);
+ assertEquals(responseText, responseObject);
+ mc.close();
+
+ }
+
+ @Test
+ public void testInOutQueueProducerWithCorrelationId() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME
+ + ".request");
+ assertNotNull(mc);
+ final String requestText = "Hello World!";
+ final String responseText = "How are you";
+ mc.setMessageListener(new MyMessageListener(requestText, responseText));
+ final String correlationId = UUID.randomUUID().toString().replace("-", "");
+ Exchange exchange = template.request("direct:start", new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getOut().setBody(requestText);
+ exchange.getOut().setHeader("JMSCorrelationID", correlationId);
+ }
+ });
+ assertNotNull(exchange);
+ assertTrue(exchange.getIn().getBody() instanceof String);
+ assertEquals(responseText, exchange.getIn().getBody());
+ assertEquals(correlationId, exchange.getIn().getHeader("JMSCorrelationID", String.class));
+ mc.close();
+
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ *
+ * @return
+ *
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("log:" + TEST_DESTINATION_NAME + ".in.log.1?showBody=true")
+ .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+ + TEST_DESTINATION_NAME + ".response")
+ .to("log:" + TEST_DESTINATION_NAME + ".out.log.1?showBody=true");
+ }
+ };
+ }
+
+ protected class MyMessageListener implements MessageListener {
+ private String requestText;
+ private String responseText;
+
+ /**
+ * TODO Add Constructor Javadoc
+ *
+ * @param request
+ * @param response
+ */
+ public MyMessageListener(String request, String response) {
+ super();
+ this.requestText = request;
+ this.responseText = response;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ TextMessage request = (TextMessage)message;
+ assertNotNull(request);
+ String text = request.getText();
+ assertEquals(requestText, text);
+
+ TextMessage response = getSession().createTextMessage();
+ response.setText(responseText);
+ response.setJMSCorrelationID(request.getJMSCorrelationID());
+ MessageProducer mp = getSession().createProducer(message.getJMSReplyTo());
+ mp.send(response);
+ mp.close();
+ } catch (JMSException e) {
+ fail(e.getLocalizedMessage());
+ }
+ }
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.UUID;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class InOutTempQueueProducerTest extends JmsTestSupport {
+
+ @Override
+ protected boolean useJmx() {
+ return true;
+ }
+
+ @Test
+ public void testInOutQueueProducer() throws Exception {
+ String queueName = "in.out.queue.producer.test.request";
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), queueName);
+ assertNotNull(mc);
+ final String requestText = "Hello World!";
+ final String responseText = "How are you";
+ mc.setMessageListener(new MyMessageListener(requestText, responseText));
+ Object responseObject = template.requestBody("sjms:queue:" + queueName + "?exchangePattern=InOut", requestText);
+ assertNotNull(responseObject);
+ assertTrue(responseObject instanceof String);
+ assertEquals(responseText, responseObject);
+ mc.close();
+
+ }
+
+ @Test
+ public void testInOutQueueProducerWithCorrelationId() throws Exception {
+ String queueName = "in.out.queue.producer.test.request";
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), queueName);
+ assertNotNull(mc);
+ final String requestText = "Hello World!";
+ final String responseText = "How are you";
+ mc.setMessageListener(new MyMessageListener(requestText, responseText));
+ final String correlationId = UUID.randomUUID().toString().replace("-", "");
+ Exchange exchange = template.request("sjms:queue:" + queueName + "?exchangePattern=InOut", new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody(requestText);
+ exchange.getIn().setHeader("JMSCorrelationID", correlationId);
+ }
+ });
+ assertNotNull(exchange);
+ assertTrue(exchange.getIn().getBody() instanceof String);
+ assertEquals(responseText, exchange.getOut().getBody());
+ assertEquals(correlationId, exchange.getOut().getHeader("JMSCorrelationID", String.class));
+ mc.close();
+
+ }
+
+ protected class MyMessageListener implements MessageListener {
+ private String requestText;
+ private String responseText;
+
+ /**
+ * TODO Add Constructor Javadoc
+ *
+ * @param request
+ * @param response
+ */
+ public MyMessageListener(String request, String response) {
+ super();
+ this.requestText = request;
+ this.responseText = response;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ TextMessage request = (TextMessage) message;
+ assertNotNull(request);
+ String text = request.getText();
+ assertEquals(requestText, text);
+
+ TextMessage response = getSession().createTextMessage();
+ response.setText(responseText);
+ response.setJMSCorrelationID(request.getJMSCorrelationID());
+ MessageProducer mp = getSession().createProducer(message.getJMSReplyTo());
+ mp.send(response);
+ mp.close();
+ } catch (JMSException e) {
+ fail(e.getLocalizedMessage());
+ }
+ }
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+
+import org.junit.Test;
+
+public class QueueProducerTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "test.foo";
+
+ public QueueProducerTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testQueueProducer() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(getSession(), TEST_DESTINATION_NAME);
+ assertNotNull(mc);
+ final String expectedBody = "Hello World!";
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(expectedBody);
+
+ template.sendBody("direct:start", expectedBody);
+ Message message = mc.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+
+ TextMessage tm = (TextMessage) message;
+ String text = tm.getText();
+ assertNotNull(text);
+
+ template.sendBody("direct:finish", text);
+
+ mock.assertIsSatisfied();
+ mc.close();
+
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ *
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("sjms:queue:" + TEST_DESTINATION_NAME);
+
+ from("direct:finish")
+ .to("log:test.log.1?showBody=true", "mock:result");
+ }
+ };
+ }
+}